Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/10 05:16:28 UTC
[07/15] carbondata git commit: [CARBONDATA-1572][Streaming] Support streaming ingest and query
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
new file mode 100644
index 0000000..b10bc8b
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+ public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+ public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+ @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new CarbonStreamRecordReader();
+ }
+
+ public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
+ CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
+ throws IOException {
+ GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ if (carbonColumns[i].isComplex()) {
+ if (carbonColumns[i].getDataType() == DataTypes.ARRAY) {
+ queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
+ carbonColumns[i].getColName(), i);
+ } else if (carbonColumns[i].getDataType() == DataTypes.STRUCT) {
+ queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
+ carbonColumns[i].getColName(), i);
+ } else {
+ throw new UnsupportedOperationException(
+ carbonColumns[i].getDataType().getName() + " is not supported");
+ }
+
+ fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
+ }
+ }
+
+ return queryTypes;
+ }
+
+ private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
+ CarbonDimension dimension, int parentBlockIndex,
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
+ for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+ CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+ DataType dataType = child.getDataType();
+ GenericQueryType queryType = null;
+ if (dataType == DataTypes.ARRAY) {
+ queryType =
+ new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+
+ } else if (dataType == DataTypes.STRUCT) {
+ queryType =
+ new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+ } else {
+ boolean isDirectDictionary =
+ CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+ DictionaryColumnUniqueIdentifier dictionaryIdentifier =
+ new DictionaryColumnUniqueIdentifier(carbontable.getCarbonTableIdentifier(),
+ child.getColumnIdentifier(), child.getDataType(),
+ CarbonStorePath.getCarbonTablePath(carbontable.getAbsoluteTableIdentifier()));
+
+ queryType =
+ new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
+ child.getDataType(), 4, cache.get(dictionaryIdentifier),
+ isDirectDictionary);
+ }
+ parentQueryType.addChildren(queryType);
+ if (child.getNumberOfChild() > 0) {
+ fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+ }
+ }
+ }
+}
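A minimal usage sketch (not part of this patch), assuming a MapReduce Job named "job", showing how the stream read buffer defined above could be tuned before submitting a query; the 128 KB value is only an illustration:

    Configuration conf = job.getConfiguration();
    // property "carbon.stream.read.buffer.size", default "65536" bytes
    conf.set(CarbonStreamInputFormat.READ_BUFFER_SIZE, "131072");
    job.setInputFormatClass(CarbonStreamInputFormat.class);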
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
new file mode 100644
index 0000000..1c21504
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * Stream output format
+ */
+public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
+
+ static final byte[] CARBON_SYNC_MARKER =
+ "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+
+ public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size";
+
+ public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
+
+ public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums";
+
+ public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
+
+ public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size";
+
+ public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
+
+ private static final String LOAD_MODEL = "mapreduce.output.carbon.load.model";
+
+ @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
+ throws IOException, InterruptedException {
+ return new CarbonStreamRecordWriter(job);
+ }
+
+ public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel)
+ throws IOException {
+ if (carbonLoadModel != null) {
+ hadoopConf.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(carbonLoadModel));
+ }
+ }
+
+ public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException {
+ String value = hadoopConf.get(LOAD_MODEL);
+ if (value == null) {
+ return null;
+ } else {
+ return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value);
+ }
+ }
+
+}
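A hedged sketch of wiring this output format into a Hadoop job, assuming a CarbonLoadModel has already been built for the target table; the property names are the constants defined above and the values shown are simply the defaults:

    Configuration conf = job.getConfiguration();
    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel);
    conf.setInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS, 32000);
    conf.setInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE, 32 * 1024 * 1024);
    job.setOutputFormatClass(CarbonStreamOutputFormat.class);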
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..1ff0fa7
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Stream record reader
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+ // vector reader
+ private boolean isVectorReader;
+
+ // metadata
+ private CarbonTable carbonTable;
+ private CarbonColumn[] storageColumns;
+ private boolean[] isRequired;
+ private int[] measureDataTypes;
+ private int dimensionCount;
+ private int measureCount;
+
+ // input
+ private FileSplit fileSplit;
+ private Configuration hadoopConf;
+ private StreamBlockletReader input;
+ private boolean isFirstRow = true;
+ private QueryModel model;
+
+ // decode data
+ private BitSet allNonNull;
+ private boolean[] isNoDictColumn;
+ private DirectDictionaryGenerator[] directDictionaryGenerators;
+ private CacheProvider cacheProvider;
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+ private GenericQueryType[] queryTypes;
+
+ // vectorized reader
+ private StructType outputSchema;
+ private ColumnarBatch columnarBatch;
+ private boolean isFinished = false;
+
+ // filter
+ private FilterExecuter filter;
+ private boolean[] isFilterRequired;
+ private Object[] filterValues;
+ private RowIntf filterRow;
+ private int[] filterMap;
+
+ // output
+ private CarbonColumn[] projection;
+ private boolean[] isProjectionRequired;
+ private int[] projectionMap;
+ private Object[] outputValues;
+ private InternalRow outputRow;
+
+ // empty projection, null filter
+ private boolean skipScanData;
+
+ @Override public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // input
+ if (split instanceof CarbonInputSplit) {
+ fileSplit = (CarbonInputSplit) split;
+ } else if (split instanceof CarbonMultiBlockSplit) {
+ fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+ } else {
+ fileSplit = (FileSplit) split;
+ }
+
+ // metadata
+ hadoopConf = context.getConfiguration();
+ if (model == null) {
+ CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ model = format.getQueryModel(split, context);
+ }
+ carbonTable = model.getTable();
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ dimensionCount = dimensions.size();
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ measureCount = measures.size();
+ List<CarbonColumn> carbonColumnList =
+ carbonTable.getStreamStorageOrderColumn(carbonTable.getFactTableName());
+ storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+ isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+ directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+ }
+ }
+ measureDataTypes = new int[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType().getId();
+ }
+
+ // decode data
+ allNonNull = new BitSet(storageColumns.length);
+ projection = model.getProjectionColumns();
+
+ isRequired = new boolean[storageColumns.length];
+ boolean[] isFilterDimensions = model.getIsFilterDimensions();
+ boolean[] isFilterMeasures = model.getIsFilterMeasures();
+ isFilterRequired = new boolean[storageColumns.length];
+ filterMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].isDimension()) {
+ if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = storageColumns[i].getOrdinal();
+ }
+ } else {
+ if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+ }
+ }
+ }
+
+ isProjectionRequired = new boolean[storageColumns.length];
+ projectionMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ for (int j = 0; j < projection.length; j++) {
+ if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+ isRequired[i] = true;
+ isProjectionRequired[i] = true;
+ projectionMap[i] = j;
+ break;
+ }
+ }
+ }
+
+ // initialize filter
+ if (null != model.getFilterExpressionResolverTree()) {
+ initializeFilter();
+ } else if (projection.length == 0) {
+ skipScanData = true;
+ }
+
+ }
+
+ private void initializeFilter() {
+
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ SegmentProperties segmentProperties =
+ new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+ Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+ FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+ filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+ complexDimensionInfoMap);
+ // for row filters, we need to update the column index
+ FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+ carbonTable.getDimensionOrdinalMax());
+
+ }
+
+ public void setQueryModel(QueryModel model) {
+ this.model = model;
+ }
+
+ private byte[] getSyncMarker(String filePath) throws IOException {
+ CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+ FileHeader header = headerReader.readHeader();
+ return header.getSync_marker();
+ }
+
+ private void initializeAtFirstRow() throws IOException {
+ filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+ filterRow = new RowImpl();
+ filterRow.setValues(filterValues);
+
+ outputValues = new Object[projection.length];
+ outputRow = new GenericInternalRow(outputValues);
+
+ Path file = fileSplit.getPath();
+
+ byte[] syncMarker = getSyncMarker(file.toUri().getPath());
+
+ FileSystem fs = file.getFileSystem(hadoopConf);
+
+ int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+ CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ fileIn.seek(fileSplit.getStart());
+ input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+ fileSplit.getStart() == 0);
+
+ cacheProvider = CacheProvider.getInstance();
+ cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+ queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+ outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (isFirstRow) {
+ isFirstRow = false;
+ initializeAtFirstRow();
+ }
+ if (isFinished) {
+ return false;
+ }
+
+ if (isVectorReader) {
+ return nextColumnarBatch();
+ }
+
+ return nextRow();
+ }
+
+ /**
+ * for vector reader, check next columnar batch
+ */
+ private boolean nextColumnarBatch() throws IOException {
+ boolean hasNext;
+ boolean scanMore = false;
+ do {
+ // move to the next blocklet
+ hasNext = input.nextBlocklet();
+ if (hasNext) {
+ // read blocklet header
+ BlockletHeader header = input.readBlockletHeader();
+ if (isScanRequired(header)) {
+ scanMore = !scanBlockletAndFillVector(header);
+ } else {
+ input.skipBlockletData(true);
+ scanMore = true;
+ }
+ } else {
+ isFinished = true;
+ scanMore = false;
+ }
+ } while (scanMore);
+ return hasNext;
+ }
+
+ /**
+ * for the row reader, check for the next row
+ */
+ private boolean nextRow() throws IOException {
+ // read rows one by one
+ try {
+ boolean hasNext;
+ boolean scanMore = false;
+ do {
+ hasNext = input.hasNext();
+ if (hasNext) {
+ if (skipScanData) {
+ input.nextRow();
+ scanMore = false;
+ } else {
+ readRowFromStream();
+ if (null != filter) {
+ scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+ } else {
+ scanMore = false;
+ }
+ }
+ } else {
+ if (input.nextBlocklet()) {
+ BlockletHeader header = input.readBlockletHeader();
+ if (isScanRequired(header)) {
+ if (skipScanData) {
+ input.skipBlockletData(false);
+ } else {
+ input.readBlockletData(header);
+ }
+ } else {
+ input.skipBlockletData(true);
+ }
+ scanMore = true;
+ } else {
+ isFinished = true;
+ scanMore = false;
+ }
+ }
+ } while (scanMore);
+ return hasNext;
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in detail reader", e);
+ }
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ if (isVectorReader) {
+ return columnarBatch;
+ }
+ return outputRow;
+ }
+
+ private boolean isScanRequired(BlockletHeader header) {
+ // TODO: implement min-max index based pruning; until then every blocklet is scanned
+ if (null == filter) {
+ return true;
+ }
+ return true;
+ }
+
+ private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+ // if the filter is null and the output projection is empty, just use the row count from the blocklet header
+ if (skipScanData) {
+ int rowNums = header.getBlocklet_info().getNum_rows();
+ columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
+ columnarBatch.setNumRows(rowNums);
+ input.skipBlockletData(true);
+ return rowNums > 0;
+ }
+
+ input.readBlockletData(header);
+ columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
+ int rowNum = 0;
+ if (null == filter) {
+ while (input.hasNext()) {
+ readRowFromStream();
+ putRowToColumnBatch(rowNum++);
+ }
+ } else {
+ try {
+ while (input.hasNext()) {
+ readRowFromStream();
+ if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+ putRowToColumnBatch(rowNum++);
+ }
+ }
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in vector reader", e);
+ }
+ }
+ columnarBatch.setNumRows(rowNum);
+ return rowNum > 0;
+ }
+
+ private void readRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+ storageColumns[colCount].getDataType());
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ } else if (null != directDictionaryGenerators[colCount]) {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = input.readInt();
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ short v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = queryTypes[colCount]
+ .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ }
+ }
+ // measure
+ int dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN_TYPE_ID) {
+ if (isRequired[colCount]) {
+ boolean v = input.readBoolean();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(1);
+ }
+ } else if (dataType == DataTypes.SHORT_TYPE_ID) {
+ if (isRequired[colCount]) {
+ short v = input.readShort();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(2);
+ }
+ } else if (dataType == DataTypes.INT_TYPE_ID) {
+ if (isRequired[colCount]) {
+ int v = input.readInt();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else if (dataType == DataTypes.LONG_TYPE_ID) {
+ if (isRequired[colCount]) {
+ long v = input.readLong();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (dataType == DataTypes.DOUBLE_TYPE_ID) {
+ if (isRequired[colCount]) {
+ double v = input.readDouble();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (dataType == DataTypes.DECIMAL_TYPE_ID) {
+ int len = input.readShort();
+ if (isRequired[colCount]) {
+ BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(len);
+ }
+ }
+ }
+ }
+ }
+
+ private void putRowToColumnBatch(int rowId) {
+ for (int i = 0; i < projection.length; i++) {
+ Object value = outputValues[i];
+ ColumnVector col = columnarBatch.column(i);
+ DataType t = col.dataType();
+ if (null == value) {
+ col.putNull(rowId);
+ } else {
+ if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+ col.putBoolean(rowId, (boolean)value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+ col.putByte(rowId, (byte) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+ col.putShort(rowId, (short) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+ col.putInt(rowId, (int) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+ col.putLong(rowId, (long) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+ col.putFloat(rowId, (float) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+ col.putDouble(rowId, (double) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+ UTF8String v = (UTF8String) value;
+ col.putByteArray(rowId, v.getBytes());
+ } else if (t instanceof DecimalType) {
+ DecimalType dt = (DecimalType)t;
+ Decimal d = (Decimal) value;
+ if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+ col.putInt(rowId, (int)d.toUnscaledLong());
+ } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+ col.putLong(rowId, d.toUnscaledLong());
+ } else {
+ final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+ byte[] bytes = integer.toByteArray();
+ col.putByteArray(rowId, bytes, 0, bytes.length);
+ }
+ } else if (t instanceof CalendarIntervalType) {
+ CalendarInterval c = (CalendarInterval) value;
+ col.getChildColumn(0).putInt(rowId, c.months);
+ col.getChildColumn(1).putLong(rowId, c.microseconds);
+ } else if (t instanceof DateType) {
+ col.putInt(rowId, (int) value);
+ } else if (t instanceof TimestampType) {
+ col.putLong(rowId, (long) value);
+ }
+ }
+ }
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ public void setVectorReader(boolean isVectorReader) {
+ this.isVectorReader = isVectorReader;
+ }
+
+ @Override public void close() throws IOException {
+ if (null != input) {
+ input.close();
+ }
+ if (null != columnarBatch) {
+ columnarBatch.close();
+ }
+ }
+}
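For orientation, a rough sketch (not part of this patch) of how a caller drives the reader in row mode; the split and task attempt context are assumed to be prepared by CarbonTableInputFormat and the MapReduce framework:

    CarbonStreamRecordReader reader = new CarbonStreamRecordReader();
    reader.setVectorReader(false);
    reader.initialize(split, context);
    while (reader.nextKeyValue()) {
      // row mode returns an InternalRow; vector mode would return a ColumnarBatch instead
      InternalRow row = (InternalRow) reader.getCurrentValue();
      // consume the projected columns of the row here
    }
    reader.close();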
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
new file mode 100644
index 0000000..8d7a2e3
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+ // basic info
+ private Configuration hadoopConf;
+ private CarbonDataLoadConfiguration configuration;
+ private CarbonTable carbonTable;
+ private int maxRowNums;
+ private int maxCacheSize;
+
+ // parser and converter
+ private RowParser rowParser;
+ private RowConverter converter;
+ private CarbonRow currentRow = new CarbonRow(null);
+
+ // encoder
+ private DataField[] dataFields;
+ private BitSet nullBitSet;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private int dimensionWithComplexCount;
+ private int measureCount;
+ private int[] measureDataTypes;
+ private StreamBlockletWriter output = null;
+
+ // data write
+ private String segmentDir;
+ private String fileName;
+ private DataOutputStream outputStream;
+ private boolean isFirstRow = true;
+ private boolean hasException = false;
+
+ CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+ initialize(job);
+ }
+
+ private void initialize(TaskAttemptContext job) throws IOException {
+ // set basic information
+ hadoopConf = job.getConfiguration();
+ CarbonLoadModel carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+ if (carbonLoadModel == null) {
+ throw new IOException(
+ "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+ }
+ carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+ int taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+ carbonLoadModel.setTaskNo("" + taskNo);
+ configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+ maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+ CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+ maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+ CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+
+ CarbonTablePath tablePath =
+ CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+ segmentDir = tablePath.getSegmentDir("0", carbonLoadModel.getSegmentId());
+ fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+ }
+
+ private void initializeAtFirstRow() throws IOException, InterruptedException {
+ isFirstRow = false;
+
+ // initialize metadata
+ isNoDictionaryDimensionColumn =
+ CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+ dimensionWithComplexCount = configuration.getDimensionCount();
+ measureCount = configuration.getMeasureCount();
+ dataFields = configuration.getDataFields();
+ measureDataTypes = new int[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] =
+ dataFields[dimensionWithComplexCount + i].getColumn().getDataType().getId();
+ }
+
+ // initialize parser and converter
+ rowParser = new RowParserImpl(dataFields, configuration);
+ BadRecordsLogger badRecordLogger =
+ DataConverterProcessorStepImpl.createBadRecordLogger(configuration);
+ converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
+ configuration.setCardinalityFinder(converter);
+ converter.initialize();
+
+ // initialize encoder
+ nullBitSet = new BitSet(dataFields.length);
+ int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+ CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+ output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
+
+ // initialize data writer
+ String filePath = segmentDir + File.separator + fileName;
+ FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
+ if (carbonFile.exists()) {
+ // if the file already exists, use the append API
+ outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+ } else {
+ // if the file does not exist, use the create API and write the file header
+ outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+ writeFileHeader();
+ }
+ }
+
+ @Override public void write(Void key, Object value) throws IOException, InterruptedException {
+ if (isFirstRow) {
+ initializeAtFirstRow();
+ }
+
+ // parse and convert row
+ currentRow.setData(rowParser.parseRow((Object[]) value));
+ converter.convert(currentRow);
+
+ // null bit set
+ nullBitSet.clear();
+ for (int i = 0; i < dataFields.length; i++) {
+ if (null == currentRow.getObject(i)) {
+ nullBitSet.set(i);
+ }
+ }
+ output.nextRow();
+ byte[] b = nullBitSet.toByteArray();
+ output.writeShort(b.length);
+ if (b.length > 0) {
+ output.writeBytes(b);
+ }
+ int dimCount = 0;
+ Object columnValue;
+
+ // primitive type dimension
+ for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
+ if (isNoDictionaryDimensionColumn[dimCount]) {
+ byte[] col = (byte[]) columnValue;
+ output.writeShort(col.length);
+ output.writeBytes(col);
+ } else {
+ output.writeInt((int) columnValue);
+ }
+ }
+ }
+ // complex type dimension
+ for (; dimCount < dimensionWithComplexCount; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
+ byte[] col = (byte[]) columnValue;
+ output.writeShort(col.length);
+ output.writeBytes(col);
+ }
+ }
+ // measure
+ int dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+ columnValue = currentRow.getObject(dimCount + msrCount);
+ if (null != columnValue) {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN_TYPE_ID) {
+ output.writeBoolean((boolean) columnValue);
+ } else if (dataType == DataTypes.SHORT_TYPE_ID) {
+ output.writeShort((short) columnValue);
+ } else if (dataType == DataTypes.INT_TYPE_ID) {
+ output.writeInt((int) columnValue);
+ } else if (dataType == DataTypes.LONG_TYPE_ID) {
+ output.writeLong((long) columnValue);
+ } else if (dataType == DataTypes.DOUBLE_TYPE_ID) {
+ output.writeDouble((double) columnValue);
+ } else if (dataType == DataTypes.DECIMAL_TYPE_ID) {
+ BigDecimal val = (BigDecimal) columnValue;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ output.writeShort(bigDecimalInBytes.length);
+ output.writeBytes(bigDecimalInBytes);
+ } else {
+ String msg =
+ "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+ .getName();
+ LOGGER.error(msg);
+ throw new IOException(msg);
+ }
+ }
+ }
+
+ if (output.isFull()) {
+ appendBlockletToDataFile();
+ }
+ }
+
+ private void writeFileHeader() throws IOException {
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getFactTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName()));
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ List<Integer> cardinality = new ArrayList<>();
+ List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
+ .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
+ wrapperColumnSchemaList);
+ FileHeader fileHeader =
+ CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
+ fileHeader.setIs_footer_present(false);
+ fileHeader.setIs_splitable(true);
+ fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+ outputStream.write(CarbonUtil.getByteArray(fileHeader));
+ }
+
+ /**
+ * write a blocklet to file
+ */
+ private void appendBlockletToDataFile() throws IOException {
+ if (output.getRowIndex() == -1) {
+ return;
+ }
+ output.appendBlocklet(outputStream);
+ outputStream.flush();
+ // reset data
+ output.reset();
+ }
+
+ @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ // append the remaining buffered data (only if at least one row was written)
+ if (!hasException && !isFirstRow) {
+ appendBlockletToDataFile();
+ converter.finish();
+ }
+ } finally {
+ // close resources; output and outputStream stay null when no row was ever written
+ CarbonUtil.closeStreams(outputStream);
+ if (null != output) {
+ output.close();
+ }
+ }
+ }
+
+ public String getSegmentDir() {
+ return segmentDir;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+}
\ No newline at end of file
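As a reading aid (not part of the patch), the on-disk layout the writer produces, pieced together from writeFileHeader() and appendBlockletToDataFile() above:

    stream file = FileHeader (thrift bytes, carries the 16-byte sync marker "@carbondata_sync")
                  followed by zero or more blocklets
    blocklet    = sync marker (16 bytes)
                  + blocklet header length (4-byte int)
                  + BlockletHeader (thrift: mutation type, row count, uncompressed data length)
                  + compressed data length (4-byte int)
                  + compressed row data (via CompressorFactory's default compressor)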
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
new file mode 100644
index 0000000..eafb142
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
+/**
+ * stream blocklet reader
+ */
+public class StreamBlockletReader {
+
+ private byte[] buffer;
+ private int offset;
+ private final byte[] syncMarker;
+ private final byte[] syncBuffer;
+ private final int syncLen;
+ private long pos = 0;
+ private final InputStream in;
+ private final long limitStart;
+ private final long limitEnd;
+ private boolean isAlreadySync = false;
+ private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ private int rowNums = 0;
+ private int rowIndex = 0;
+ private boolean isHeaderPresent;
+
+ StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
+ this.syncMarker = syncMarker;
+ this.syncLen = syncMarker.length;
+ this.syncBuffer = new byte[syncMarker.length];
+ this.in = in;
+ this.limitStart = limit;
+ this.limitEnd = limitStart + syncLen;
+ this.isHeaderPresent = isHeaderPresent;
+ }
+
+ private void ensureCapacity(int capacity) {
+ if (buffer == null || capacity > buffer.length) {
+ buffer = new byte[capacity];
+ }
+ }
+
+ /**
+ * find the first position of sync_marker in input stream
+ */
+ private boolean sync() throws IOException {
+ int len = in.read(syncBuffer);
+ if (len < syncLen) {
+ return false;
+ }
+ pos += syncLen;
+ boolean skipHeader = false;
+ for (int i = 0; i < limitStart; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+ }
+ if (syncLen == j) {
+ if (isHeaderPresent) {
+ if (skipHeader) {
+ return true;
+ } else {
+ skipHeader = true;
+ }
+ } else {
+ return true;
+ }
+ }
+ int value = in.read();
+ if (-1 == value) {
+ return false;
+ }
+ syncBuffer[i % syncLen] = (byte) value;
+ pos++;
+ }
+ return false;
+ }
+
+ BlockletHeader readBlockletHeader() throws IOException {
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ readBytesFromStream(b);
+ BlockletHeader header = CarbonUtil.readBlockletHeader(b);
+ rowNums = header.getBlocklet_info().getNum_rows();
+ rowIndex = 0;
+ return header;
+ }
+
+ void readBlockletData(BlockletHeader header) throws IOException {
+ ensureCapacity(header.getBlocklet_length());
+ offset = 0;
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ readBytesFromStream(b);
+ compressor.rawUncompress(b, buffer);
+ }
+
+ void skipBlockletData(boolean reset) throws IOException {
+ int len = readIntFromStream();
+ skip(len);
+ pos += len;
+ if (reset) {
+ this.rowNums = 0;
+ this.rowIndex = 0;
+ }
+ }
+
+ private void skip(int len) throws IOException {
+ long remaining = len;
+ do {
+ long skipLen = in.skip(remaining);
+ remaining -= skipLen;
+ } while (remaining > 0);
+ }
+
+ /**
+ * find the next blocklet
+ */
+ boolean nextBlocklet() throws IOException {
+ if (pos >= limitStart) {
+ return false;
+ }
+ if (isAlreadySync) {
+ int v = in.read(syncBuffer);
+ if (v < syncLen) {
+ return false;
+ }
+ pos += syncLen;
+ } else {
+ isAlreadySync = true;
+ if (!sync()) {
+ return false;
+ }
+ }
+
+ return pos < limitEnd;
+ }
+
+ boolean hasNext() throws IOException {
+ return rowIndex < rowNums;
+ }
+
+ void nextRow() {
+ rowIndex++;
+ }
+
+ int readIntFromStream() throws IOException {
+ int ch1 = in.read();
+ int ch2 = in.read();
+ int ch3 = in.read();
+ int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
+ pos += 4;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ void readBytesFromStream(byte[] b) throws IOException {
+ // loop because a single read() may return fewer bytes than requested
+ int offset = 0;
+ while (offset < b.length) {
+ int len = in.read(b, offset, b.length - offset);
+ if (len < 0) {
+ throw new EOFException();
+ }
+ offset += len;
+ }
+ pos += b.length;
+ }
+
+ boolean readBoolean() {
+ return (buffer[offset++]) != 0;
+ }
+
+ short readShort() {
+ short v = (short) ((buffer[offset + 1] & 255) +
+ ((buffer[offset]) << 8));
+ offset += 2;
+ return v;
+ }
+
+ byte[] copy(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ return b;
+ }
+
+ int readInt() {
+ int v = ((buffer[offset + 3] & 255) +
+ ((buffer[offset + 2] & 255) << 8) +
+ ((buffer[offset + 1] & 255) << 16) +
+ ((buffer[offset]) << 24));
+ offset += 4;
+ return v;
+ }
+
+ long readLong() {
+ long v = ((long)(buffer[offset + 7] & 255)) +
+ ((long) (buffer[offset + 6] & 255) << 8) +
+ ((long) (buffer[offset + 5] & 255) << 16) +
+ ((long) (buffer[offset + 4] & 255) << 24) +
+ ((long) (buffer[offset + 3] & 255) << 32) +
+ ((long) (buffer[offset + 2] & 255) << 40) +
+ ((long) (buffer[offset + 1] & 255) << 48) +
+ ((long) (buffer[offset]) << 56);
+ offset += 8;
+ return v;
+ }
+
+ double readDouble() {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ byte[] readBytes(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ offset += len;
+ return b;
+ }
+
+ void skipBytes(int len) {
+ offset += len;
+ }
+
+ int getRowNums() {
+ return rowNums;
+ }
+
+ void close() {
+ CarbonUtil.closeStreams(in);
+ }
+}
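A sketch of the read loop that CarbonStreamRecordReader above runs against this class (compare nextRow() and nextColumnarBatch()); "scanNeeded" is a hypothetical placeholder for the filter decision:

    while (input.nextBlocklet()) {                      // sync to the next blocklet in the split
      BlockletHeader header = input.readBlockletHeader();
      if (scanNeeded) {
        input.readBlockletData(header);                 // uncompress the row data into the buffer
        while (input.hasNext()) {
          input.nextRow();
          // decode fields with readShort()/readInt()/readBytes()/...
        }
      } else {
        input.skipBlockletData(true);                   // skip the compressed payload entirely
      }
    }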
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
new file mode 100644
index 0000000..a0328b3
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.MutationType;
+
+/**
+ * stream blocklet writer
+ */
+public class StreamBlockletWriter {
+ private byte[] buffer;
+ private int maxSize;
+ private int maxRowNum;
+ private int rowSize;
+ private int count = 0;
+ private int rowIndex = -1;
+ private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+ StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+ buffer = new byte[maxSize];
+ this.maxSize = maxSize;
+ this.maxRowNum = maxRowNum;
+ this.rowSize = rowSize;
+ }
+
+ private void ensureCapacity(int space) {
+ int newcount = space + count;
+ if (newcount > buffer.length) {
+ byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
+ System.arraycopy(buffer, 0, newbuf, 0, count);
+ buffer = newbuf;
+ }
+ }
+
+ void reset() {
+ count = 0;
+ rowIndex = -1;
+ }
+
+ byte[] getBytes() {
+ return buffer;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ int getRowIndex() {
+ return rowIndex;
+ }
+
+ void nextRow() {
+ rowIndex++;
+ }
+
+ boolean isFull() {
+ return rowIndex == maxRowNum || count >= maxSize;
+ }
+
+ void writeBoolean(boolean val) {
+ ensureCapacity(1);
+ buffer[count] = (byte) (val ? 1 : 0);
+ count += 1;
+ }
+
+ void writeShort(int val) {
+ ensureCapacity(2);
+ buffer[count + 1] = (byte) (val);
+ buffer[count] = (byte) (val >>> 8);
+ count += 2;
+ }
+
+ void writeInt(int val) {
+ ensureCapacity(4);
+ buffer[count + 3] = (byte) (val);
+ buffer[count + 2] = (byte) (val >>> 8);
+ buffer[count + 1] = (byte) (val >>> 16);
+ buffer[count] = (byte) (val >>> 24);
+ count += 4;
+ }
+
+ void writeLong(long val) {
+ ensureCapacity(8);
+ buffer[count + 7] = (byte) (val);
+ buffer[count + 6] = (byte) (val >>> 8);
+ buffer[count + 5] = (byte) (val >>> 16);
+ buffer[count + 4] = (byte) (val >>> 24);
+ buffer[count + 3] = (byte) (val >>> 32);
+ buffer[count + 2] = (byte) (val >>> 40);
+ buffer[count + 1] = (byte) (val >>> 48);
+ buffer[count] = (byte) (val >>> 56);
+ count += 8;
+ }
+
+ void writeDouble(double val) {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ void writeBytes(byte[] b) {
+ writeBytes(b, 0, b.length);
+ }
+
+ void writeBytes(byte[] b, int off, int len) {
+ ensureCapacity(len);
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
+ }
+
+ void appendBlocklet(DataOutputStream outputStream) throws IOException {
+ outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+
+ BlockletInfo blockletInfo = new BlockletInfo();
+ blockletInfo.setNum_rows(getRowIndex() + 1);
+ BlockletHeader blockletHeader = new BlockletHeader();
+ blockletHeader.setBlocklet_length(getCount());
+ blockletHeader.setMutation(MutationType.INSERT);
+ blockletHeader.setBlocklet_info(blockletInfo);
+ byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
+ outputStream.writeInt(headerBytes.length);
+ outputStream.write(headerBytes);
+
+ byte[] compressed = compressor.compressByte(getBytes(), getCount());
+ outputStream.writeInt(compressed.length);
+ outputStream.write(compressed);
+ }
+
+ void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index a559cc4..b4444be 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -109,12 +109,14 @@ public class CarbonInputFormatUtil {
plan.addDimension(queryDimension);
}
- public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable) {
+ public static void processFilterExpression(Expression filterExpression, CarbonTable carbonTable,
+ boolean[] isFilterDimensions, boolean[] isFilterMeasures) {
List<CarbonDimension> dimensions =
carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
List<CarbonMeasure> measures =
carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
- QueryModel.processFilterExpression(filterExpression, dimensions, measures);
+ QueryModel.processFilterExpression(filterExpression, dimensions, measures,
+ isFilterDimensions, isFilterMeasures);
if (null != filterExpression) {
// Optimize Filter Expression and fit RANGE filters if conditions apply.
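
The extra boolean[] parameters let a caller learn which dimensions and measures the filter actually references (QueryModel.processFilterExpression is expected to flag them per column); callers that do not need this can pass null, null, as the Hive integration does further below. A minimal, hypothetical caller, assuming the flag arrays are sized from the table schema:

    import java.util.List;

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
    import org.apache.carbondata.core.scan.expression.Expression;
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;

    public class FilterFlagSketch {
      // Hypothetical helper: size one flag per dimension/measure and let
      // processFilterExpression mark the columns referenced by the filter.
      static void markFilterColumns(Expression filter, CarbonTable carbonTable) {
        List<CarbonDimension> dims =
            carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
        List<CarbonMeasure> msrs =
            carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
        boolean[] isFilterDimensions = new boolean[dims.size()];
        boolean[] isFilterMeasures = new boolean[msrs.size()];
        CarbonInputFormatUtil.processFilterExpression(
            filter, carbonTable, isFilterDimensions, isFilterMeasures);
        // isFilterDimensions[i] / isFilterMeasures[j] now indicate filter columns.
      }
    }
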
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
new file mode 100644
index 0000000..395015e
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonTypeUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.util;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+
+public class CarbonTypeUtil {
+
+ public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+ DataType carbonDataType) {
+ if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+ return DataTypes.StringType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+ return DataTypes.ShortType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+ return DataTypes.IntegerType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ return DataTypes.LongType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+ return DataTypes.DoubleType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+ return DataTypes.BooleanType;
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+ return DataTypes.createDecimalType();
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+ return DataTypes.TimestampType;
+ } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+ return DataTypes.DateType;
+ } else {
+ return null;
+ }
+ }
+
+ public static StructField[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+ StructField[] fields = new StructField[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ CarbonColumn carbonColumn = carbonColumns[i];
+ if (carbonColumn.isDimension()) {
+ if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(carbonColumn.getDataType());
+ fields[i] = new StructField(carbonColumn.getColName(),
+ CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+ } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else if (carbonColumn.isComplex()) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ CarbonTypeUtil.convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
+ .convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+ }
+ } else if (carbonColumn.isMeasure()) {
+ DataType dataType = carbonColumn.getDataType();
+ if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+ fields[i] = new StructField(carbonColumn.getColName(),
+ CarbonTypeUtil.convertCarbonToSparkDataType(dataType), true, null);
+ } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+ CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+ fields[i] = new StructField(carbonColumn.getColName(),
+ new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+ } else {
+ fields[i] = new StructField(carbonColumn.getColName(), CarbonTypeUtil
+ .convertCarbonToSparkDataType(
+ org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+ }
+ }
+ }
+ return fields;
+ }
+
+}
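
A small, hypothetical usage of the converter above: build a Spark StructType from a carbon projection, for example to allocate columnar vectors for the streaming record reader. The class and method names below are illustrative; StructType's array constructor is standard Spark SQL API.

    import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class SchemaConversionSketch {
      // Turn the projected carbon columns into a Spark schema.
      static StructType toSparkSchema(CarbonColumn[] carbonColumns) {
        StructField[] fields = CarbonTypeUtil.convertCarbonSchemaToSparkSchema(carbonColumns);
        return new StructType(fields);
      }
    }
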
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index ea90bbf..29d8d03 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -116,6 +116,45 @@ public class StoreCreator {
return absoluteTableIdentifier;
}
+ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath) {
+ CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
+ CarbonLoadModel loadModel = new CarbonLoadModel();
+ loadModel.setCarbonDataLoadSchema(schema);
+ loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
+ loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
+ loadModel.setFactFilePath(factFilePath);
+ loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
+ loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
+ loadModel.setDateFormat(null);
+ loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
+ loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+ loadModel
+ .setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
+ loadModel
+ .setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
+ loadModel
+ .setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
+ loadModel
+ .setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
+ loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
+ loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
+ loadModel.setTaskNo("0");
+ loadModel.setSegmentId("0");
+ loadModel.setPartitionId("0");
+ loadModel.setFactTimeStamp(System.currentTimeMillis());
+ loadModel.setMaxColumns("10");
+ return loadModel;
+ }
+
/**
* Create store without any restructure
*/
@@ -131,42 +170,7 @@ public class StoreCreator {
CarbonTable table = createTable();
writeDictionary(factFilePath, table);
- CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
- CarbonLoadModel loadModel = new CarbonLoadModel();
- String partitionId = "0";
- loadModel.setCarbonDataLoadSchema(schema);
- loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
- loadModel.setFactFilePath(factFilePath);
- loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
- loadModel.setStorePath(absoluteTableIdentifier.getStorePath());
- loadModel.setDateFormat(null);
- loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
- loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
- loadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
- loadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false");
- loadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "FORCE");
- loadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + "false");
- loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
- loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
- loadModel.setTaskNo("0");
- loadModel.setSegmentId("0");
- loadModel.setPartitionId("0");
- loadModel.setFactTimeStamp(System.currentTimeMillis());
- loadModel.setMaxColumns("10");
+ CarbonLoadModel loadModel = buildCarbonLoadModel(table, factFilePath);
executeGraph(loadModel, absoluteTableIdentifier.getStorePath());
@@ -176,7 +180,7 @@ public class StoreCreator {
}
- private static CarbonTable createTable() throws IOException {
+ public static CarbonTable createTable() throws IOException {
TableInfo tableInfo = new TableInfo();
tableInfo.setStorePath(absoluteTableIdentifier.getStorePath());
tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
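
With createTable() now public and the load-model construction factored into buildCarbonLoadModel, other tests (the streaming tests, for instance) can reuse the shared store definition instead of duplicating it. A sketch; the import path of CarbonLoadModel and the CSV path are assumptions, not taken from this patch:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.hadoop.test.util.StoreCreator;
    // assumed location of CarbonLoadModel in this source tree
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    public class LoadModelSketch {
      public static void main(String[] args) throws Exception {
        CarbonTable table = StoreCreator.createTable();
        // fact file path is illustrative
        CarbonLoadModel loadModel =
            StoreCreator.buildCarbonLoadModel(table, "./src/test/resources/data.csv");
        System.out.println(loadModel.getTableName());
      }
    }
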
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
index 2e840c0..4cbc692 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -135,7 +135,7 @@ public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
QueryModel.createModel(identifier, queryPlan, carbonTable, new DataTypeConverterImpl());
// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
- CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+ CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null);
FilterResolverIntf filterIntf =
CarbonInputFormatUtil.resolveFilter(filter, identifier, tableProvider);
queryModel.setFilterExpressionResolverTree(filterIntf);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
index f129474..5e3e5b7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.hadoop.AbstractRecordReader;
import org.apache.carbondata.hadoop.CarbonInputSplit;
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index fc34127..7ec6b7b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -21,6 +21,8 @@ import java.text.SimpleDateFormat
import java.util.{ArrayList, Date, List}
import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.hadoop.conf.Configuration
@@ -38,9 +40,11 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, TaskMetricsMap}
+import org.apache.carbondata.core.statusmanager.FileFormat
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil, TaskMetricsMap}
import org.apache.carbondata.hadoop._
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
@@ -82,11 +86,45 @@ class CarbonScanRDD(
// get splits
val splits = format.getSplits(job)
- val result = distributeSplits(splits)
- result
+
+ // separate the splits by file format:
+ // 1. for batch (columnar) splits, invoke distributeColumnarSplits to create partitions
+ // 2. for stream splits, create one partition per split by default
+ val columnarSplits = new ArrayList[InputSplit]()
+ val streamSplits = new ArrayBuffer[InputSplit]()
+ splits.asScala.foreach { split =>
+ val carbonInputSplit = split.asInstanceOf[CarbonInputSplit]
+ if (FileFormat.rowformat == carbonInputSplit.getFileFormat) {
+ streamSplits += split
+ } else {
+ columnarSplits.add(split)
+ }
+ }
+ val batchPartitions = distributeColumnarSplits(columnarSplits)
+ if (streamSplits.isEmpty) {
+ batchPartitions.toArray
+ } else {
+ val index = batchPartitions.length
+ val streamPartitions: mutable.Buffer[Partition] =
+ streamSplits.zipWithIndex.map { splitWithIndex =>
+ val multiBlockSplit =
+ new CarbonMultiBlockSplit(identifier,
+ Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+ splitWithIndex._1.getLocations,
+ FileFormat.rowformat)
+ new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit)
+ }
+ if (batchPartitions.isEmpty) {
+ streamPartitions.toArray
+ } else {
+ // keep the combined partitions ordered by index
+ batchPartitions.appendAll(streamPartitions)
+ batchPartitions.toArray
+ }
+ }
}
- private def distributeSplits(splits: List[InputSplit]): Array[Partition] = {
+ private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = {
// this function distributes the splits based on the following logic:
// 1. based on data locality, to keep the splits balanced across all available nodes
// 2. if the number of split for one
@@ -190,7 +228,7 @@ class CarbonScanRDD(
| no.of.nodes: $noOfNodes,
| parallelism: $parallelism
""".stripMargin)
- result.toArray(new Array[Partition](result.size()))
+ result.asScala
}
override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
@@ -210,20 +248,34 @@ class CarbonScanRDD(
inputMetricsStats.initBytesReadCallback(context, inputSplit)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.getQueryModel(inputSplit, attemptContext)
- val reader = {
- if (vectorReader) {
- val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
- if (carbonRecordReader == null) {
- new CarbonRecordReader(model,
- format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+ // get RecordReader by FileFormat
+ val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
+ case FileFormat.rowformat =>
+ // create record reader for row format
+ DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+ val inputFormat = new CarbonStreamInputFormat
+ val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+ .asInstanceOf[CarbonStreamRecordReader]
+ streamReader.setVectorReader(vectorReader)
+ model.setStatisticsRecorder(
+ CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
+ streamReader.setQueryModel(model)
+ streamReader
+ case _ =>
+ // create record reader for CarbonData file format
+ if (vectorReader) {
+ val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats)
+ if (carbonRecordReader == null) {
+ new CarbonRecordReader(model,
+ format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats)
+ } else {
+ carbonRecordReader
+ }
} else {
- carbonRecordReader
+ new CarbonRecordReader(model,
+ format.getReadSupportClass(attemptContext.getConfiguration),
+ inputMetricsStats)
}
- } else {
- new CarbonRecordReader(model,
- format.getReadSupportClass(attemptContext.getConfiguration),
- inputMetricsStats)
- }
}
reader.initialize(inputSplit, attemptContext)
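
The resulting partition layout is: all columnar (batch) partitions first, then one partition per streaming split, with the stream indices offset by the batch count so Spark sees one contiguous, ordered range. A plain-Java sketch of that offsetting; the types and names are illustrative, not CarbonData API:

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionOrderSketch {
      static final class Part {
        final int index;
        final String source; // "batch" or "stream"
        Part(int index, String source) { this.index = index; this.source = source; }
      }

      // Stream partitions are appended after the batch ones; their indices start
      // where the batch indices end, keeping the combined list contiguous.
      static List<Part> combine(int batchCount, int streamCount) {
        List<Part> all = new ArrayList<>();
        for (int i = 0; i < batchCount; i++) {
          all.add(new Part(i, "batch"));
        }
        for (int j = 0; j < streamCount; j++) {
          all.add(new Part(batchCount + j, "stream"));
        }
        return all;
      }

      public static void main(String[] args) {
        combine(3, 2).forEach(p -> System.out.println(p.index + " " + p.source));
      }
    }
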
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index ed01728..18e37ad 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -36,7 +36,7 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark-common</artifactId>
+ <artifactId>carbondata-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index d5adc2f..10336eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -23,9 +23,11 @@ import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.command.management.LoadTableByInsertCommand
+import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -36,6 +38,7 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.streaming.StreamSinkFactory
case class CarbonDatasourceHadoopRelation(
sparkSession: SparkSession,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index eeca8b8..6020eee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
import org.apache.spark.sql.hive.CarbonSessionState
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.util.Utils
@@ -168,6 +169,7 @@ object CarbonSession {
SparkSession.sqlListener.set(null)
}
})
+ session.streams.addListener(new CarbonStreamingQueryListener(session))
}
return session