Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/07/17 01:56:59 UTC
[13/15] carbondata git commit: [CARBONDATA-1232] Datamap implementation for Blocklet
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
new file mode 100644
index 0000000..defe766
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+
+/**
+ * A plain row for storing data map entries. Implementations can be safe (on-heap) or unsafe (off-heap).
+ * TODO: move this class to a global row and use it across loading once the DataType class is changed
+ */
+public abstract class DataMapRow {
+
+ protected DataMapSchema[] schemas;
+
+ public DataMapRow(DataMapSchema[] schemas) {
+ this.schemas = schemas;
+ }
+
+ public abstract byte[] getByteArray(int ordinal);
+
+ public abstract DataMapRow getRow(int ordinal);
+
+ public abstract void setRow(DataMapRow row, int ordinal);
+
+ public abstract void setByteArray(byte[] byteArray, int ordinal);
+
+ public abstract int getInt(int ordinal);
+
+ public abstract void setInt(int value, int ordinal);
+
+ public abstract void setByte(byte value, int ordinal);
+
+ public abstract byte getByte(int ordinal);
+
+ public abstract void setShort(short value, int ordinal);
+
+ public abstract short getShort(int ordinal);
+
+ public abstract void setLong(long value, int ordinal);
+
+ public abstract long getLong(int ordinal);
+
+ public abstract void setFloat(float value, int ordinal);
+
+ public abstract float getFloat(int ordinal);
+
+ public abstract void setDouble(double value, int ordinal);
+
+ public abstract double getDouble(int ordinal);
+
+ public int getTotalSizeInBytes() {
+ int len = 0;
+ for (int i = 0; i < schemas.length; i++) {
+ len += getSizeInBytes(i);
+ }
+ return len;
+ }
+
+ public int getSizeInBytes(int ordinal) {
+ switch (schemas[ordinal].getSchemaType()) {
+ case FIXED:
+ return schemas[ordinal].getLength();
+ case VARIABLE:
+ return getByteArray(ordinal).length + 2; // 2 extra bytes for the stored short length prefix
+ case STRUCT:
+ return getRow(ordinal).getTotalSizeInBytes();
+ default:
+ throw new UnsupportedOperationException("wrong type");
+ }
+ }
+
+ public int getColumnCount() {
+ return schemas.length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
new file mode 100644
index 0000000..adec346
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Safe (on-heap) implementation of the data map row, backed by an Object array.
+ */
+public class DataMapRowImpl extends DataMapRow {
+
+ private Object[] data;
+
+ public DataMapRowImpl(DataMapSchema[] schemas) {
+ super(schemas);
+ this.data = new Object[schemas.length];
+ }
+
+ @Override public byte[] getByteArray(int ordinal) {
+ return (byte[]) data[ordinal];
+ }
+
+ @Override public DataMapRow getRow(int ordinal) {
+ return (DataMapRow) data[ordinal];
+ }
+
+ @Override public void setByteArray(byte[] byteArray, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE_ARRAY);
+ data[ordinal] = byteArray;
+ }
+
+ @Override public int getInt(int ordinal) {
+ return (Integer) data[ordinal];
+ }
+
+ @Override public void setInt(int value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.INT);
+ data[ordinal] = value;
+ }
+
+ @Override public void setByte(byte value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.BYTE);
+ data[ordinal] = value;
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return (Byte) data[ordinal];
+ }
+
+ @Override public void setShort(short value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.SHORT);
+ data[ordinal] = value;
+ }
+
+ @Override public short getShort(int ordinal) {
+ return (Short) data[ordinal];
+ }
+
+ @Override public void setLong(long value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.LONG);
+ data[ordinal] = value;
+ }
+
+ @Override public long getLong(int ordinal) {
+ return (Long) data[ordinal];
+ }
+
+ @Override public void setFloat(float value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.FLOAT);
+ data[ordinal] = value;
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return (Float) data[ordinal];
+ }
+
+ @Override public void setDouble(double value, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.DOUBLE);
+ data[ordinal] = value;
+ }
+
+ @Override public void setRow(DataMapRow row, int ordinal) {
+ assert (schemas[ordinal].getDataType() == DataType.STRUCT);
+ data[ordinal] = row;
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return (Double) data[ordinal];
+ }
+
+}
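For orientation, a minimal usage sketch of the on-heap row (hypothetical values; assumes DataType.INT reports 4 bytes via getSizeInBytes):

    DataMapSchema[] schemas = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.INT),
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY)
    };
    DataMapRow row = new DataMapRowImpl(schemas);
    row.setInt(42, 0);                          // ordinal 0: fixed int
    row.setByteArray(new byte[] {1, 2, 3}, 1);  // ordinal 1: variable bytes
    row.getInt(0);                              // 42
    row.getTotalSizeInBytes();                  // 4 + (3 + 2-byte length prefix) = 9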
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
new file mode 100644
index 0000000..ef78514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore.row;
+
+import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
+import org.apache.carbondata.core.memory.MemoryBlock;
+
+import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
+import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
+
+/**
+ * Unsafe implementation of data map row.
+ */
+public class UnsafeDataMapRow extends DataMapRow {
+
+ private MemoryBlock block;
+
+ private int pointer;
+
+ public UnsafeDataMapRow(DataMapSchema[] schemas, MemoryBlock block, int pointer) {
+ super(schemas);
+ this.block = block;
+ this.pointer = pointer;
+ }
+
+ @Override public byte[] getByteArray(int ordinal) {
+ int length;
+ int position = getPosition(ordinal);
+ switch (schemas[ordinal].getSchemaType()) {
+ case VARIABLE:
+ length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+ position += 2; // skip the 2-byte length prefix
+ break;
+ default:
+ length = schemas[ordinal].getLength();
+ }
+ byte[] data = new byte[length];
+ unsafe.copyMemory(block.getBaseObject(), block.getBaseOffset() + pointer + position, data,
+ BYTE_ARRAY_OFFSET, data.length);
+ return data;
+ }
+
+ @Override public DataMapRow getRow(int ordinal) {
+ DataMapSchema[] childSchemas =
+ ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas();
+ return new UnsafeDataMapRow(childSchemas, block, pointer + getPosition(ordinal));
+ }
+
+ @Override public void setByteArray(byte[] byteArray, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public int getInt(int ordinal) {
+ return unsafe
+ .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setInt(int value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public void setByte(byte value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return unsafe
+ .getByte(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setShort(short value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public short getShort(int ordinal) {
+ return unsafe
+ .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setLong(long value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public long getLong(int ordinal) {
+ return unsafe
+ .getLong(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setFloat(float value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return unsafe
+ .getFloat(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setDouble(double value, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return unsafe
+ .getDouble(block.getBaseObject(), block.getBaseOffset() + pointer + getPosition(ordinal));
+ }
+
+ @Override public void setRow(DataMapRow row, int ordinal) {
+ throw new UnsupportedOperationException("set is not supported on unsafe row");
+ }
+
+ private int getPosition(int ordinal) {
+ int position = 0;
+ for (int i = 0; i < ordinal; i++) {
+ position += getSizeInBytes(i);
+ }
+ return position;
+ }
+}
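Since no per-column offsets are stored, getPosition recomputes an ordinal's offset by summing the serialized sizes of all preceding columns on every access. A worked illustration of the resulting layout, reusing the hypothetical two-column schema from the sketch above:

    // memory at block base + pointer, for [FIXED INT, VARIABLE BYTE_ARRAY]:
    //   bytes 0..3 : the int value             -> getPosition(0) == 0
    //   bytes 4..5 : short length of the array -> getPosition(1) == 4
    //   bytes 6..  : the array contents
    // getByteArray(1) reads the 2-byte length at offset 4, then copies that
    // many bytes starting at offset 6.

This makes field access O(n) in the ordinal, which is cheap for the small schemas a blocklet data map row uses.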
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
new file mode 100644
index 0000000..80c68ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/DataMapSchema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Schema of a data map column. There are three schema types right now: fixed, variable, and struct.
+ */
+public abstract class DataMapSchema {
+
+ protected DataType dataType;
+
+ public DataMapSchema(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ /**
+ * Data type of the column.
+ *
+ * @return the data type
+ */
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Gives the configured length for a fixed schema; otherwise returns the default size of the data type.
+ *
+ * @return length in bytes
+ */
+ public abstract int getLength();
+
+ /**
+ * Schema type of the column.
+ * @return the schema type (fixed, variable, or struct)
+ */
+ public abstract DataMapSchemaType getSchemaType();
+
+ /**
+ * It always has a fixed length; the length cannot be updated later.
+ * Usage examples: all primitive types like short, int, etc.
+ */
+ public static class FixedDataMapSchema extends DataMapSchema {
+
+ private int length;
+
+ public FixedDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ public FixedDataMapSchema(DataType dataType, int length) {
+ super(dataType);
+ this.length = length;
+ }
+
+ @Override public int getLength() {
+ if (length == 0) {
+ return dataType.getSizeInBytes();
+ } else {
+ return length;
+ }
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.FIXED;
+ }
+ }
+
+ public static class VariableDataMapSchema extends DataMapSchema {
+
+ public VariableDataMapSchema(DataType dataType) {
+ super(dataType);
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.VARIABLE;
+ }
+ }
+
+ public static class StructDataMapSchema extends DataMapSchema {
+
+ private DataMapSchema[] childSchemas;
+
+ public StructDataMapSchema(DataType dataType, DataMapSchema[] childSchemas) {
+ super(dataType);
+ this.childSchemas = childSchemas;
+ }
+
+ @Override public int getLength() {
+ return dataType.getSizeInBytes();
+ }
+
+ public DataMapSchema[] getChildSchemas() {
+ return childSchemas;
+ }
+
+ @Override public DataMapSchemaType getSchemaType() {
+ return DataMapSchemaType.STRUCT;
+ }
+ }
+
+ public enum DataMapSchemaType {
+ FIXED, VARIABLE, STRUCT
+ }
+}
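A hedged sketch of composing the three schema kinds, including a nested struct (illustrative only; not part of the patch):

    DataMapSchema[] child = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.LONG),
        new DataMapSchema.FixedDataMapSchema(DataType.SHORT)
    };
    DataMapSchema[] schemas = new DataMapSchema[] {
        new DataMapSchema.FixedDataMapSchema(DataType.INT),
        new DataMapSchema.VariableDataMapSchema(DataType.BYTE_ARRAY),
        new DataMapSchema.StructDataMapSchema(DataType.STRUCT, child)
    };

A row built over such a schema resolves the struct column through StructDataMapSchema.getChildSchemas(), as UnsafeDataMapRow.getRow(ordinal) above does.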
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
new file mode 100644
index 0000000..9d77010
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/FilterType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore.schema;
+
+/**
+ * Types of filters in a select query
+ */
+public enum FilterType {
+ EQUALTO, GREATER_THAN, LESS_THAN, GREATER_THAN_EQUAL, LESS_THAN_EQUAL, LIKE
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index bfa9d7e..f81f805 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -17,16 +17,22 @@
package org.apache.carbondata.core.metadata.blocklet;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.hadoop.io.Writable;
+
/**
* class to store the information about the blocklet
*/
-public class BlockletInfo implements Serializable {
+public class BlockletInfo implements Serializable, Writable {
/**
* serialization id
@@ -189,4 +195,49 @@ public class BlockletInfo implements Serializable {
this.numberOfPages = numberOfPages;
}
+ @Override public void write(DataOutput output) throws IOException {
+ output.writeLong(dimensionOffset);
+ output.writeLong(measureOffsets);
+ int dsize = dimensionChunkOffsets != null ? dimensionChunkOffsets.size() : 0;
+ output.writeShort(dsize);
+ for (int i = 0; i < dsize; i++) {
+ output.writeLong(dimensionChunkOffsets.get(i));
+ }
+ for (int i = 0; i < dsize; i++) {
+ output.writeInt(dimensionChunksLength.get(i));
+ }
+ int mSize = measureChunkOffsets != null ? measureChunkOffsets.size() : 0;
+ output.writeShort(mSize);
+ for (int i = 0; i < mSize; i++) {
+ output.writeLong(measureChunkOffsets.get(i));
+ }
+ for (int i = 0; i < mSize; i++) {
+ output.writeInt(measureChunksLength.get(i));
+ }
+ }
+
+ @Override public void readFields(DataInput input) throws IOException {
+ dimensionOffset = input.readLong();
+ measureOffsets = input.readLong();
+ short dimensionChunkOffsetsSize = input.readShort();
+ dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
+ for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+ dimensionChunkOffsets.add(input.readLong());
+ }
+ dimensionChunksLength = new ArrayList<>(dimensionChunkOffsetsSize);
+ for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
+ dimensionChunksLength.add(input.readInt());
+ }
+
+ short measureChunkOffsetsSize = input.readShort();
+ measureChunkOffsets = new ArrayList<>(measureChunkOffsetsSize);
+ for (int i = 0; i < measureChunkOffsetsSize; i++) {
+ measureChunkOffsets.add(input.readLong());
+ }
+ measureChunksLength = new ArrayList<>(measureChunkOffsetsSize);
+ for (int i = 0; i < measureChunkOffsetsSize; i++) {
+ measureChunksLength.add(input.readInt());
+ }
+
+ }
}
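The new Writable methods can be round-tripped with the standard java.io streams; a minimal sketch (assumes the offset and length lists are populated first, since write serializes them):

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    blockletInfo.write(new DataOutputStream(bos));

    BlockletInfo copy = new BlockletInfo();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

Note that the list sizes are written as shorts, so the format assumes fewer than 32768 dimension or measure chunks per blocklet.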
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
index cd86a07..ae99ed8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/index/BlockIndexInfo.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata.index;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
/**
@@ -45,6 +46,11 @@ public class BlockIndexInfo {
private BlockletIndex blockletIndex;
/**
+ * to store blocklet info like offsets and lengths of each column.
+ */
+ private BlockletInfo blockletInfo;
+
+ /**
* Constructor
*
* @param numberOfRows number of rows
@@ -61,6 +67,20 @@ public class BlockIndexInfo {
}
/**
+ * Constructor that also carries the blocklet info.
+ * @param numberOfRows number of rows
+ * @param fileName block file name
+ * @param offset offset of the footer
+ * @param blockletIndex blocklet index
+ * @param blockletInfo blocklet info with offsets and lengths of each column
+ */
+ public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+ BlockletIndex blockletIndex, BlockletInfo blockletInfo) {
+ this(numberOfRows, fileName, offset, blockletIndex);
+ this.blockletInfo = blockletInfo;
+ }
+
+ /**
* @return the numberOfRows
*/
public long getNumberOfRows() {
@@ -87,4 +107,11 @@ public class BlockIndexInfo {
public BlockletIndex getBlockletIndex() {
return blockletIndex;
}
+
+ /**
+ * @return BlockletInfo
+ */
+ public BlockletInfo getBlockletInfo() {
+ return blockletInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff54673..e0ee5bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -41,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.blockletindex.IndexWrapper;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -116,23 +119,40 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// so blocks will be loaded in sorted order; this is required for
// query execution
Collections.sort(queryModel.getTableBlockInfos());
- // get the table blocks
- CacheProvider cacheProvider = CacheProvider.getInstance();
- BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
- (BlockIndexStore) cacheProvider
- .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
- // remove the invalid table blocks, block which is deleted or compacted
- cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
- queryModel.getAbsoluteTableIdentifier());
- List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
- prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
- queryModel.getAbsoluteTableIdentifier());
- cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
- queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
- queryStatistic
- .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
- queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ if (queryModel.getTableBlockInfos().get(0).getDetailInfo() != null) {
+ List<AbstractIndex> indexList = new ArrayList<>();
+ Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
+ for (TableBlockInfo blockInfo: queryModel.getTableBlockInfos()) {
+ List<TableBlockInfo> tableBlockInfos = listMap.get(blockInfo.getFilePath());
+ if (tableBlockInfos == null) {
+ tableBlockInfos = new ArrayList<>();
+ listMap.put(blockInfo.getFilePath(), tableBlockInfos);
+ }
+ tableBlockInfos.add(blockInfo);
+ }
+ for (List<TableBlockInfo> tableBlockInfos: listMap.values()) {
+ indexList.add(new IndexWrapper(tableBlockInfos));
+ }
+ queryProperties.dataBlocks = indexList;
+ } else {
+ // get the table blocks
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ BlockIndexStore<TableBlockUniqueIdentifier, AbstractIndex> cache =
+ (BlockIndexStore) cacheProvider
+ .createCache(CacheType.EXECUTOR_BTREE, queryModel.getTable().getStorePath());
+ // remove the invalid table blocks, i.e. blocks that are deleted or compacted
+ cache.removeTableBlocks(queryModel.getInvalidSegmentIds(),
+ queryModel.getAbsoluteTableIdentifier());
+ List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers =
+ prepareTableBlockUniqueIdentifier(queryModel.getTableBlockInfos(),
+ queryModel.getAbsoluteTableIdentifier());
+ cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
+ queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
+ queryStatistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis());
+ queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
+ }
// calculating the total number of aggregated columns
int aggTypeCount = queryModel.getQueryMeasures().size();
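The new branch above groups the already-pruned blocks by their backing carbondata file so that each file is wrapped in a single IndexWrapper. On Java 8 the get/put sequence is equivalent to this sketch:

    Map<String, List<TableBlockInfo>> listMap = new LinkedHashMap<>();
    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
      listMap.computeIfAbsent(blockInfo.getFilePath(), k -> new ArrayList<>())
          .add(blockInfo);
    }

The LinkedHashMap preserves the order established by the Collections.sort call above.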
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
index 8704496..a874835 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImpl.java
@@ -156,7 +156,7 @@ public class IncludeFilterExecuterImpl implements FilterExecuter {
int columnIndex = dimColumnEvaluatorInfo.getColumnIndex();
int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping().get(columnIndex);
- boolean isScanRequired =
+ boolean isScanRequired = blockIndex >= blkMaxVal.length ||
isScanRequired(blkMaxVal[blockIndex], blkMinVal[blockIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
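The same guard is repeated in the range and row-level filter executers below: when the min/max arrays do not cover the filtered column's index, the blocklet is conservatively scanned rather than pruned. Schematically:

    // missing min/max stats for this column => cannot prune, must scan
    boolean isScanRequired = columnIndex >= maxValues.length
        || isScanRequired(maxValues[columnIndex], minValues[columnIndex], filterValues);

This presumably covers index rows from the blocklet data map that carry stats for fewer columns than the query filters on.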
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index 6823531..c2e077e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -287,7 +287,7 @@ public class RangeValueFilterExecuterImpl extends ValueBasedFilterExecuterImpl {
BitSet bitSet = new BitSet(1);
byte[][] filterValues = this.filterRangesValues;
int columnIndex = this.dimColEvaluatorInfo.getColumnIndex();
- boolean isScanRequired =
+ boolean isScanRequired = columnIndex >= blockMinValue.length ||
isScanRequired(blockMinValue[columnIndex], blockMaxValue[columnIndex], filterValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index be82be7..73352cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -79,7 +79,7 @@ public class RowLevelRangeGrtThanFiterExecuterImpl extends RowLevelFilterExecute
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 53da6c5..6e8e188 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMaxValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index d694960..d6f7c86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -81,7 +81,7 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index b3dd921..597ba52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -82,7 +82,7 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
@Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
BitSet bitSet = new BitSet(1);
- boolean isScanRequired =
+ boolean isScanRequired = dimensionBlocksIndex[0] >= blockMaxValue.length ||
isScanRequired(blockMinValue[dimensionBlocksIndex[0]], filterRangeValues);
if (isScanRequired) {
bitSet.set(0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
index fdb5483..ff4f5dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/AbstractDataBlockIterator.java
@@ -165,6 +165,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
new BlocksChunkHolder(blockExecutionInfo.getTotalNumberDimensionBlock(),
blockExecutionInfo.getTotalNumberOfMeasureBlock(), fileReader);
blocksChunkHolder.setDataBlock(dataBlockIterator.next());
+ if (blocksChunkHolder.getDataBlock().getColumnsMaxValue() == null) {
+ return blocksChunkHolder;
+ }
if (blockletScanner.isScanRequired(blocksChunkHolder)) {
return blocksChunkHolder;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 92e9594..95030d3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper;
import org.apache.carbondata.core.mutate.DeleteDeltaVo;
import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
@@ -127,20 +128,27 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato
// set the deleted row to block execution info
blockInfo.setDeletedRecordsMap(deletedRowsMap);
}
- DataRefNode startDataBlock = finder
- .findFirstDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getStartKey());
- while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
- startDataBlock = startDataBlock.getNextDataRefNode();
- }
- long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
- //if number of block is less than 0 then take end block.
- if (numberOfBlockToScan <= 0) {
- DataRefNode endDataBlock = finder
- .findLastDataBlock(blockInfo.getDataBlock().getDataRefNode(), blockInfo.getEndKey());
- numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode();
+ if (dataRefNode instanceof BlockletDataRefNodeWrapper) {
+ BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode;
+ blockInfo.setFirstDataBlock(wrapper);
+ blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes());
+
+ } else {
+ DataRefNode startDataBlock =
+ finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey());
+ while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) {
+ startDataBlock = startDataBlock.getNextDataRefNode();
+ }
+ long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan();
+ // if the number of blocklets to scan is not set, derive it from the end block
+ if (numberOfBlockToScan <= 0) {
+ DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey());
+ numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1;
+ }
+ blockInfo.setFirstDataBlock(startDataBlock);
+ blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
- blockInfo.setFirstDataBlock(startDataBlock);
- blockInfo.setNumberOfBlockToScan(numberOfBlockToScan);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index 97b1a1f..34c7709 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -122,6 +122,57 @@ public abstract class AbstractDataFileFooterConverter {
}
/**
+ * Below method will be used to get the index info from the index file
+ *
+ * @param filePath file path of the index file
+ * @return list of index info
+ * @throws IOException problem while reading the index file
+ */
+ public List<DataFileFooter> getIndexInfo(String filePath) throws IOException {
+ CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+ List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+ String parentPath = filePath.substring(0, filePath.lastIndexOf("/"));
+ try {
+ // open the reader
+ indexReader.openThriftReader(filePath);
+ // get the index header
+ org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns =
+ readIndexHeader.getTable_columns();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ // get the segment info
+ SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+ BlockletIndex blockletIndex = null;
+ DataFileFooter dataFileFooter = null;
+ // read the block info from file
+ while (indexReader.hasNext()) {
+ BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+ blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+ dataFileFooter = new DataFileFooter();
+ TableBlockInfo tableBlockInfo = new TableBlockInfo();
+ tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+ tableBlockInfo.setVersion(
+ ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
+ int blockletSize = getBlockletSize(readBlockIndexInfo);
+ tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+ tableBlockInfo.setFilePath(parentPath + "/" + readBlockIndexInfo.file_name);
+ dataFileFooter.setBlockletIndex(blockletIndex);
+ dataFileFooter.setColumnInTable(columnSchemaList);
+ dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+ dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+ dataFileFooter.setSegmentInfo(segmentInfo);
+ dataFileFooters.add(dataFileFooter);
+ }
+ } finally {
+ indexReader.closeThriftReader();
+ }
+ return dataFileFooters;
+ }
+
+ /**
* the method returns the number of blocklets in a block
*
* @param readBlockIndexInfo
@@ -148,6 +199,8 @@ public abstract class AbstractDataFileFooterConverter {
public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
throws IOException;
+ public abstract List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException;
+
/**
* Below method will be used to get blocklet index for data file meta
*
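A hedged usage sketch of the new getIndexInfo reader (the index file path is illustrative):

    AbstractDataFileFooterConverter converter = new DataFileFooterConverterV3();
    List<DataFileFooter> footers =
        converter.getIndexInfo("/store/db/table/Fact/Part0/Segment_0/0.carbonindex");
    // one DataFileFooter per BlockIndex record in the index file,
    // each carrying the blocklet index, schema, and row count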
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9c164a..f62f3d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -54,10 +54,13 @@ import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -926,10 +929,26 @@ public final class CarbonUtil {
* Below method will be used to read the data file metadata
*/
public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo) throws IOException {
- AbstractDataFileFooterConverter fileFooterConverter =
- DataFileFooterConverterFactory.getInstance()
- .getDataFileFooterConverter(tableBlockInfo.getVersion());
- return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
+ if (detailInfo == null) {
+ AbstractDataFileFooterConverter fileFooterConverter =
+ DataFileFooterConverterFactory.getInstance()
+ .getDataFileFooterConverter(tableBlockInfo.getVersion());
+ return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ } else {
+ DataFileFooter fileFooter = new DataFileFooter();
+ fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
+ ColumnarFormatVersion version =
+ ColumnarFormatVersion.valueOf(detailInfo.getVersionNumber());
+ AbstractDataFileFooterConverter dataFileFooterConverter =
+ DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+ fileFooter.setColumnInTable(dataFileFooterConverter.getSchema(tableBlockInfo));
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setColumnCardinality(detailInfo.getDimLens());
+ segmentInfo.setNumberOfColumns(detailInfo.getRowCount());
+ fileFooter.setSegmentInfo(segmentInfo);
+ return fileFooter;
+ }
}
/**
@@ -1567,24 +1586,23 @@ public final class CarbonUtil {
}
/**
- * @param tableInfo
* @param invalidBlockVOForSegmentId
* @param updateStatusMngr
* @return
*/
- public static boolean isInvalidTableBlock(TableBlockInfo tableInfo,
+ public static boolean isInvalidTableBlock(String segmentId, String filePath,
UpdateVO invalidBlockVOForSegmentId, SegmentUpdateStatusManager updateStatusMngr) {
- if (!updateStatusMngr.isBlockValid(tableInfo.getSegmentId(),
- CarbonTablePath.getCarbonDataFileName(tableInfo.getFilePath()) + CarbonTablePath
+ if (!updateStatusMngr.isBlockValid(segmentId,
+ CarbonTablePath.getCarbonDataFileName(filePath) + CarbonTablePath
.getCarbonDataExtension())) {
return true;
}
if (null != invalidBlockVOForSegmentId) {
- Long blockTimeStamp = Long.parseLong(tableInfo.getFilePath()
- .substring(tableInfo.getFilePath().lastIndexOf('-') + 1,
- tableInfo.getFilePath().lastIndexOf('.')));
+ Long blockTimeStamp = Long.parseLong(filePath
+ .substring(filePath.lastIndexOf('-') + 1,
+ filePath.lastIndexOf('.')));
if ((blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp() && (
invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp() != null
&& blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 0f82b95..3ac6987 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -121,4 +121,8 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
return blockletInfo;
}
+
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index 4882b0f..8cd437f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -140,4 +140,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
return numberOfDimensionColumns;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index 143c1b1..ccb8b29 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -85,6 +85,17 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
return dataFileFooter;
}
+ @Override public List<ColumnSchema> getSchema(TableBlockInfo tableBlockInfo) throws IOException {
+ CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(tableBlockInfo.getFilePath());
+ FileHeader fileHeader = carbonHeaderReader.readHeader();
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ return columnSchemaList;
+ }
+
/**
* Below method is to convert the blocklet info of the thrift to wrapper
* blocklet info
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/format/src/main/thrift/carbondata_index.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift
index c055031..4df085a 100644
--- a/format/src/main/thrift/carbondata_index.thrift
+++ b/format/src/main/thrift/carbondata_index.thrift
@@ -41,4 +41,5 @@ struct BlockIndex{
2: required string file_name; // Block file name
3: required i64 offset; // Offset of the footer
4: required carbondata.BlockletIndex block_index; // Blocklet index
+ 5: optional carbondata.BlockletInfo3 blocklet_info;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 1673193..81226a2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -21,7 +21,14 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.DataRefNode;
@@ -375,8 +382,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
if (isIUDTable) {
// In case IUD is not performed in this table avoid searching for
// invalidated blocks.
- if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, invalidBlockVOForSegmentId,
- updateStatusManager)) {
+ if (CarbonUtil
+ .isInvalidTableBlock(tableBlockInfo.getSegmentId(), tableBlockInfo.getFilePath(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 631bc2c..56bade7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.BlockletInfos;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.util.ByteUtil;
@@ -77,6 +78,8 @@ public class CarbonInputSplit extends FileSplit
*/
private String[] deleteDeltaFiles;
+ private BlockletDetailInfo detailInfo;
+
public CarbonInputSplit() {
segmentId = null;
taskId = "0";
@@ -138,10 +141,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
try {
- tableBlockInfoList.add(
+ TableBlockInfo blockInfo =
new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
split.getLocations(), split.getLength(), blockletInfos, split.getVersion(),
- split.getDeleteDeltaFiles()));
+ split.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(split.getDetailInfo());
+ tableBlockInfoList.add(blockInfo);
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
}
@@ -153,9 +158,12 @@ public class CarbonInputSplit extends FileSplit
BlockletInfos blockletInfos =
new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
try {
- return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
- inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
- blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ TableBlockInfo blockInfo =
+ new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+ inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+ blockletInfos, inputSplit.getVersion(), inputSplit.getDeleteDeltaFiles());
+ blockInfo.setDetailInfo(inputSplit.getDetailInfo());
+ return blockInfo;
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + inputSplit, e);
}
@@ -180,6 +188,11 @@ public class CarbonInputSplit extends FileSplit
for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
deleteDeltaFiles[i] = in.readUTF();
}
+ boolean detailInfoExists = in.readBoolean();
+ if (detailInfoExists) {
+ detailInfo = new BlockletDetailInfo();
+ detailInfo.readFields(in);
+ }
}
@Override public void write(DataOutput out) throws IOException {
@@ -197,6 +210,10 @@ public class CarbonInputSplit extends FileSplit
out.writeUTF(deleteDeltaFiles[i]);
}
}
+ out.writeBoolean(detailInfo != null);
+ if (detailInfo != null) {
+ detailInfo.write(out);
+ }
}
public List<String> getInvalidSegments() {
@@ -310,4 +327,16 @@ public class CarbonInputSplit extends FileSplit
public String[] getDeleteDeltaFiles() {
return deleteDeltaFiles;
}
+
+ public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
+ this.deleteDeltaFiles = deleteDeltaFiles;
+ }
+
+ public BlockletDetailInfo getDetailInfo() {
+ return detailInfo;
+ }
+
+ public void setDetailInfo(BlockletDetailInfo detailInfo) {
+ this.detailInfo = detailInfo;
+ }
}
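detailInfo is serialized behind a presence flag, so splits produced without the blocklet data map still deserialize cleanly. Restated as a minimal pattern:

    // write side
    out.writeBoolean(detailInfo != null);
    if (detailInfo != null) {
      detailInfo.write(out);
    }
    // read side
    if (in.readBoolean()) {
      detailInfo = new BlockletDetailInfo();
      detailInfo.readFields(in);
    }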
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index ae9c676..e73c04a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -18,152 +18,556 @@
package org.apache.carbondata.hadoop.api;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier;
+import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.DataMapStoreManager;
+import org.apache.carbondata.core.indexstore.DataMapType;
+import org.apache.carbondata.core.indexstore.TableDataMap;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.mutate.data.BlockMappingVO;
import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
+import org.apache.carbondata.core.scan.partition.Partitioner;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.internal.CarbonInputSplit;
-import org.apache.carbondata.hadoop.internal.segment.Segment;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManager;
-import org.apache.carbondata.hadoop.internal.segment.SegmentManagerFactory;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.hadoop.util.SchemaReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.util.StringUtils;
/**
* Input format of CarbonData file.
+ *
* @param <T>
*/
public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
+ // comma-separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ // comma-separated list of input files
+ public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
+ private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
private static final String FILTER_PREDICATE =
"mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
- private SegmentManager segmentManager;
+ /**
+ * It is optional; if the user does not set the table, it is read from the schema file in the store.
+ *
+ * @param configuration
+ * @param carbonTable
+ * @throws IOException
+ */
+ public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+ throws IOException {
+ if (null != carbonTable) {
+ configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+ }
+ }
- public CarbonTableInputFormat() {
- this.segmentManager = SegmentManagerFactory.getGlobalSegmentManager();
+ public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
+ String carbonTableStr = configuration.get(CARBON_TABLE);
+ if (carbonTableStr == null) {
+ populateCarbonTable(configuration);
+ // read it from the schema file in the store
+ carbonTableStr = configuration.get(CARBON_TABLE);
+ }
+ return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
}
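For orientation, a minimal driver-side sketch of wiring this input format into a job; the store path and column names are hypothetical, and CarbonProjection is assumed to expose an addColumn(String) mutator:

import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

class JobSetupSketch {
  static Job configure() throws Exception {
    Configuration conf = new Configuration();
    // point the format at the table path; the CarbonTable itself is then
    // read lazily from the schema file by getCarbonTable(...)
    CarbonTableInputFormat.setTablePath(conf, "hdfs://namenode/store/default/t1");

    CarbonProjection projection = new CarbonProjection();
    projection.addColumn("name");
    projection.addColumn("salary");
    CarbonTableInputFormat.setColumnProjection(conf, projection);
    // an unresolved filter Expression, if any, would be attached here:
    // CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);

    Job job = Job.getInstance(conf);
    job.setInputFormatClass(CarbonTableInputFormat.class);
    return job;
  }
}

setCarbonTable can also be called directly when the CarbonTable is already in hand, which skips the schema-file read.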
- @Override
- public RecordReader<Void, T> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- switch (((CarbonInputSplit)split).formatType()) {
- case COLUMNAR:
- // TODO: create record reader for columnar format
- break;
- default:
- throw new RuntimeException("Unsupported format type");
+ /**
+ * This method reads the schema from the physical schema file and populates it into CARBON_TABLE.
+ *
+ * @param configuration
+ * @throws IOException
+ */
+ private static void populateCarbonTable(Configuration configuration) throws IOException {
+ String dirs = configuration.get(INPUT_DIR, "");
+ String[] inputPaths = StringUtils.split(dirs);
+ if (inputPaths.length == 0) {
+ throw new InvalidPathException("No input paths specified in job");
}
- return null;
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+ // read the schema file to get the absoluteTableIdentifier having the correct table id
+ // persisted in the schema
+ CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+ setCarbonTable(configuration, carbonTable);
}
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
+ public static void setTablePath(Configuration configuration, String tablePath)
+ throws IOException {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
- // work as following steps:
- // get all current valid segment
- // for each segment, get all input split
+ /**
+ * It sets unresolved filter expression.
+ *
+ * @param configuration
+ * @param filterExpression
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
- List<InputSplit> output = new LinkedList<>();
- Expression filter = getFilter(job.getConfiguration());
- Segment[] segments = segmentManager.getAllValidSegments();
- FilterResolverIntf filterResolver = CarbonInputFormatUtil.resolveFilter(filter, null);
- for (Segment segment: segments) {
- List<InputSplit> splits = segment.getSplits(job, filterResolver);
- output.addAll(splits);
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
}
- return output;
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
}
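A quick worked example of the encoding above, with hypothetical column names; the columns are joined with commas and the trailing separator trimmed, and getColumnProjection later returns the same string for parsing into the query plan:

class ProjectionEncodingSketch {
  public static void main(String[] args) {
    String[] allColumns = {"name", "salary", "dept"}; // hypothetical projection
    StringBuilder builder = new StringBuilder();
    for (String column : allColumns) {
      builder.append(column).append(",");
    }
    String encoded = builder.substring(0, builder.length() - 1);
    System.out.println(encoded); // name,salary,dept
  }
}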
- /**
- * set the table path into configuration
- * @param conf configuration of the job
- * @param tablePath table path string
- */
- public void setTablePath(Configuration conf, String tablePath) {
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+ public static void setCarbonReadSupport(Configuration configuration,
+ Class<? extends CarbonReadSupport> readSupportClass) {
+ if (readSupportClass != null) {
+ configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+ }
+ }
+ private static CarbonTablePath getTablePath(AbsoluteTableIdentifier absIdentifier) {
+ return CarbonStorePath.getCarbonTablePath(absIdentifier);
}
/**
- * return the table path in the configuration
- * @param conf configuration of the job
- * @return table path string
+ * Set list of segments to access
*/
- public String getTablePath(Configuration conf) {
- return null;
+ public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
}
/**
- * set projection columns into configuration
- * @param conf configuration of the job
- * @param projection projection
+ * Set list of files to access
*/
- public void setProjection(Configuration conf, CarbonProjection projection) {
+ public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
+ configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
+ }
+ private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ return getCarbonTable(configuration).getAbsoluteTableIdentifier();
}
/**
- * return the projection in the configuration
- * @param conf configuration of the job
- * @return projection
+ * {@inheritDoc}
+ * The configuration FileInputFormat.INPUT_DIR
+ * is used to get the table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
*/
- public CarbonProjection getProjection(Configuration conf) {
- return null;
+ @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
+ AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+ List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
+ // if the user has not specified any segments, read all valid segments from the store
+ if (validSegments.size() == 0) {
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+ segmentStatusManager.getValidAndInvalidSegments();
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ validSegments = segments.getValidSegments();
+ if (validSegments.size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entries from the segment index for any invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+ new ArrayList<>(invalidSegments.size());
+ for (String segId : invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(identifier, segId));
+ }
+ blockletMap.clear(invalidSegments);
+ }
+ }
+
+ // process and resolve the expression
+ Expression filter = getFilterPredicates(job.getConfiguration());
+ CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+ // this will be null in case of corrupt schema file.
+ if (null == carbonTable) {
+ throw new IOException("Missing/Corrupt schema file for table.");
+ }
+
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+ // prune partitions for filter query on partition table
+ BitSet matchedPartitions = null;
+ if (null != filter) {
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+ if (null != partitionInfo) {
+ Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
+ matchedPartitions = new FilterExpressionProcessor()
+ .getFilteredPartitions(filter, partitionInfo, partitioner);
+ if (matchedPartitions.cardinality() == 0) {
+ // no partition is required
+ return new ArrayList<InputSplit>();
+ }
+ if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+ // all partitions are required, no need to prune partitions
+ matchedPartitions = null;
+ }
+ }
+ }
+
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
+ // do block filtering and get the splits
+ List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions);
+ // pass the invalid segments to the task side so the corresponding index entries can be removed there
+ if (invalidSegments.size() > 0) {
+ for (InputSplit split : splits) {
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+ ((org.apache.carbondata.hadoop.CarbonInputSplit) split)
+ .setInvalidTimestampRange(invalidTimestampsList);
+ }
+ }
+ return splits;
}
/**
- * set filter expression into the configuration
- * @param conf configuration of the job
- * @param filter filter expression
+ * Configurations FileInputFormat.INPUT_DIR and CarbonInputFormat.INPUT_SEGMENT_NUMBERS
+ * are used to get the table path and the segments to read.
+ *
+ * @return
+ * @throws IOException
*/
- public void setFilter(Configuration conf, Expression filter) {
+ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
+ List<String> validSegments, BitSet matchedPartitions) throws IOException {
+
+ List<InputSplit> result = new LinkedList<InputSplit>();
+ UpdateVO invalidBlockVOForSegmentId = null;
+ boolean isIUDTable = false;
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+ isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
+
+ // for each segment, fetch the blocks matching the filter in the driver-side BTree
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
+ getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+ validSegments);
+ for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
+
+ // for tables on which IUD operations have been performed, fetch the UpdateVO
+ // and skip blocks invalidated by those operations; tables without IUD
+ // operations avoid this search entirely
+ if (isIUDTable) {
+ invalidBlockVOForSegmentId =
+ updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId());
+ if (CarbonUtil
+ .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(),
+ invalidBlockVOForSegmentId, updateStatusManager)) {
+ continue;
+ }
+ }
+ String[] deleteDeltaFilePath = null;
+ try {
+ deleteDeltaFilePath =
+ updateStatusManager.getDeleteDeltaFilePath(inputSplit.getPath().toString());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath);
+ result.add(inputSplit);
+ }
+ return result;
+ }
+
+ protected Expression getFilterPredicates(Configuration configuration) {
try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filter);
- conf.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
}
}
/**
- * return filter expression in the configuration
- * @param conf configuration of the job
- * @return filter expression
+ * get the data blocks of the given segments
*/
- public Expression getFilter(Configuration conf) {
- Object filter;
- String filterExprString = conf.get(FILTER_PREDICATE);
- if (filterExprString == null) {
- return null;
+ private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystems for the table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+
+ TableDataMap blockletMap = DataMapStoreManager.getInstance()
+ .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET);
+ List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
+
+ List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilteredBlocks = new ArrayList<>();
+ for (Blocklet blocklet : prunedBlocklets) {
+ int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+ CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
+
+ // the matchedPartitions variable is null in two cases:
+ // 1. the table is not a partition table
+ // 2. the table is a partition table and all partitions are matched by the query
+ // for a partition table, the task id in the carbondata file name is the
+ // partition id, so a partition that is not required is skipped here.
+ if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+ resultFilteredBlocks.add(convertToCarbonInputSplit(blocklet));
+ }
}
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilteredBlocks;
+ }
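To make the partition check concrete, a standalone sketch with hypothetical ids; suppose filter pruning matched only partition 2, and the blocklet's file name carries task id 2:

import java.util.BitSet;

class PartitionPruneSketch {
  public static void main(String[] args) {
    BitSet matchedPartitions = new BitSet();
    matchedPartitions.set(2);   // only partition 2 survived filter pruning
    int taskId = 2;             // parsed from the carbondata file name
    // a null BitSet would mean "keep everything": either a non-partition
    // table or a query matching all partitions
    if (matchedPartitions == null || matchedPartitions.get(taskId)) {
      System.out.println("blocklet becomes a CarbonInputSplit");
    }
  }
}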
+
+ private org.apache.carbondata.hadoop.CarbonInputSplit convertToCarbonInputSplit(Blocklet blocklet)
+ throws IOException {
+ blocklet.updateLocations();
+ org.apache.carbondata.hadoop.CarbonInputSplit split =
+ org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
+ new FileSplit(blocklet.getPath(), 0, blocklet.getLength(), blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()));
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
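On the task side, the returned reader follows the standard Hadoop RecordReader protocol. A hedged consumption sketch, assuming the default DictionaryDecodeReadSupport yields Object[] rows:

import java.io.IOException;

import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class ReaderSketch {
  static void consume(CarbonTableInputFormat<Object[]> format, InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    RecordReader<Void, Object[]> reader = format.createRecordReader(split, context);
    reader.initialize(split, context);
    try {
      while (reader.nextKeyValue()) {
        Object[] row = reader.getCurrentValue(); // one projected, decoded row
        // process row ...
      }
    } finally {
      reader.close();
    }
  }
}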
+
+ public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getCarbonTable(configuration);
+ // getting the table absoluteTableIdentifier from the carbonTable
+ // to avoid unnecessary deserialization
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+ // the query plan includes the projection columns
+ String projection = getColumnProjection(configuration);
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+ QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+ // set the filter on the query model in order to filter blocklets before the scan
+ Expression filter = getFilterPredicates(configuration);
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file-level index store if there are invalid segments
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ // by default the dictionary decode read support class is used
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
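Since the configured class is instantiated reflectively through its first public constructor, a custom read support should expose a public no-arg constructor. A registration sketch, where MyReadSupport is a hypothetical CarbonReadSupport implementation:

Configuration conf = new Configuration();
// MyReadSupport is hypothetical; it must implement CarbonReadSupport and
// provide a public no-arg constructor for getConstructors()[0].newInstance()
CarbonTableInputFormat.setCarbonReadSupport(conf, MyReadSupport.class);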
+
+ @Override protected boolean isSplitable(JobContext context, Path filename) {
try {
- filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- } catch (IOException e) {
- throw new RuntimeException("Error while reading filter expression", e);
+ // don't split the file if it resides on a local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ /**
+ * required to be moved to core
+ *
+ * @return updateExtension
+ */
+ private String getUpdateExtension() {
+ // TODO: needs to change when update is supported; most likely the update timestamp
+ return "update";
+ }
+
+ /**
+ * return the valid segments to access
+ */
+ private String[] getSegmentsToAccess(JobContext job) {
+ String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
+ if (segmentString.trim().isEmpty()) {
+ return new String[0];
}
- assert (filter instanceof Expression);
- return (Expression) filter;
+ return segmentString.split(",");
}
/**
- * Optional API. It can be used by query optimizer to select index based on filter
- * in the configuration of the job. After selecting index internally, index' name will be set
- * in the configuration.
+ * Get the row count per block and the mapping of segment to block count.
*
- * The process of selection is simple, just use the default index. Subclass can provide a more
- * advanced selection logic like cost based.
- * @param conf job configuration
+ * @param job
+ * @param identifier
+ * @return
+ * @throws IOException
+ * @throws KeyGenException
*/
- public void selectIndex(Configuration conf) {
- // set the default index in configuration
+ public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier)
+ throws IOException, KeyGenException {
+ TableDataMap blockletMap =
+ DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET);
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments =
+ new SegmentStatusManager(identifier).getValidAndInvalidSegments();
+ Map<String, Long> blockRowCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ Map<String, Long> segmentAndBlockCountMapping =
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<Blocklet> blocklets = blockletMap.prune(validAndInvalidSegments.getValidSegments(), null);
+ for (Blocklet blocklet : blocklets) {
+ String blockName = blocklet.getPath().toString();
+ blockName = CarbonTablePath.getCarbonDataFileName(blockName);
+ blockName = blockName + CarbonTablePath.getCarbonDataExtension();
+
+ long rowCount = blocklet.getDetailInfo().getRowCount();
+
+ String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+
+ // if the block is invalid then don't add its count
+ SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
+
+ if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+ Long blockCount = blockRowCountMapping.get(key);
+ if (blockCount == null) {
+ blockCount = 0L;
+ Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+ if (count == null) {
+ count = 0L;
+ }
+ segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+ }
+ blockCount += rowCount;
+ blockRowCountMapping.put(key, blockCount);
+ }
+ }
+ return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
}
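The two maps above are easy to misread: blockRowCountMapping accumulates rows per segment/block key, while segmentAndBlockCountMapping counts distinct blocks per segment and is bumped only on the first blocklet of each block. A standalone sketch of the same bookkeeping over hypothetical blocklets:

import java.util.HashMap;
import java.util.Map;

class RowCountSketch {
  public static void main(String[] args) {
    // three hypothetical blocklets; the first two belong to the same block
    String[] keys = {"0/part-0-0", "0/part-0-0", "0/part-0-1"};
    String[] segments = {"0", "0", "0"};
    long[] rowCounts = {1000L, 500L, 2000L};

    Map<String, Long> blockRowCount = new HashMap<>();
    Map<String, Long> segmentBlockCount = new HashMap<>();
    for (int i = 0; i < keys.length; i++) {
      Long blockCount = blockRowCount.get(keys[i]);
      if (blockCount == null) {
        blockCount = 0L;
        // first blocklet seen for this block: bump the segment's block counter
        Long count = segmentBlockCount.get(segments[i]);
        segmentBlockCount.put(segments[i], count == null ? 1L : count + 1);
      }
      blockRowCount.put(keys[i], blockCount + rowCounts[i]);
    }
    System.out.println(blockRowCount);     // contains 0/part-0-0=1500, 0/part-0-1=2000
    System.out.println(segmentBlockCount); // contains 0=2
  }
}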
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2015a3e2/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8270304..8269757 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -34,7 +34,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
import org.apache.carbondata.core.scan.model.QueryDimension;
import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -77,9 +77,10 @@ public class CarbonInputFormatUtil {
return plan;
}
- public static <V> CarbonInputFormat<V> createCarbonInputFormat(AbsoluteTableIdentifier identifier,
+ public static <V> CarbonTableInputFormat<V> createCarbonInputFormat(
+ AbsoluteTableIdentifier identifier,
Job job) throws IOException {
- CarbonInputFormat<V> carbonInputFormat = new CarbonInputFormat<>();
+ CarbonTableInputFormat<V> carbonInputFormat = new CarbonTableInputFormat<>();
FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
return carbonInputFormat;
}