Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/10 11:30:08 UTC
[1/2] carbondata git commit: [CARBONDATA-3220] Support presto to read stream segment data
Repository: carbondata
Updated Branches:
refs/heads/master 8e6def9fa -> d78db8f6b
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index aebe549..31417bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -41,13 +41,13 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 50d6c46..e822634 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -18,285 +18,45 @@
package org.apache.carbondata.stream;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.intf.RowImpl;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-import org.apache.carbondata.streaming.StreamBlockletReader;
+import org.apache.carbondata.hadoop.stream.StreamRecordReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
/**
- * Stream record reader
+ * Stream vector/row record reader
*/
-public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
- // vector reader
- private boolean isVectorReader;
-
- // metadata
- private CarbonTable carbonTable;
- private CarbonColumn[] storageColumns;
- private boolean[] isRequired;
- private DataType[] measureDataTypes;
- private int dimensionCount;
- private int measureCount;
-
- // input
- private FileSplit fileSplit;
- private Configuration hadoopConf;
- private StreamBlockletReader input;
- private boolean isFirstRow = true;
- private QueryModel model;
+public class CarbonStreamRecordReader extends StreamRecordReader {
- // decode data
- private BitSet allNonNull;
- private boolean[] isNoDictColumn;
- private DirectDictionaryGenerator[] directDictionaryGenerators;
- private CacheProvider cacheProvider;
- private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
- private GenericQueryType[] queryTypes;
- private String compressorName;
-
- // vectorized reader
- private StructType outputSchema;
+ // vector reader
+ protected boolean isVectorReader;
private CarbonVectorProxy vectorProxy;
- private boolean isFinished = false;
-
- // filter
- private FilterExecuter filter;
- private boolean[] isFilterRequired;
- private Object[] filterValues;
- private RowIntf filterRow;
- private int[] filterMap;
-
- // output
- private CarbonColumn[] projection;
- private boolean[] isProjectionRequired;
- private int[] projectionMap;
- private Object[] outputValues;
+ private StructType outputSchema;
private InternalRow outputRow;
- // empty project, null filter
- private boolean skipScanData;
-
- // return raw row for handoff
- private boolean useRawRow = false;
-
// InputMetricsStats
private InputMetricsStats inputMetricsStats;
public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
QueryModel mdl, boolean useRawRow) {
+ super(mdl, useRawRow);
this.isVectorReader = isVectorReader;
this.inputMetricsStats = inputMetricsStats;
- this.model = mdl;
- this.useRawRow = useRawRow;
-
}
- @Override public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- // input
- if (split instanceof CarbonInputSplit) {
- fileSplit = (CarbonInputSplit) split;
- } else if (split instanceof CarbonMultiBlockSplit) {
- fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
- } else {
- fileSplit = (FileSplit) split;
- }
-
- // metadata
- hadoopConf = context.getConfiguration();
- if (model == null) {
- CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
- model = format.createQueryModel(split, context);
- }
- carbonTable = model.getTable();
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getTableName());
- dimensionCount = dimensions.size();
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getTableName());
- measureCount = measures.size();
- List<CarbonColumn> carbonColumnList =
- carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
- storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
- isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
- directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
- for (int i = 0; i < storageColumns.length; i++) {
- if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(storageColumns[i].getDataType());
- }
- }
- measureDataTypes = new DataType[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
- }
-
- // decode data
- allNonNull = new BitSet(storageColumns.length);
- projection = model.getProjectionColumns();
-
- isRequired = new boolean[storageColumns.length];
- boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
- boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
- isFilterRequired = new boolean[storageColumns.length];
- filterMap = new int[storageColumns.length];
- for (int i = 0; i < storageColumns.length; i++) {
- if (storageColumns[i].isDimension()) {
- if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
- isRequired[i] = true;
- isFilterRequired[i] = true;
- filterMap[i] = storageColumns[i].getOrdinal();
- }
- } else {
- if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
- isRequired[i] = true;
- isFilterRequired[i] = true;
- filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
- }
- }
- }
-
- isProjectionRequired = new boolean[storageColumns.length];
- projectionMap = new int[storageColumns.length];
- for (int j = 0; j < projection.length; j++) {
- for (int i = 0; i < storageColumns.length; i++) {
- if (storageColumns[i].getColName().equals(projection[j].getColName())) {
- isRequired[i] = true;
- isProjectionRequired[i] = true;
- projectionMap[i] = j;
- break;
- }
- }
- }
-
- // initialize filter
- if (null != model.getFilterExpressionResolverTree()) {
- initializeFilter();
- } else if (projection.length == 0) {
- skipScanData = true;
- }
-
- }
-
- private void initializeFilter() {
-
- List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
- carbonTable.getMeasureByTableName(carbonTable.getTableName()));
- int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
- for (int i = 0; i < dimLensWithComplex.length; i++) {
- dimLensWithComplex[i] = Integer.MAX_VALUE;
- }
- int[] dictionaryColumnCardinality =
- CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
- SegmentProperties segmentProperties =
- new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
- Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
-
- FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
- filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
- complexDimensionInfoMap);
- // for row filter, we need update column index
- FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
- carbonTable.getDimensionOrdinalMax());
-
- }
-
- private byte[] getSyncMarker(String filePath) throws IOException {
- CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
- FileHeader header = headerReader.readHeader();
- // legacy store does not have this member
- if (header.isSetCompressor_name()) {
- compressorName = header.getCompressor_name();
- } else {
- compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
- }
- return header.getSync_marker();
- }
-
- private void initializeAtFirstRow() throws IOException {
- filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
- filterRow = new RowImpl();
- filterRow.setValues(filterValues);
-
- outputValues = new Object[projection.length];
+ protected void initializeAtFirstRow() throws IOException {
+ super.initializeAtFirstRow();
outputRow = new GenericInternalRow(outputValues);
-
- Path file = fileSplit.getPath();
-
- byte[] syncMarker = getSyncMarker(file.toString());
-
- FileSystem fs = file.getFileSystem(hadoopConf);
-
- int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
- CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
-
- FSDataInputStream fileIn = fs.open(file, bufferSize);
- fileIn.seek(fileSplit.getStart());
- input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
- fileSplit.getStart() == 0, compressorName);
-
- cacheProvider = CacheProvider.getInstance();
- cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
- queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
-
outputSchema = new StructType((StructField[])
DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
}
@@ -317,6 +77,23 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
return nextRow();
}
+ @Override public Object getCurrentValue() {
+ if (isVectorReader) {
+ int value = vectorProxy.numRows();
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead((long) value);
+ }
+
+ return vectorProxy.getColumnarBatch();
+ }
+
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead(1L);
+ }
+
+ return outputRow;
+ }
+
/**
* for vector reader, check next columnar batch
*/
@@ -343,384 +120,53 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
return hasNext;
}
- /**
- * check next Row
- */
- private boolean nextRow() throws IOException {
- // read row one by one
- try {
- boolean hasNext;
- boolean scanMore = false;
- do {
- hasNext = input.hasNext();
- if (hasNext) {
- if (skipScanData) {
- input.nextRow();
- scanMore = false;
- } else {
- if (useRawRow) {
- // read raw row for streaming handoff which does not require decode raw row
- readRawRowFromStream();
- } else {
- readRowFromStream();
- }
- if (null != filter) {
- scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
- } else {
- scanMore = false;
- }
- }
- } else {
- if (input.nextBlocklet()) {
- BlockletHeader header = input.readBlockletHeader();
- if (isScanRequired(header)) {
- if (skipScanData) {
- input.skipBlockletData(false);
- } else {
- input.readBlockletData(header);
- }
- } else {
- input.skipBlockletData(true);
- }
- scanMore = true;
- } else {
- isFinished = true;
- scanMore = false;
+ private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+ // if filter is null and output projection is empty, use the row number of blocklet header
+ if (skipScanData) {
+ int rowNums = header.getBlocklet_info().getNum_rows();
+ vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
+ vectorProxy.setNumRows(rowNums);
+ input.skipBlockletData(true);
+ return rowNums > 0;
+ }
+
+ input.readBlockletData(header);
+ vectorProxy =
+ new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
+ int rowNum = 0;
+ if (null == filter) {
+ while (input.hasNext()) {
+ readRowFromStream();
+ putRowToColumnBatch(rowNum++);
+ }
+ } else {
+ try {
+ while (input.hasNext()) {
+ readRowFromStream();
+ if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+ putRowToColumnBatch(rowNum++);
}
}
- } while (scanMore);
- return hasNext;
- } catch (FilterUnsupportedException e) {
- throw new IOException("Failed to filter row in detail reader", e);
- }
- }
-
- @Override public Void getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- if (isVectorReader) {
- int value = vectorProxy.numRows();
- if (inputMetricsStats != null) {
- inputMetricsStats.incrementRecordRead((long) value);
- }
-
- return vectorProxy.getColumnarBatch();
- }
-
- if (inputMetricsStats != null) {
- inputMetricsStats.incrementRecordRead(1L);
- }
-
- return outputRow;
- }
-
- private boolean isScanRequired(BlockletHeader header) {
- if (filter != null && header.getBlocklet_index() != null) {
- BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil.convertExternalMinMaxIndex(
- header.getBlocklet_index().getMin_max_index());
- if (minMaxIndex != null) {
- BitSet bitSet = filter
- .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(),
- minMaxIndex.getIsMinMaxSet());
- if (bitSet.isEmpty()) {
- return false;
- } else {
- return true;
- }
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in vector reader", e);
}
}
- return true;
+ vectorProxy.setNumRows(rowNum);
+ return rowNum > 0;
}
- private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
- // if filter is null and output projection is empty, use the row number of blocklet header
- if (skipScanData) {
- int rowNums = header.getBlocklet_info().getNum_rows();
- vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
- vectorProxy.setNumRows(rowNums);
- input.skipBlockletData(true);
- return rowNums > 0;
- }
+ private void putRowToColumnBatch(int rowId) {
+ for (int i = 0; i < projection.length; i++) {
+ Object value = outputValues[i];
+ vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value);
- input.readBlockletData(header);
- vectorProxy =
- new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
- int rowNum = 0;
- if (null == filter) {
- while (input.hasNext()) {
- readRowFromStream();
- putRowToColumnBatch(rowNum++);
- }
- } else {
- try {
- while (input.hasNext()) {
- readRowFromStream();
- if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
- putRowToColumnBatch(rowNum++);
- }
- }
- } catch (FilterUnsupportedException e) {
- throw new IOException("Failed to filter row in vector reader", e);
- }
- }
- vectorProxy.setNumRows(rowNum);
- return rowNum > 0;
- }
-
- private void readRowFromStream() {
- input.nextRow();
- short nullLen = input.readShort();
- BitSet nullBitSet = allNonNull;
- if (nullLen > 0) {
- nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
- }
- int colCount = 0;
- // primitive type dimension
- for (; colCount < isNoDictColumn.length; colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- if (isNoDictColumn[colCount]) {
- int v = input.readShort();
- if (isRequired[colCount]) {
- byte[] b = input.readBytes(v);
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = b;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] =
- DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
- storageColumns[colCount].getDataType());
- }
- } else {
- input.skipBytes(v);
- }
- } else if (null != directDictionaryGenerators[colCount]) {
- if (isRequired[colCount]) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = input.copy(4);
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] =
- directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
- } else {
- input.skipBytes(4);
- }
- } else {
- input.skipBytes(4);
- }
- } else {
- if (isRequired[colCount]) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = input.copy(4);
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = input.readInt();
- } else {
- input.skipBytes(4);
- }
- } else {
- input.skipBytes(4);
- }
- }
- }
- }
- // complex type dimension
- for (; colCount < dimensionCount; colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = null;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- short v = input.readShort();
- if (isRequired[colCount]) {
- byte[] b = input.readBytes(v);
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = b;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = queryTypes[colCount]
- .getDataBasedOnDataType(ByteBuffer.wrap(b));
- }
- } else {
- input.skipBytes(v);
- }
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = null;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- if (isRequired[colCount]) {
- boolean v = input.readBoolean();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(1);
- }
- } else if (dataType == DataTypes.SHORT) {
- if (isRequired[colCount]) {
- short v = input.readShort();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(2);
- }
- } else if (dataType == DataTypes.INT) {
- if (isRequired[colCount]) {
- int v = input.readInt();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(4);
- }
- } else if (dataType == DataTypes.LONG) {
- if (isRequired[colCount]) {
- long v = input.readLong();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(8);
- }
- } else if (dataType == DataTypes.DOUBLE) {
- if (isRequired[colCount]) {
- double v = input.readDouble();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(8);
- }
- } else if (DataTypes.isDecimal(dataType)) {
- int len = input.readShort();
- if (isRequired[colCount]) {
- BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] =
- DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
- }
- } else {
- input.skipBytes(len);
- }
- }
- }
}
}
- private void readRawRowFromStream() {
- input.nextRow();
- short nullLen = input.readShort();
- BitSet nullBitSet = allNonNull;
- if (nullLen > 0) {
- nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
- }
- int colCount = 0;
- // primitive type dimension
- for (; colCount < isNoDictColumn.length; colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
- } else {
- if (isNoDictColumn[colCount]) {
- int v = input.readShort();
- outputValues[colCount] = input.readBytes(v);
- } else {
- outputValues[colCount] = input.readInt();
- }
- }
- }
- // complex type dimension
- for (; colCount < dimensionCount; colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = null;
- } else {
- short v = input.readShort();
- outputValues[colCount] = input.readBytes(v);
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = null;
- } else {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- outputValues[colCount] = input.readBoolean();
- } else if (dataType == DataTypes.SHORT) {
- outputValues[colCount] = input.readShort();
- } else if (dataType == DataTypes.INT) {
- outputValues[colCount] = input.readInt();
- } else if (dataType == DataTypes.LONG) {
- outputValues[colCount] = input.readLong();
- } else if (dataType == DataTypes.DOUBLE) {
- outputValues[colCount] = input.readDouble();
- } else if (DataTypes.isDecimal(dataType)) {
- int len = input.readShort();
- outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
- }
- }
+ @Override public void close() throws IOException {
+ super.close();
+ if (null != vectorProxy) {
+ vectorProxy.close();
}
}
-
- private void putRowToColumnBatch(int rowId) {
- for (int i = 0; i < projection.length; i++) {
- Object value = outputValues[i];
- vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value);
-
- }
- }
-
- @Override public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override public void close() throws IOException {
- if (null != input) {
- input.close();
- }
- if (null != vectorProxy) {
- vectorProxy.close();
- }
- }
}
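After this refactoring, everything deleted above (split handling, filter setup, sync-marker discovery, dictionary caches, and the row/raw-row decoding loops) now lives in the new engine-neutral StreamRecordReader in the hadoop module, while this Spark subclass keeps only the vector-specific pieces: the isVectorReader flag, the CarbonVectorProxy-backed columnar batch, outputSchema/outputRow, and scanBlockletAndFillVector/putRowToColumnBatch/getCurrentValue.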
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
deleted file mode 100644
index 835b115..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
-import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
-import org.apache.carbondata.core.scan.complextypes.StructQueryType;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Stream input format
- */
-public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
-
- public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
- public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
- public static final String STREAM_RECORD_READER_INSTANCE =
- "org.apache.carbondata.stream.CarbonStreamRecordReader";
- // return raw row for handoff
- private boolean useRawRow = false;
-
- public void setUseRawRow(boolean useRawRow) {
- this.useRawRow = useRawRow;
- }
-
- public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
- this.inputMetricsStats = inputMetricsStats;
- }
-
- public void setIsVectorReader(boolean vectorReader) {
- isVectorReader = vectorReader;
- }
-
- public void setModel(QueryModel model) {
- this.model = model;
- }
-
- // InputMetricsStats
- private InputMetricsStats inputMetricsStats;
- // vector reader
- private boolean isVectorReader;
- private QueryModel model;
-
- @Override
- public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- try {
- Constructor cons = CarbonStreamUtils
- .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
- InputMetricsStats.class, QueryModel.class, boolean.class);
- return (RecordReader) CarbonStreamUtils
- .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
-
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
- CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
- throws IOException {
- GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
- for (int i = 0; i < carbonColumns.length; i++) {
- if (carbonColumns[i].isComplex()) {
- if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
- queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
- carbonColumns[i].getColName(), i);
- } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
- queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
- carbonColumns[i].getColName(), i);
- } else {
- throw new UnsupportedOperationException(
- carbonColumns[i].getDataType().getName() + " is not supported");
- }
-
- fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
- }
- }
-
- return queryTypes;
- }
-
- private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
- CarbonDimension dimension, int parentBlockIndex,
- Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
- for (int i = 0; i < dimension.getNumberOfChild(); i++) {
- CarbonDimension child = dimension.getListOfChildDimensions().get(i);
- DataType dataType = child.getDataType();
- GenericQueryType queryType = null;
- if (DataTypes.isArrayType(dataType)) {
- queryType =
- new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-
- } else if (DataTypes.isStructType(dataType)) {
- queryType =
- new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
- parentQueryType.addChildren(queryType);
- } else {
- boolean isDirectDictionary =
- CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
- boolean isDictionary =
- CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
- Dictionary dictionary = null;
- if (isDictionary) {
- String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
- .get(CarbonCommonConstants.DICTIONARY_PATH);
- DictionaryColumnUniqueIdentifier dictionarIdentifier =
- new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
- child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
- dictionary = cache.get(dictionarIdentifier);
- }
- queryType =
- new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
- child.getDataType(), 4, dictionary,
- isDirectDictionary);
- }
- parentQueryType.addChildren(queryType);
- if (child.getNumberOfChild() > 0) {
- fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
deleted file mode 100644
index a7940f9..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.streaming;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * Util class which does utility function for stream module
- */
-public class CarbonStreamUtils {
-
- public static Constructor getConstructorWithReflection(String className,
- Class<?>... parameterTypes)
- throws ClassNotFoundException, NoSuchMethodException {
- Class loadedClass = Class.forName(className);
- return loadedClass.getConstructor(parameterTypes);
-
- }
-
- public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
- IllegalAccessException,
- InvocationTargetException, InstantiationException {
- return cons.newInstance(initargs);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
deleted file mode 100644
index 0467fe4..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-
-/**
- * stream blocklet reader
- */
-public class StreamBlockletReader {
-
- private byte[] buffer;
- private int offset;
- private final byte[] syncMarker;
- private final byte[] syncBuffer;
- private final int syncLen;
- private long pos = 0;
- private final InputStream in;
- private final long limitStart;
- private final long limitEnd;
- private boolean isAlreadySync = false;
- private Compressor compressor;
- private int rowNums = 0;
- private int rowIndex = 0;
- private boolean isHeaderPresent;
-
- public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
- boolean isHeaderPresent, String compressorName) {
- this.syncMarker = syncMarker;
- syncLen = syncMarker.length;
- syncBuffer = new byte[syncLen];
- this.in = in;
- limitStart = limit;
- limitEnd = limitStart + syncLen;
- this.isHeaderPresent = isHeaderPresent;
- this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
- }
-
- private void ensureCapacity(int capacity) {
- if (buffer == null || capacity > buffer.length) {
- buffer = new byte[capacity];
- }
- }
-
- /**
- * find the first position of sync_marker in input stream
- */
- private boolean sync() throws IOException {
- if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
- return false;
- }
- boolean skipHeader = false;
- for (int i = 0; i < limitStart; i++) {
- int j = 0;
- for (; j < syncLen; j++) {
- if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
- }
- if (syncLen == j) {
- if (isHeaderPresent) {
- if (skipHeader) {
- return true;
- } else {
- skipHeader = true;
- }
- } else {
- return true;
- }
- }
- int value = in.read();
- if (-1 == value) {
- return false;
- }
- syncBuffer[i % syncLen] = (byte) value;
- pos++;
- }
- return false;
- }
-
- public BlockletHeader readBlockletHeader() throws IOException {
- int len = readIntFromStream();
- byte[] b = new byte[len];
- if (!readBytesFromStream(b, 0, len)) {
- throw new EOFException("Failed to read blocklet header");
- }
- BlockletHeader header = CarbonUtil.readBlockletHeader(b);
- rowNums = header.getBlocklet_info().getNum_rows();
- rowIndex = 0;
- return header;
- }
-
- public void readBlockletData(BlockletHeader header) throws IOException {
- ensureCapacity(header.getBlocklet_length());
- offset = 0;
- int len = readIntFromStream();
- byte[] b = new byte[len];
- if (!readBytesFromStream(b, 0, len)) {
- throw new EOFException("Failed to read blocklet data");
- }
- compressor.rawUncompress(b, buffer);
- }
-
- public void skipBlockletData(boolean reset) throws IOException {
- int len = readIntFromStream();
- skip(len);
- pos += len;
- if (reset) {
- this.rowNums = 0;
- this.rowIndex = 0;
- }
- }
-
- private void skip(int len) throws IOException {
- long remaining = len;
- do {
- long skipLen = in.skip(remaining);
- remaining -= skipLen;
- } while (remaining > 0);
- }
-
- /**
- * find the next blocklet
- */
- public boolean nextBlocklet() throws IOException {
- if (pos >= limitStart) {
- return false;
- }
- if (isAlreadySync) {
- if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
- return false;
- }
- } else {
- isAlreadySync = true;
- if (!sync()) {
- return false;
- }
- }
-
- return pos < limitEnd;
- }
-
- public boolean hasNext() throws IOException {
- return rowIndex < rowNums;
- }
-
- public void nextRow() {
- rowIndex++;
- }
-
- public int readIntFromStream() throws IOException {
- int ch1 = in.read();
- int ch2 = in.read();
- int ch3 = in.read();
- int ch4 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
- pos += 4;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
- /**
- * Reads <code>len</code> bytes of data from the input stream into
- * an array of bytes.
- * @return <code>true</code> if reading data successfully, or
- * <code>false</code> if there is no more data because the end of the stream has been reached.
- */
- public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
- int readLen = in.read(b, offset, len);
- if (readLen < 0) {
- return false;
- }
- pos += readLen;
- if (readLen < len) {
- return readBytesFromStream(b, offset + readLen, len - readLen);
- } else {
- return true;
- }
- }
-
- public boolean readBoolean() {
- return (buffer[offset++]) != 0;
- }
-
- public short readShort() {
- short v = (short) ((buffer[offset + 1] & 255) +
- ((buffer[offset]) << 8));
- offset += 2;
- return v;
- }
-
- public byte[] copy(int len) {
- byte[] b = new byte[len];
- System.arraycopy(buffer, offset, b, 0, len);
- return b;
- }
-
- public int readInt() {
- int v = ((buffer[offset + 3] & 255) +
- ((buffer[offset + 2] & 255) << 8) +
- ((buffer[offset + 1] & 255) << 16) +
- ((buffer[offset]) << 24));
- offset += 4;
- return v;
- }
-
- public long readLong() {
- long v = ((long)(buffer[offset + 7] & 255)) +
- ((long) (buffer[offset + 6] & 255) << 8) +
- ((long) (buffer[offset + 5] & 255) << 16) +
- ((long) (buffer[offset + 4] & 255) << 24) +
- ((long) (buffer[offset + 3] & 255) << 32) +
- ((long) (buffer[offset + 2] & 255) << 40) +
- ((long) (buffer[offset + 1] & 255) << 48) +
- ((long) (buffer[offset]) << 56);
- offset += 8;
- return v;
- }
-
- public double readDouble() {
- return Double.longBitsToDouble(readLong());
- }
-
- public byte[] readBytes(int len) {
- byte[] b = new byte[len];
- System.arraycopy(buffer, offset, b, 0, len);
- offset += len;
- return b;
- }
-
- public void skipBytes(int len) {
- offset += len;
- }
-
- public int getRowNums() {
- return rowNums;
- }
-
- public void close() {
- CarbonUtil.closeStreams(in);
- }
-}
[2/2] carbondata git commit: [CARBONDATA-3220] Support presto to read stream segment data
Posted by ra...@apache.org.
[CARBONDATA-3220] Support presto to read stream segment data
Support Presto reading the streaming table:
refactor the old CarbonStreamRecordReader so that its code can be reused by Presto
change CarbondataPageSource to read streaming data through StreamRecordReader
(see the usage sketch after the change summary below)
This closes #3001
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d78db8f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d78db8f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d78db8f6
Branch: refs/heads/master
Commit: d78db8f6bd714e6a5111812b0c0473418201b23c
Parents: 8e6def9
Author: QiangCai <qi...@qq.com>
Authored: Wed Jan 9 22:06:02 2019 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jan 10 16:59:54 2019 +0530
----------------------------------------------------------------------
.../carbondata/hadoop/CarbonInputSplit.java | 4 +-
.../hadoop/stream/CarbonStreamInputFormat.java | 159 +++++
.../hadoop/stream/CarbonStreamUtils.java | 40 ++
.../hadoop/stream/StreamBlockletReader.java | 261 +++++++
.../hadoop/stream/StreamRecordReader.java | 614 +++++++++++++++++
.../carbondata/presto/CarbonVectorBatch.java | 10 +-
.../carbondata/presto/CarbondataPageSource.java | 318 ++++++++-
.../presto/CarbondataPageSourceProvider.java | 142 +---
.../presto/impl/CarbonLocalInputSplit.java | 44 +-
.../presto/impl/CarbonLocalMultiBlockSplit.java | 4 +
.../presto/impl/CarbonTableReader.java | 3 +-
.../presto/readers/BooleanStreamReader.java | 11 +
.../readers/DecimalSliceStreamReader.java | 12 +
.../presto/readers/DoubleStreamReader.java | 12 +
.../presto/readers/IntegerStreamReader.java | 7 +
.../presto/readers/LongStreamReader.java | 12 +
.../presto/readers/ShortStreamReader.java | 12 +
.../presto/readers/SliceStreamReader.java | 13 +
.../presto/readers/TimestampStreamReader.java | 12 +
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 2 +-
.../stream/CarbonStreamRecordReader.java | 684 ++-----------------
.../streaming/CarbonStreamInputFormat.java | 160 -----
.../carbondata/streaming/CarbonStreamUtils.java | 40 --
.../streaming/StreamBlockletReader.java | 261 -------
25 files changed, 1595 insertions(+), 1244 deletions(-)
----------------------------------------------------------------------
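The practical upshot is that an engine without Spark on its classpath can scan a stream segment through the hadoop module alone. Below is a minimal, hypothetical sketch of that usage, assuming StreamRecordReader exposes the (QueryModel, useRawRow) constructor implied by the Spark subclass's super(mdl, useRawRow) call and the standard Hadoop RecordReader lifecycle; it is not the committed Presto code.

    import java.io.IOException;

    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.carbondata.hadoop.stream.StreamRecordReader;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public final class StreamSegmentScanSketch {
      // Scans one stream-segment split row by row. The split, context and
      // QueryModel are assumed to be built by the calling engine, as
      // CarbondataPageSource does in this commit.
      public static void scan(InputSplit split, TaskAttemptContext context, QueryModel model)
          throws IOException, InterruptedException {
        StreamRecordReader reader = new StreamRecordReader(model, false); // false: decode rows
        try {
          reader.initialize(split, context); // seeks to the split and syncs on the marker
          while (reader.nextKeyValue()) {
            Object row = reader.getCurrentValue(); // decoded projection values for one row
            // hand the row to the engine's page/row builder here
          }
        } finally {
          reader.close();
        }
      }
    }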
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index de2451b..bcf703c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -195,7 +195,9 @@ public class CarbonInputSplit extends FileSplit
blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
blockInfo.setDetailInfo(split.getDetailInfo());
blockInfo.setDataMapWriterPath(split.dataMapWritePath);
- blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+ if (split.getDetailInfo() != null) {
+ blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+ }
tableBlockInfoList.add(blockInfo);
} catch (IOException e) {
throw new RuntimeException("fail to get location of split: " + split, e);
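The new null check is what allows streaming splits through this path: stream segment files have no footer, so their splits may carry no detail info, and the previous unconditional getDetailInfo().getBlockFooterOffset() call would fail with a NullPointerException.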
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
new file mode 100644
index 0000000..e4819ee
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+ public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+ public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+ public static final String STREAM_RECORD_READER_INSTANCE =
+ "org.apache.carbondata.stream.CarbonStreamRecordReader";
+ // return raw row for handoff
+ private boolean useRawRow = false;
+
+ public void setUseRawRow(boolean useRawRow) {
+ this.useRawRow = useRawRow;
+ }
+
+ public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+ this.inputMetricsStats = inputMetricsStats;
+ }
+
+ public void setIsVectorReader(boolean vectorReader) {
+ isVectorReader = vectorReader;
+ }
+
+ public void setModel(QueryModel model) {
+ this.model = model;
+ }
+
+ // InputMetricsStats
+ private InputMetricsStats inputMetricsStats;
+ // vector reader
+ private boolean isVectorReader;
+ private QueryModel model;
+
+ @Override
+ public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ try {
+ Constructor cons = CarbonStreamUtils
+ .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
+ InputMetricsStats.class, QueryModel.class, boolean.class);
+ return (RecordReader) CarbonStreamUtils
+ .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
+
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
+ CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
+ throws IOException {
+ GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ if (carbonColumns[i].isComplex()) {
+ if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
+ queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
+ carbonColumns[i].getColName(), i);
+ } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
+ queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
+ carbonColumns[i].getColName(), i);
+ } else {
+ throw new UnsupportedOperationException(
+ carbonColumns[i].getDataType().getName() + " is not supported");
+ }
+
+ fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
+ }
+ }
+
+ return queryTypes;
+ }
+
+ private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
+ CarbonDimension dimension, int parentBlockIndex,
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
+ for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+ CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+ DataType dataType = child.getDataType();
+ GenericQueryType queryType = null;
+ if (DataTypes.isArrayType(dataType)) {
+ queryType =
+ new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+
+ } else if (DataTypes.isStructType(dataType)) {
+ queryType =
+ new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+ parentQueryType.addChildren(queryType);
+ } else {
+ boolean isDirectDictionary =
+ CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+ boolean isDictionary =
+ CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
+ Dictionary dictionary = null;
+ if (isDictionary) {
+ String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
+ .get(CarbonCommonConstants.DICTIONARY_PATH);
+ DictionaryColumnUniqueIdentifier dictionarIdentifier =
+ new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+ child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
+ dictionary = cache.get(dictionarIdentifier);
+ }
+ queryType =
+ new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
+ child.getDataType(), 4, dictionary,
+ isDirectDictionary);
+ }
+ parentQueryType.addChildren(queryType);
+ if (child.getNumberOfChild() > 0) {
+ fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+ }
+ }
+ }
+}
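For reference, a minimal, hypothetical sketch of wiring this input format up; every setter below is defined above, while the split, context, QueryModel and metrics objects are assumed to come from the surrounding job:

    import java.io.IOException;

    import org.apache.carbondata.core.scan.model.QueryModel;
    import org.apache.carbondata.hadoop.InputMetricsStats;
    import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat;

    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public final class StreamInputFormatWiringSketch {
      public static RecordReader<Void, Object> open(InputSplit split, TaskAttemptContext context,
          QueryModel queryModel, InputMetricsStats metrics)
          throws IOException, InterruptedException {
        CarbonStreamInputFormat format = new CarbonStreamInputFormat();
        format.setModel(queryModel);          // query plan built by the caller
        format.setIsVectorReader(true);       // ask for the vectorized Spark reader
        format.setUseRawRow(false);           // decode rows (true only during handoff)
        format.setInputMetricsStats(metrics); // pass-through metrics hook
        // createRecordReader instantiates org.apache.carbondata.stream.CarbonStreamRecordReader
        // reflectively, so the spark2 module must be on the classpath in vector mode
        RecordReader<Void, Object> reader = format.createRecordReader(split, context);
        reader.initialize(split, context);
        return reader;
      }
    }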
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
new file mode 100644
index 0000000..78669e7
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.stream;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Util class which does utility function for stream module
+ */
+public class CarbonStreamUtils {
+
+ public static Constructor getConstructorWithReflection(String className,
+ Class<?>... parameterTypes)
+ throws ClassNotFoundException, NoSuchMethodException {
+ Class loadedClass = Class.forName(className);
+ return loadedClass.getConstructor(parameterTypes);
+
+ }
+
+ public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
+ IllegalAccessException,
+ InvocationTargetException, InstantiationException {
+ return cons.newInstance(initargs);
+ }
+}
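Keeping these reflection helpers beside the input format is what lets createRecordReader above instantiate the Spark-specific org.apache.carbondata.stream.CarbonStreamRecordReader purely by class name, so the hadoop module needs no compile-time dependency on spark2; only an engine that actually requests the vectorized reader must have that class on its classpath.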
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
new file mode 100644
index 0000000..dbcf72d
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
+/**
+ * Stream blocklet reader: locates blocklets via the sync marker and decodes their header and data
+ */
+public class StreamBlockletReader {
+
+ private byte[] buffer;
+ private int offset;
+ private final byte[] syncMarker;
+ private final byte[] syncBuffer;
+ private final int syncLen;
+ private long pos = 0;
+ private final InputStream in;
+ private final long limitStart;
+ private final long limitEnd;
+ private boolean isAlreadySync = false;
+ private Compressor compressor;
+ private int rowNums = 0;
+ private int rowIndex = 0;
+ private boolean isHeaderPresent;
+
+ public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
+ boolean isHeaderPresent, String compressorName) {
+ this.syncMarker = syncMarker;
+ syncLen = syncMarker.length;
+ syncBuffer = new byte[syncLen];
+ this.in = in;
+ limitStart = limit;
+ limitEnd = limitStart + syncLen;
+ this.isHeaderPresent = isHeaderPresent;
+ this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+ }
+
+ private void ensureCapacity(int capacity) {
+ if (buffer == null || capacity > buffer.length) {
+ buffer = new byte[capacity];
+ }
+ }
+
+ /**
+ * find the first occurrence of the sync marker in the input stream, skipping the file header if present
+ */
+ private boolean sync() throws IOException {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+ return false;
+ }
+ boolean skipHeader = false;
+ for (int i = 0; i < limitStart; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+ }
+ if (syncLen == j) {
+ if (isHeaderPresent) {
+ if (skipHeader) {
+ return true;
+ } else {
+ skipHeader = true;
+ }
+ } else {
+ return true;
+ }
+ }
+ int value = in.read();
+ if (-1 == value) {
+ return false;
+ }
+ syncBuffer[i % syncLen] = (byte) value;
+ pos++;
+ }
+ return false;
+ }
+
+ public BlockletHeader readBlockletHeader() throws IOException {
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet header");
+ }
+ BlockletHeader header = CarbonUtil.readBlockletHeader(b);
+ rowNums = header.getBlocklet_info().getNum_rows();
+ rowIndex = 0;
+ return header;
+ }
+
+ public void readBlockletData(BlockletHeader header) throws IOException {
+ ensureCapacity(header.getBlocklet_length());
+ offset = 0;
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet data");
+ }
+ compressor.rawUncompress(b, buffer);
+ }
+
+ public void skipBlockletData(boolean reset) throws IOException {
+ int len = readIntFromStream();
+ skip(len);
+ pos += len;
+ if (reset) {
+ this.rowNums = 0;
+ this.rowIndex = 0;
+ }
+ }
+
+ private void skip(int len) throws IOException {
+ long remaining = len;
+ do {
+ long skipLen = in.skip(remaining);
+ if (skipLen <= 0) {
+ // avoid an infinite loop when the stream cannot skip any further
+ throw new EOFException("Failed to skip blocklet data");
+ }
+ remaining -= skipLen;
+ } while (remaining > 0);
+ }
+
+ /**
+ * find the next blocklet
+ */
+ public boolean nextBlocklet() throws IOException {
+ if (pos >= limitStart) {
+ return false;
+ }
+ if (isAlreadySync) {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+ return false;
+ }
+ } else {
+ isAlreadySync = true;
+ if (!sync()) {
+ return false;
+ }
+ }
+
+ return pos < limitEnd;
+ }
+
+ public boolean hasNext() throws IOException {
+ return rowIndex < rowNums;
+ }
+
+ public void nextRow() {
+ rowIndex++;
+ }
+
+ public int readIntFromStream() throws IOException {
+ int ch1 = in.read();
+ int ch2 = in.read();
+ int ch3 = in.read();
+ int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
+ pos += 4;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ /**
+ * Reads <code>len</code> bytes of data from the input stream into
+ * an array of bytes.
+ * @return <code>true</code> if the data was read successfully, or
+ * <code>false</code> if there is no more data because the end of the stream has been reached.
+ */
+ public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+ int readLen = in.read(b, offset, len);
+ if (readLen < 0) {
+ return false;
+ }
+ pos += readLen;
+ if (readLen < len) {
+ return readBytesFromStream(b, offset + readLen, len - readLen);
+ } else {
+ return true;
+ }
+ }
+
+ public boolean readBoolean() {
+ return (buffer[offset++]) != 0;
+ }
+
+ public short readShort() {
+ short v = (short) ((buffer[offset + 1] & 255) +
+ ((buffer[offset]) << 8));
+ offset += 2;
+ return v;
+ }
+
+ public byte[] copy(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ return b;
+ }
+
+ public int readInt() {
+ int v = ((buffer[offset + 3] & 255) +
+ ((buffer[offset + 2] & 255) << 8) +
+ ((buffer[offset + 1] & 255) << 16) +
+ ((buffer[offset]) << 24));
+ offset += 4;
+ return v;
+ }
+
+ public long readLong() {
+ long v = ((long)(buffer[offset + 7] & 255)) +
+ ((long) (buffer[offset + 6] & 255) << 8) +
+ ((long) (buffer[offset + 5] & 255) << 16) +
+ ((long) (buffer[offset + 4] & 255) << 24) +
+ ((long) (buffer[offset + 3] & 255) << 32) +
+ ((long) (buffer[offset + 2] & 255) << 40) +
+ ((long) (buffer[offset + 1] & 255) << 48) +
+ ((long) (buffer[offset]) << 56);
+ offset += 8;
+ return v;
+ }
+
+ public double readDouble() {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ public byte[] readBytes(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ offset += len;
+ return b;
+ }
+
+ public void skipBytes(int len) {
+ offset += len;
+ }
+
+ public int getRowNums() {
+ return rowNums;
+ }
+
+ public void close() {
+ CarbonUtil.closeStreams(in);
+ }
+}
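A rough sketch of the read loop this reader supports; the sync marker, stream, length and compressor name are assumed to come from the stream file header and the input split, and column decoding is elided:

import java.io.IOException;
import java.io.InputStream;

import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.hadoop.stream.StreamBlockletReader;

public class StreamBlockletScanSketch {
  // a minimal sketch, not the project's own row reader
  static void scan(byte[] syncMarker, InputStream in, long length,
      String compressorName) throws IOException {
    StreamBlockletReader reader =
        new StreamBlockletReader(syncMarker, in, length, true, compressorName);
    try {
      while (reader.nextBlocklet()) {
        BlockletHeader header = reader.readBlockletHeader();
        reader.readBlockletData(header);  // uncompress the rows into the buffer
        while (reader.hasNext()) {
          reader.nextRow();
          // decode one row here via readShort/readInt/readBytes(...)
        }
      }
    } finally {
      reader.close();
    }
  }
}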
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
new file mode 100644
index 0000000..75e36be
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Stream row record reader
+ */
+public class StreamRecordReader extends RecordReader<Void, Object> {
+
+ // metadata
+ protected CarbonTable carbonTable;
+ private CarbonColumn[] storageColumns;
+ private boolean[] isRequired;
+ private DataType[] measureDataTypes;
+ private int dimensionCount;
+ private int measureCount;
+
+ // input
+ private FileSplit fileSplit;
+ private Configuration hadoopConf;
+ protected StreamBlockletReader input;
+ protected boolean isFirstRow = true;
+ protected QueryModel model;
+
+ // decode data
+ private BitSet allNonNull;
+ private boolean[] isNoDictColumn;
+ private DirectDictionaryGenerator[] directDictionaryGenerators;
+ private CacheProvider cacheProvider;
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+ private GenericQueryType[] queryTypes;
+ private String compressorName;
+
+ // vectorized reader
+ protected boolean isFinished = false;
+
+ // filter
+ protected FilterExecuter filter;
+ private boolean[] isFilterRequired;
+ private Object[] filterValues;
+ protected RowIntf filterRow;
+ private int[] filterMap;
+
+ // output
+ protected CarbonColumn[] projection;
+ private boolean[] isProjectionRequired;
+ private int[] projectionMap;
+ protected Object[] outputValues;
+
+ // empty project, null filter
+ protected boolean skipScanData;
+
+ // return raw row for handoff
+ private boolean useRawRow = false;
+
+ public StreamRecordReader(QueryModel mdl, boolean useRawRow) {
+ this.model = mdl;
+ this.useRawRow = useRawRow;
+
+ }
+
+ @Override public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ // input
+ if (split instanceof CarbonInputSplit) {
+ fileSplit = (CarbonInputSplit) split;
+ } else if (split instanceof CarbonMultiBlockSplit) {
+ fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+ } else {
+ fileSplit = (FileSplit) split;
+ }
+
+ // metadata
+ hadoopConf = context.getConfiguration();
+ if (model == null) {
+ CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ model = format.createQueryModel(split, context);
+ }
+ carbonTable = model.getTable();
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
+ dimensionCount = dimensions.size();
+ List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName());
+ measureCount = measures.size();
+ List<CarbonColumn> carbonColumnList =
+ carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+ storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+ isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+ directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+ }
+ }
+ measureDataTypes = new DataType[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+ }
+
+ // decode data
+ allNonNull = new BitSet(storageColumns.length);
+ projection = model.getProjectionColumns();
+
+ isRequired = new boolean[storageColumns.length];
+ boolean[] isFilterDimensions = model.getIsFilterDimensions();
+ boolean[] isFilterMeasures = model.getIsFilterMeasures();
+ isFilterRequired = new boolean[storageColumns.length];
+ filterMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].isDimension()) {
+ if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = storageColumns[i].getOrdinal();
+ }
+ } else {
+ if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+ }
+ }
+ }
+
+ isProjectionRequired = new boolean[storageColumns.length];
+ projectionMap = new int[storageColumns.length];
+ for (int j = 0; j < projection.length; j++) {
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+ isRequired[i] = true;
+ isProjectionRequired[i] = true;
+ projectionMap[i] = j;
+ break;
+ }
+ }
+ }
+
+ // initialize filter
+ if (null != model.getFilterExpressionResolverTree()) {
+ initializeFilter();
+ } else if (projection.length == 0) {
+ skipScanData = true;
+ }
+
+ }
+
+ private void initializeFilter() {
+
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ SegmentProperties segmentProperties =
+ new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+ Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+ FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+ filter =
+ FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap);
+ // for row filter, we need to update the column index
+ FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+ carbonTable.getDimensionOrdinalMax());
+
+ }
+
+ private byte[] getSyncMarker(String filePath) throws IOException {
+ CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+ FileHeader header = headerReader.readHeader();
+ // legacy store does not have this member
+ if (header.isSetCompressor_name()) {
+ compressorName = header.getCompressor_name();
+ } else {
+ compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
+ }
+ return header.getSync_marker();
+ }
+
+ protected void initializeAtFirstRow() throws IOException {
+ filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+ filterRow = new RowImpl();
+ filterRow.setValues(filterValues);
+
+ outputValues = new Object[projection.length];
+
+ Path file = fileSplit.getPath();
+
+ byte[] syncMarker = getSyncMarker(file.toString());
+
+ FileSystem fs = file.getFileSystem(hadoopConf);
+
+ int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+ CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ fileIn.seek(fileSplit.getStart());
+ input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+ fileSplit.getStart() == 0, compressorName);
+
+ cacheProvider = CacheProvider.getInstance();
+ cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+ queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+ }
+
+ /**
+ * check for the next row, reading and filtering blocklets as needed
+ */
+ protected boolean nextRow() throws IOException {
+ // read row one by one
+ try {
+ boolean hasNext;
+ boolean scanMore = false;
+ do {
+ hasNext = input.hasNext();
+ if (hasNext) {
+ if (skipScanData) {
+ input.nextRow();
+ scanMore = false;
+ } else {
+ if (useRawRow) {
+ // read the raw row for streaming handoff, which does not require decoding it
+ readRawRowFromStream();
+ } else {
+ readRowFromStream();
+ }
+ if (null != filter) {
+ scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+ } else {
+ scanMore = false;
+ }
+ }
+ } else {
+ if (input.nextBlocklet()) {
+ BlockletHeader header = input.readBlockletHeader();
+ if (isScanRequired(header)) {
+ if (skipScanData) {
+ input.skipBlockletData(false);
+ } else {
+ input.readBlockletData(header);
+ }
+ } else {
+ input.skipBlockletData(true);
+ }
+ scanMore = true;
+ } else {
+ isFinished = true;
+ scanMore = false;
+ }
+ }
+ } while (scanMore);
+ return hasNext;
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in detail reader", e);
+ }
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (isFirstRow) {
+ isFirstRow = false;
+ initializeAtFirstRow();
+ }
+ if (isFinished) {
+ return false;
+ }
+
+ return nextRow();
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ return outputValues;
+ }
+
+ protected boolean isScanRequired(BlockletHeader header) {
+ if (filter != null && header.getBlocklet_index() != null) {
+ BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil
+ .convertExternalMinMaxIndex(header.getBlocklet_index().getMin_max_index());
+ if (minMaxIndex != null) {
+ BitSet bitSet = filter
+ .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(),
+ minMaxIndex.getIsMinMaxSet());
+ return !bitSet.isEmpty();
+ }
+ }
+ return true;
+ }
+
+ protected void readRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = DataTypeUtil
+ .getDataBasedOnDataTypeForNoDictionaryColumn(b,
+ storageColumns[colCount].getDataType());
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ } else if (null != directDictionaryGenerators[colCount]) {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = input.readInt();
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ short v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ queryTypes[colCount].getDataBasedOnDataType(ByteBuffer.wrap(b));
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ if (isRequired[colCount]) {
+ boolean v = input.readBoolean();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(1);
+ }
+ } else if (dataType == DataTypes.SHORT) {
+ if (isRequired[colCount]) {
+ short v = input.readShort();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(2);
+ }
+ } else if (dataType == DataTypes.INT) {
+ if (isRequired[colCount]) {
+ int v = input.readInt();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else if (dataType == DataTypes.LONG) {
+ if (isRequired[colCount]) {
+ long v = input.readLong();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (dataType == DataTypes.DOUBLE) {
+ if (isRequired[colCount]) {
+ double v = input.readDouble();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (DataTypes.isDecimal(dataType)) {
+ int len = input.readShort();
+ if (isRequired[colCount]) {
+ BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+ }
+ } else {
+ input.skipBytes(len);
+ }
+ }
+ }
+ }
+ }
+
+ private void readRawRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ } else {
+ outputValues[colCount] = input.readInt();
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ short v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ outputValues[colCount] = input.readBoolean();
+ } else if (dataType == DataTypes.SHORT) {
+ outputValues[colCount] = input.readShort();
+ } else if (dataType == DataTypes.INT) {
+ outputValues[colCount] = input.readInt();
+ } else if (dataType == DataTypes.LONG) {
+ outputValues[colCount] = input.readLong();
+ } else if (dataType == DataTypes.DOUBLE) {
+ outputValues[colCount] = input.readDouble();
+ } else if (DataTypes.isDecimal(dataType)) {
+ int len = input.readShort();
+ outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ }
+ }
+ }
+ }
+
+ @Override public float getProgress() {
+ return 0;
+ }
+
+ @Override public void close() throws IOException {
+ if (null != input) {
+ input.close();
+ }
+ }
+}
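A condensed sketch of the reader lifecycle, as the Presto page source below uses it; the query model, split and task attempt context are assumed to be built by CarbonTableInputFormat:

import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.hadoop.stream.StreamRecordReader;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class StreamScanSketch {
  // a minimal sketch: count the rows of one stream split
  static long countRows(QueryModel model, InputSplit split, TaskAttemptContext context)
      throws Exception {
    StreamRecordReader reader = new StreamRecordReader(model, false);
    reader.initialize(split, context);
    long rows = 0;
    try {
      while (reader.nextKeyValue()) {
        // getCurrentValue() returns the projected columns of the current row
        Object[] row = (Object[]) reader.getCurrentValue();
        rows++;
      }
    } finally {
      reader.close();
    }
    return rows;
  }
}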
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index 140e46b..2f0c9eb 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -75,7 +75,7 @@ public class CarbonVectorBatch {
}
}
- private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
+ public static CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
StructField field, Dictionary dictionary) {
if (dataType == DataTypes.BOOLEAN) {
return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
@@ -92,8 +92,12 @@ public class CarbonVectorBatch {
} else if (dataType == DataTypes.STRING) {
return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
} else if (DataTypes.isDecimal(dataType)) {
- return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
- dictionary);
+ if (dataType instanceof DecimalType) {
+ return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
+ dictionary);
+ } else {
+ return null;
+ }
} else {
return new ObjectStreamReader(batchSize, field.getDataType());
}
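With the factory now public and static, the row-format path can allocate one stream-reader vector per projected column, roughly as sketched here; the arrays are assumed to be prepared from the query model as in CarbondataPageSource below:

import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.presto.CarbonVectorBatch;

public class VectorAllocationSketch {
  static CarbonColumnVectorImpl[] allocate(int batchSize, DataType[] dataTypes,
      StructField[] fields, Dictionary[] dictionaries) {
    CarbonColumnVectorImpl[] columns = new CarbonColumnVectorImpl[fields.length];
    for (int i = 0; i < fields.length; i++) {
      // note: may return null for a decimal DataType that is not a DecimalType
      columns[i] = CarbonVectorBatch.createDirectStreamReader(
          batchSize, dataTypes[i], fields[i], dictionaries[i]);
    }
    return columns;
  }
}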
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 93de394..f289718 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -18,14 +18,46 @@
package org.apache.carbondata.presto;
import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.List;
import java.util.Objects;
+import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.stream.StreamRecordReader;
+import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveSplit;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.Page;
@@ -33,6 +65,12 @@ import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.log4j.Logger;
import static com.google.common.base.Preconditions.checkState;
@@ -44,18 +82,103 @@ class CarbondataPageSource implements ConnectorPageSource {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
+
+ private HiveSplit split;
+ private CarbonTable carbonTable;
+ private String queryId;
+ private Configuration hadoopConf;
+ private FileFormat fileFormat;
private List<ColumnHandle> columnHandles;
+ private int columnCount = 0;
private boolean closed;
- private PrestoCarbonVectorizedRecordReader vectorReader;
private long sizeOfData = 0;
private int batchId;
private long nanoStart;
private long nanoEnd;
+ private CarbonDictionaryDecodeReadSupport readSupport;
+
+ // columnar format split
+ private PrestoCarbonVectorizedRecordReader vectorReader;
+ private boolean isDirectVectorFill;
+
+ // row format split
+ private StreamRecordReader rowReader;
+ private StructField[] fields;
+ private int batchSize = 100;
+ private Dictionary[] dictionaries;
+ private DataType[] dataTypes;
+ private boolean isFirstPage = true;
- CarbondataPageSource(PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
- List<ColumnHandle> columnHandles) {
+ CarbondataPageSource(CarbonTable carbonTable, String queryId, HiveSplit split,
+ List<ColumnHandle> columnHandles, Configuration hadoopConf, boolean isDirectVectorFill) {
+ this.carbonTable = carbonTable;
+ this.queryId = queryId;
+ this.split = split;
this.columnHandles = columnHandles;
- vectorReader = vectorizedRecordReader;
+ this.hadoopConf = hadoopConf;
+ this.isDirectVectorFill = isDirectVectorFill;
+ initialize();
+ }
+
+ private void initialize() {
+ CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+ .convertSplit(split.getSchema().getProperty("carbonSplit"));
+ fileFormat = carbonInputSplit.getFileFormat();
+ if (fileFormat.ordinal() == FileFormat.ROW_V1.ordinal()) {
+ initializeForRow();
+ } else {
+ initializeForColumnar();
+ }
+ }
+
+ private void initializeForColumnar() {
+ readSupport = new CarbonDictionaryDecodeReadSupport();
+ vectorReader = createReaderForColumnar(split, columnHandles, readSupport, hadoopConf);
+ }
+
+ private void initializeForRow() {
+ QueryModel queryModel = createQueryModel(split, columnHandles, hadoopConf);
+ rowReader = new StreamRecordReader(queryModel, false);
+ List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+ List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+ fields = new StructField[queryDimension.size() + queryMeasures.size()];
+ for (int i = 0; i < queryDimension.size(); i++) {
+ ProjectionDimension dim = queryDimension.get(i);
+ if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), generator.getReturnType());
+ } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+ fields[dim.getOrdinal()] =
+ new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+ } else if (dim.getDimension().isComplex()) {
+ fields[dim.getOrdinal()] =
+ new StructField(dim.getColumnName(), dim.getDimension().getDataType());
+ } else {
+ fields[dim.getOrdinal()] = new StructField(dim.getColumnName(), DataTypes.INT);
+ }
+ }
+
+ for (ProjectionMeasure msr : queryMeasures) {
+ DataType dataType = msr.getMeasure().getDataType();
+ if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT || dataType == DataTypes.INT
+ || dataType == DataTypes.LONG) {
+ fields[msr.getOrdinal()] =
+ new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+ } else if (DataTypes.isDecimal(dataType)) {
+ fields[msr.getOrdinal()] =
+ new StructField(msr.getColumnName(), msr.getMeasure().getDataType());
+ } else {
+ fields[msr.getOrdinal()] = new StructField(msr.getColumnName(), DataTypes.DOUBLE);
+ }
+ }
+
+ this.columnCount = columnHandles.size();
+ readSupport = new CarbonDictionaryDecodeReadSupport();
+ readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+ this.dictionaries = readSupport.getDictionaries();
+ this.dataTypes = readSupport.getDataTypes();
+
}
@Override public long getCompletedBytes() {
@@ -71,6 +194,14 @@ class CarbondataPageSource implements ConnectorPageSource {
}
@Override public Page getNextPage() {
+ if (fileFormat.ordinal() == FileFormat.ROW_V1.ordinal()) {
+ return getNextPageForRow();
+ } else {
+ return getNextPageForColumnar();
+ }
+ }
+
+ private Page getNextPageForColumnar() {
if (nanoStart == 0) {
nanoStart = System.nanoTime();
}
@@ -111,6 +242,68 @@ class CarbondataPageSource implements ConnectorPageSource {
}
}
+ private Page getNextPageForRow() {
+ if (isFirstPage) {
+ isFirstPage = false;
+ initializeReaderForRow();
+ }
+
+ if (nanoStart == 0) {
+ nanoStart = System.nanoTime();
+ }
+ int count = 0;
+ try {
+ Block[] blocks = new Block[columnCount];
+ CarbonColumnVectorImpl[] columns = new CarbonColumnVectorImpl[columnCount];
+ for (int i = 0; i < columnCount; ++i) {
+ columns[i] = CarbonVectorBatch
+ .createDirectStreamReader(batchSize, dataTypes[i], fields[i], dictionaries[i]);
+ }
+
+ while (rowReader.nextKeyValue()) {
+ Object[] values = (Object[]) rowReader.getCurrentValue();
+ for (int index = 0; index < columnCount; index++) {
+ columns[index].putObject(count, values[index]);
+ }
+ count++;
+ if (count == batchSize) {
+ break;
+ }
+ }
+ if (count == 0) {
+ close();
+ return null;
+ } else {
+ for (int index = 0; index < columnCount; index++) {
+ blocks[index] = ((PrestoVectorBlockBuilder) columns[index]).buildBlock();
+ sizeOfData += blocks[index].getSizeInBytes();
+ }
+ }
+ return new Page(count, blocks);
+ } catch (PrestoException e) {
+ closeWithSuppression(e);
+ throw e;
+ } catch (RuntimeException | InterruptedException | IOException e) {
+ closeWithSuppression(e);
+ throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
+ }
+ }
+
+ private void initializeReaderForRow() {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
+ String jobTrackerId = formatter.format(new Date());
+ TaskAttemptID attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0);
+ TaskAttemptContextImpl attemptContext =
+ new TaskAttemptContextImpl(FileFactory.getConfiguration(), attemptId);
+ CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+ .convertSplit(split.getSchema().getProperty("carbonSplit"));
+ try {
+ rowReader.initialize(carbonInputSplit, attemptContext);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override public long getSystemMemoryUsage() {
return sizeOfData;
}
@@ -122,7 +315,12 @@ class CarbondataPageSource implements ConnectorPageSource {
}
closed = true;
try {
- vectorReader.close();
+ if (vectorReader != null) {
+ vectorReader.close();
+ }
+ if (rowReader != null) {
+ rowReader.close();
+ }
nanoEnd = System.nanoTime();
} catch (Exception e) {
throw Throwables.propagate(e);
@@ -144,6 +342,116 @@ class CarbondataPageSource implements ConnectorPageSource {
}
/**
+ * Create vector reader using the split.
+ */
+ private PrestoCarbonVectorizedRecordReader createReaderForColumnar(HiveSplit carbonSplit,
+ List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
+ Configuration conf) {
+ QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
+ if (isDirectVectorFill) {
+ queryModel.setDirectVectorFill(true);
+ queryModel.setPreFetchData(false);
+ }
+ QueryExecutor queryExecutor =
+ QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
+ try {
+ CarbonIterator iterator = queryExecutor.execute(queryModel);
+ readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+ PrestoCarbonVectorizedRecordReader reader =
+ new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
+ (AbstractDetailQueryResultIterator) iterator, readSupport);
+ reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
+ return reader;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create reader ", e);
+ }
+ }
+
+ /**
+ * Build the query model for the given split and projected columns.
+ *
+ * @param carbondataSplit Hive split carrying the serialized carbon split
+ * @param columns column handles to project
+ * @return the query model for this scan
+ */
+ private QueryModel createQueryModel(HiveSplit carbondataSplit,
+ List<? extends ColumnHandle> columns, Configuration conf) {
+
+ try {
+ CarbonProjection carbonProjection = getCarbonProjection(columns);
+ conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+ String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+ CarbonTableInputFormat
+ .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
+ CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
+ conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+ conf.set("query.id", queryId);
+ JobConf jobConf = new JobConf(conf);
+ CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
+ PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
+ carbonProjection);
+ TaskAttemptContextImpl hadoopAttemptContext =
+ new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+ CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+ .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
+ QueryModel queryModel =
+ carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
+ queryModel.setQueryId(queryId);
+ queryModel.setVectorReader(true);
+ queryModel.setStatisticsRecorder(
+ CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
+
+ List<TableBlockInfo> tableBlockInfoList =
+ CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+ return queryModel;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to get the Query Model ", e);
+ }
+ }
+
+ /**
+ * Create and configure the CarbonTableInputFormat for the given table.
+ *
+ * @param conf Hadoop configuration to populate
+ * @param carbonTable table to scan
+ * @param filterExpression filter pushed down to the scan
+ * @param projection columns to read
+ * @return the configured input format
+ */
+ private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
+ CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
+
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+ CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ try {
+ CarbonTableInputFormat
+ .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+ CarbonTableInputFormat
+ .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
+ CarbonTableInputFormat
+ .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
+ }
+ CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+ CarbonTableInputFormat.setColumnProjection(conf, projection);
+
+ return format;
+ }
+
+ /**
+ * Convert the column handles into a carbon projection.
+ *
+ * @param columns column handles to project
+ * @return the carbon projection
+ */
+ private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
+ CarbonProjection carbonProjection = new CarbonProjection();
+ // Convert all column handles
+ ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ handles.add(Types.checkType(handle, HiveColumnHandle.class, "handle"));
+ carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
+ }
+ return carbonProjection;
+ }
+
+ /**
* Lazy Block implementation for CarbonData
*/
private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index c81e0c3..be088e1 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -17,27 +17,12 @@
package org.apache.carbondata.presto;
-import java.io.IOException;
import java.util.List;
import java.util.Set;
import static java.util.Objects.requireNonNull;
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
import org.apache.carbondata.presto.impl.CarbonTableReader;
@@ -45,7 +30,6 @@ import static org.apache.carbondata.presto.Types.checkType;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
-import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HivePageSourceFactory;
import com.facebook.presto.hive.HivePageSourceProvider;
import com.facebook.presto.hive.HiveRecordCursorProvider;
@@ -57,14 +41,9 @@ import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.type.TypeManager;
-import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -103,122 +82,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
new Path(carbonSplit.getSchema().getProperty("tablePath")));
configuration = carbonTableReader.updateS3Properties(configuration);
- CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
- PrestoCarbonVectorizedRecordReader carbonRecordReader =
- createReader(carbonSplit, columns, readSupport, configuration);
- return new CarbondataPageSource(carbonRecordReader, columns);
- }
-
- /**
- * Create vector reader using the split.
- */
- private PrestoCarbonVectorizedRecordReader createReader(HiveSplit carbonSplit,
- List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
- Configuration conf) {
- QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
- if (carbonTableReader.config.getPushRowFilter() == null ||
- carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
- queryModel.setDirectVectorFill(true);
- queryModel.setPreFetchData(false);
- }
- QueryExecutor queryExecutor =
- QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
- try {
- CarbonIterator iterator = queryExecutor.execute(queryModel);
- readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
- PrestoCarbonVectorizedRecordReader reader =
- new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
- (AbstractDetailQueryResultIterator) iterator, readSupport);
- reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
- return reader;
- } catch (Exception e) {
- throw new RuntimeException("Failed to create reader ", e);
- }
- }
-
- /**
- * @param carbondataSplit
- * @param columns
- * @return
- */
- private QueryModel createQueryModel(HiveSplit carbondataSplit,
- List<? extends ColumnHandle> columns, Configuration conf) {
-
- try {
- CarbonProjection carbonProjection = getCarbonProjection(columns);
- CarbonTable carbonTable = getCarbonTable(carbondataSplit, conf);
- conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
- String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
- CarbonTableInputFormat
- .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
- CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
- conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
- conf.set("query.id", queryId);
- JobConf jobConf = new JobConf(conf);
- CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
- PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
- carbonProjection);
- TaskAttemptContextImpl hadoopAttemptContext =
- new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
- CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
- .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
- QueryModel queryModel =
- carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
- queryModel.setQueryId(queryId);
- queryModel.setVectorReader(true);
- queryModel.setStatisticsRecorder(
- CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
-
- List<TableBlockInfo> tableBlockInfoList =
- CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
- queryModel.setTableBlockInfos(tableBlockInfoList);
- return queryModel;
- } catch (IOException e) {
- throw new RuntimeException("Unable to get the Query Model ", e);
- }
- }
-
- /**
- * @param conf
- * @param carbonTable
- * @param filterExpression
- * @param projection
- * @return
- */
- private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
- CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
-
- AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
- CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
- try {
- CarbonTableInputFormat
- .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
- CarbonTableInputFormat
- .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
- CarbonTableInputFormat
- .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
- } catch (Exception e) {
- throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
- }
- CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
- CarbonTableInputFormat.setColumnProjection(conf, projection);
-
- return format;
- }
-
- /**
- * @param columns
- * @return
- */
- private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
- CarbonProjection carbonProjection = new CarbonProjection();
- // Convert all columns handles
- ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
- for (ColumnHandle handle : columns) {
- handles.add(checkType(handle, HiveColumnHandle.class, "handle"));
- carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
- }
- return carbonProjection;
+ CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
+ boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
+ carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
+ return new CarbondataPageSource(
+ carbonTable, queryId, carbonSplit, columns, configuration, isDirectVectorFill);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 718cb33..f4f50a5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,9 +45,9 @@ public class CarbonLocalInputSplit {
private short version;
private String[] deleteDeltaFiles;
private String blockletId;
-
-
private String detailInfo;
+ private int fileFormatOrdinal;
+ private FileFormat fileFormat;
/**
* Number of BlockLets in a block
@@ -93,6 +94,14 @@ public class CarbonLocalInputSplit {
return blockletId;
}
+ @JsonProperty public int getFileFormatOrdinal() {
+ return fileFormatOrdinal;
+ }
+
+ public FileFormat getFileFormat() {
+ return fileFormat;
+ }
+
public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) {
Gson gson = new Gson();
detailInfo = gson.toJson(blockletDetailInfo);
@@ -107,7 +116,8 @@ public class CarbonLocalInputSplit {
@JsonProperty("version") short version,
@JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
@JsonProperty("blockletId") String blockletId,
- @JsonProperty("detailInfo") String detailInfo
+ @JsonProperty("detailInfo") String detailInfo,
+ @JsonProperty("fileFormatOrdinal") int fileFormatOrdinal
) {
this.path = path;
this.start = start;
@@ -120,7 +130,8 @@ public class CarbonLocalInputSplit {
this.deleteDeltaFiles = deleteDeltaFiles;
this.blockletId = blockletId;
this.detailInfo = detailInfo;
-
+ this.fileFormatOrdinal = fileFormatOrdinal;
+ this.fileFormat = FileFormat.getByOrdinal(fileFormatOrdinal);
}
public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
@@ -132,18 +143,21 @@ public class CarbonLocalInputSplit {
carbonLocalInputSplit.getNumberOfBlocklets(),
ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
carbonLocalInputSplit.getDeleteDeltaFiles());
- Gson gson = new Gson();
- BlockletDetailInfo blockletDetailInfo =
- gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
- if (null == blockletDetailInfo) {
- throw new RuntimeException("Could not read blocklet details");
- }
- try {
- blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
- } catch (IOException e) {
- throw new RuntimeException(e);
+ inputSplit.setFormat(carbonLocalInputSplit.getFileFormat());
+ if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()) {
+ Gson gson = new Gson();
+ BlockletDetailInfo blockletDetailInfo =
+ gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
+ if (null == blockletDetailInfo) {
+ throw new RuntimeException("Could not read blocklet details");
+ }
+ try {
+ blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ inputSplit.setDetailInfo(blockletDetailInfo);
}
- inputSplit.setDetailInfo(blockletDetailInfo);
return inputSplit;
}
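The split now carries its file format through JSON as a plain ordinal; the round-trip this relies on is just the following, with FileFormat.getByOrdinal inverting ordinal():

import org.apache.carbondata.core.statusmanager.FileFormat;

public class FileFormatRoundTripSketch {
  public static void main(String[] args) {
    // CarbonTableReader writes the ordinal into the local split ...
    int ordinal = FileFormat.ROW_V1.ordinal();
    // ... and the JSON constructor restores the enum from it
    FileFormat restored = FileFormat.getByOrdinal(ordinal);
    System.out.println(restored);  // expected: ROW_V1
  }
}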
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
index fd232ed..6702c5f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
@@ -70,6 +70,9 @@ public class CarbonLocalMultiBlockSplit {
@JsonProperty("locations") String[] locations) {
this.splitList = splitList;
this.locations = locations;
+ if (!splitList.isEmpty()) {
+ this.fileFormat = splitList.get(0).getFileFormat();
+ }
}
public String getJsonString() {
@@ -87,6 +90,7 @@ public class CarbonLocalMultiBlockSplit {
CarbonMultiBlockSplit carbonMultiBlockSplit =
new CarbonMultiBlockSplit(carbonInputSplitList, carbonLocalMultiBlockSplit.getLocations());
+ carbonMultiBlockSplit.setFileFormat(carbonLocalMultiBlockSplit.getFileFormat());
return carbonMultiBlockSplit;
}
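
The multi-block constructor above adopts the format of its first child split, which quietly assumes every split in the list shares one format. A small sketch that makes the assumption explicit with a homogeneity check (the check is an illustration, not part of the patch):

import java.util.Arrays;
import java.util.List;

public class CommonFormatCheck {

  // Sketch only: the patch reads splitList.get(0); this version fails fast
  // instead if a multi-block split ever mixes formats.
  static <F> F deriveCommonFormat(List<F> formats) {
    if (formats.isEmpty()) {
      return null; // mirrors the patch: fileFormat stays unset for an empty list
    }
    F first = formats.get(0);
    for (F format : formats) {
      if (!first.equals(format)) {
        throw new IllegalStateException("mixed file formats in one multi-block split");
      }
    }
    return first;
  }

  public static void main(String[] args) {
    System.out.println(deriveCommonFormat(Arrays.asList("ROW_V1", "ROW_V1")));
  }
}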
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 5ede272..1121a37 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -277,7 +277,8 @@ public class CarbonTableReader {
carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
- gson.toJson(carbonInputSplit.getDetailInfo())));
+ gson.toJson(carbonInputSplit.getDetailInfo()),
+ carbonInputSplit.getFileFormat().ordinal()));
}
// Use block distribution
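
With the ordinal appended as the last constructor argument here, the @JsonCreator constructor shown earlier can rebuild the format on the Presto worker. A stripped-down Jackson round-trip with a hypothetical two-field split (the field names are illustrative, not the real split schema):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SplitJsonRoundTrip {

  // Hypothetical mini-split: just a path and a format ordinal.
  static class MiniSplit {
    private final String path;
    private final int fileFormatOrdinal;

    @JsonCreator MiniSplit(@JsonProperty("path") String path,
        @JsonProperty("fileFormatOrdinal") int fileFormatOrdinal) {
      this.path = path;
      this.fileFormatOrdinal = fileFormatOrdinal;
    }

    @JsonProperty public String getPath() { return path; }
    @JsonProperty public int getFileFormatOrdinal() { return fileFormatOrdinal; }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new MiniSplit("/tmp/part-0", 1));
    MiniSplit back = mapper.readValue(json, MiniSplit.class);
    System.out.println(json + " -> ordinal " + back.getFileFormatOrdinal());
  }
}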
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 0eee58a..37eb111 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -92,4 +92,15 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
builder = type.createBlockBuilder(null, batchSize);
}
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ putBoolean(rowId, (boolean) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
\ No newline at end of file
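
The remaining hunks in this commit add the same putObject override to each Presto stream reader — Boolean here, then Decimal, Double, Integer, Long, Short, Slice and Timestamp below: null becomes a null entry, a dictionary-encoded column stores the int surrogate key, and otherwise the value is cast to the reader's native type and written directly. A self-contained sketch of that three-way dispatch (the vector type is a stub; the real readers extend CarbonColumnVectorImpl and write into Presto block builders):

// Sketch of the putObject dispatch shared by the Presto stream readers.
// BooleanVector is an illustrative stub, not a CarbonData class.
public class PutObjectDispatch {

  static class BooleanVector {
    private final Object[] data = new Object[16];
    private final boolean dictionaryEncoded;

    BooleanVector(boolean dictionaryEncoded) {
      this.dictionaryEncoded = dictionaryEncoded;
    }

    void putNull(int rowId) { data[rowId] = null; }
    void putBoolean(int rowId, boolean value) { data[rowId] = value; }
    void putInt(int rowId, int surrogateKey) { data[rowId] = surrogateKey; }

    // Mirrors the override added in this patch.
    void putObject(int rowId, Object value) {
      if (value == null) {
        putNull(rowId);
      } else if (!dictionaryEncoded) {
        putBoolean(rowId, (boolean) value);
      } else {
        putInt(rowId, (int) value); // dictionary surrogate key
      }
    }
  }

  public static void main(String[] args) {
    BooleanVector vector = new BooleanVector(false);
    vector.putObject(0, true);
    vector.putObject(1, null);
  }
}

IntegerStreamReader is the one outlier below: it needs no dictionary branch, since a surrogate key and a native value are both ints.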
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index ddc855a..da8d913 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -167,4 +167,16 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
"Read decimal precision larger than column precision");
return decimal;
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ decimalBlockWriter((BigDecimal) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
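
Unlike the other readers, the decimal path above hands the value to decimalBlockWriter without a rowId, which only works if values arrive in row order and the underlying block builder appends sequentially — an inference from the signature, not something the patch states. A toy model of such a positionless writer:

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

// Sketch: a sequential decimal writer with no rowId parameter; the write
// position is implicit in the call order, as this patch appears to assume.
public class SequentialDecimalWriter {
  private final List<BigDecimal> block = new ArrayList<>();

  void decimalBlockWriter(BigDecimal value) {
    block.add(value); // position is implicit: always the next row
  }

  public static void main(String[] args) {
    SequentialDecimalWriter writer = new SequentialDecimalWriter();
    writer.decimalBlockWriter(new BigDecimal("12.34"));
    System.out.println(writer.block);
  }
}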
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index ed9a202..8c3a73f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -89,4 +89,16 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ putDouble(rowId, (double) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 52ddbb2..3b7e0bf 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -88,4 +88,11 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
builder = type.createBlockBuilder(null, batchSize);
}
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 81fdf88..abaf0a0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -86,4 +86,16 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
builder.appendNull();
}
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ putLong(rowId, (long) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 7411513..32498e0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -86,4 +86,16 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ putShort(rowId, (short) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 1e4688f..3b3c78c 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import com.facebook.presto.spi.block.Block;
@@ -156,4 +157,16 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
}
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionaryBlock == null) {
+ putByteArray(rowId, ByteUtil.toBytes((String) value));
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
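
The slice (string) reader differs in two details: its dictionary check is on dictionaryBlock rather than dictionary, and a plain value arrives as a java.lang.String that the newly imported ByteUtil turns into bytes before the append. Assuming ByteUtil.toBytes(String) amounts to a UTF-8 encoding — consistent with the DEFAULT_CHARSET conversion already visible in this class, though the patch does not show its body — the conversion is equivalent to:

import java.nio.charset.StandardCharsets;

public class StringToSliceBytes {

  // Assumed equivalent of ByteUtil.toBytes(String): UTF-8 encoding.
  static byte[] toBytes(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] bytes = toBytes("carbon");
    System.out.println(bytes.length); // prints 6
  }
}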
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 1052a74..2b7f0c0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -87,4 +87,16 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
+
+ @Override public void putObject(int rowId, Object value) {
+ if (value == null) {
+ putNull(rowId);
+ } else {
+ if (dictionary == null) {
+ putLong(rowId, (Long) value);
+ } else {
+ putInt(rowId, (int) value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a32a8de..0ab6a3a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -61,11 +61,11 @@ import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
/**
* This RDD is used to perform query on CarbonData file. Before sending tasks to scan