Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:43 UTC
[2/5] carbondata git commit: [CARBONDATA-2165] Remove spark in carbon-hadoop module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
new file mode 100644
index 0000000..69d2a3b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java
@@ -0,0 +1,761 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Stream record reader
+ */
+public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
+ // vector reader
+ private boolean isVectorReader;
+
+ // metadata
+ private CarbonTable carbonTable;
+ private CarbonColumn[] storageColumns;
+ private boolean[] isRequired;
+ private DataType[] measureDataTypes;
+ private int dimensionCount;
+ private int measureCount;
+
+ // input
+ private FileSplit fileSplit;
+ private Configuration hadoopConf;
+ private StreamBlockletReader input;
+ private boolean isFirstRow = true;
+ private QueryModel model;
+
+ // decode data
+ private BitSet allNonNull;
+ private boolean[] isNoDictColumn;
+ private DirectDictionaryGenerator[] directDictionaryGenerators;
+ private CacheProvider cacheProvider;
+ private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+ private GenericQueryType[] queryTypes;
+
+ // vectorized reader
+ private StructType outputSchema;
+ private ColumnarBatch columnarBatch;
+ private boolean isFinished = false;
+
+ // filter
+ private FilterExecuter filter;
+ private boolean[] isFilterRequired;
+ private Object[] filterValues;
+ private RowIntf filterRow;
+ private int[] filterMap;
+
+ // output
+ private CarbonColumn[] projection;
+ private boolean[] isProjectionRequired;
+ private int[] projectionMap;
+ private Object[] outputValues;
+ private InternalRow outputRow;
+
+ // empty project, null filter
+ private boolean skipScanData;
+
+ // return raw row for handoff
+ private boolean useRawRow = false;
+
+ // InputMetricsStats
+ private InputMetricsStats inputMetricsStats;
+
+ @Override public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ // input
+ if (split instanceof CarbonInputSplit) {
+ fileSplit = (CarbonInputSplit) split;
+ } else if (split instanceof CarbonMultiBlockSplit) {
+ fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+ } else {
+ fileSplit = (FileSplit) split;
+ }
+
+ // metadata
+ hadoopConf = context.getConfiguration();
+ if (model == null) {
+ CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+ model = format.createQueryModel(split, context);
+ }
+ carbonTable = model.getTable();
+ List<CarbonDimension> dimensions =
+ carbonTable.getDimensionByTableName(carbonTable.getTableName());
+ dimensionCount = dimensions.size();
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getTableName());
+ measureCount = measures.size();
+ List<CarbonColumn> carbonColumnList =
+ carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+ storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+ isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+ directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+ .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+ }
+ }
+ measureDataTypes = new DataType[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+ }
+
+ // decode data
+ allNonNull = new BitSet(storageColumns.length);
+ projection = model.getProjectionColumns();
+
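+ // mark the storage columns touched by the filter; filterMap maps a
+ // storage-order index to its slot in the filter row (measures are offset
+ // by the maximum dimension ordinal)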
+ isRequired = new boolean[storageColumns.length];
+ boolean[] isFilterDimensions = model.getIsFilterDimensions();
+ boolean[] isFilterMeasures = model.getIsFilterMeasures();
+ isFilterRequired = new boolean[storageColumns.length];
+ filterMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ if (storageColumns[i].isDimension()) {
+ if (isFilterDimensions[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = storageColumns[i].getOrdinal();
+ }
+ } else {
+ if (isFilterMeasures[storageColumns[i].getOrdinal()]) {
+ isRequired[i] = true;
+ isFilterRequired[i] = true;
+ filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+ }
+ }
+ }
+
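+ // mark the projected storage columns; projectionMap maps a storage-order
+ // index to its position in the output row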
+ isProjectionRequired = new boolean[storageColumns.length];
+ projectionMap = new int[storageColumns.length];
+ for (int i = 0; i < storageColumns.length; i++) {
+ for (int j = 0; j < projection.length; j++) {
+ if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+ isRequired[i] = true;
+ isProjectionRequired[i] = true;
+ projectionMap[i] = j;
+ break;
+ }
+ }
+ }
+
+ // initialize filter
+ if (null != model.getFilterExpressionResolverTree()) {
+ initializeFilter();
+ } else if (projection.length == 0) {
+ skipScanData = true;
+ }
+
+ }
+
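+ /**
+ * Builds the filter executer tree for this query. Streaming segments keep no
+ * dictionary cardinality statistics, so Integer.MAX_VALUE is used as the
+ * cardinality of every column when creating the SegmentProperties.
+ */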
+ private void initializeFilter() {
+
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ SegmentProperties segmentProperties =
+ new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
+ Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
+
+ FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
+ filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
+ complexDimensionInfoMap);
+ // for row filter, we need to update the column index
+ FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
+ carbonTable.getDimensionOrdinalMax());
+
+ }
+
+ public void setQueryModel(QueryModel model) {
+ this.model = model;
+ }
+
+ private byte[] getSyncMarker(String filePath) throws IOException {
+ CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
+ FileHeader header = headerReader.readHeader();
+ return header.getSync_marker();
+ }
+
+ public void setUseRawRow(boolean useRawRow) {
+ this.useRawRow = useRawRow;
+ }
+
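+ /**
+ * Lazily opens the stream file of this split, seeks to the split start,
+ * creates the blocklet reader with the file's sync marker and builds the
+ * Spark output schema for the projection.
+ */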
+ private void initializeAtFirstRow() throws IOException {
+ filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+ filterRow = new RowImpl();
+ filterRow.setValues(filterValues);
+
+ outputValues = new Object[projection.length];
+ outputRow = new GenericInternalRow(outputValues);
+
+ Path file = fileSplit.getPath();
+
+ byte[] syncMarker = getSyncMarker(file.toString());
+
+ FileSystem fs = file.getFileSystem(hadoopConf);
+
+ int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+ CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ fileIn.seek(fileSplit.getStart());
+ input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+ fileSplit.getStart() == 0);
+
+ cacheProvider = CacheProvider.getInstance();
+ cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+ queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+
+ outputSchema = new StructType((StructField[])
+ DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (isFirstRow) {
+ isFirstRow = false;
+ initializeAtFirstRow();
+ }
+ if (isFinished) {
+ return false;
+ }
+
+ if (isVectorReader) {
+ return nextColumnarBatch();
+ }
+
+ return nextRow();
+ }
+
+ /**
+ * for the vector reader, read the next columnar batch
+ */
+ private boolean nextColumnarBatch() throws IOException {
+ boolean hasNext;
+ boolean scanMore = false;
+ do {
+ // move to the next blocklet
+ hasNext = input.nextBlocklet();
+ if (hasNext) {
+ // read blocklet header
+ BlockletHeader header = input.readBlockletHeader();
+ if (isScanRequired(header)) {
+ scanMore = !scanBlockletAndFillVector(header);
+ } else {
+ input.skipBlockletData(true);
+ scanMore = true;
+ }
+ } else {
+ isFinished = true;
+ scanMore = false;
+ }
+ } while (scanMore);
+ return hasNext;
+ }
+
+ /**
+ * read the next row
+ */
+ private boolean nextRow() throws IOException {
+ // read row one by one
+ try {
+ boolean hasNext;
+ boolean scanMore = false;
+ do {
+ hasNext = input.hasNext();
+ if (hasNext) {
+ if (skipScanData) {
+ input.nextRow();
+ scanMore = false;
+ } else {
+ if (useRawRow) {
+ // read raw row for streaming handoff, which does not require decoding the row
+ readRawRowFromStream();
+ } else {
+ readRowFromStream();
+ }
+ if (null != filter) {
+ scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+ } else {
+ scanMore = false;
+ }
+ }
+ } else {
+ if (input.nextBlocklet()) {
+ BlockletHeader header = input.readBlockletHeader();
+ if (isScanRequired(header)) {
+ if (skipScanData) {
+ input.skipBlockletData(false);
+ } else {
+ input.readBlockletData(header);
+ }
+ } else {
+ input.skipBlockletData(true);
+ }
+ scanMore = true;
+ } else {
+ isFinished = true;
+ scanMore = false;
+ }
+ }
+ } while (scanMore);
+ return hasNext;
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in detail reader", e);
+ }
+ }
+
+ @Override public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override public Object getCurrentValue() throws IOException, InterruptedException {
+ if (isVectorReader) {
+ int value = columnarBatch.numValidRows();
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead((long) value);
+ }
+
+ return columnarBatch;
+ }
+
+ if (inputMetricsStats != null) {
+ inputMetricsStats.incrementRecordRead(1L);
+ }
+
+ return outputRow;
+ }
+
+ private boolean isScanRequired(BlockletHeader header) {
+ // TODO: implement min-max index to prune blocklets
+ if (null == filter) {
+ return true;
+ }
+ return true;
+ }
+
+ private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+ // if the filter is null and the output projection is empty, use the row count from the blocklet header
+ if (skipScanData) {
+ int rowNums = header.getBlocklet_info().getNum_rows();
+ columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
+ columnarBatch.setNumRows(rowNums);
+ input.skipBlockletData(true);
+ return rowNums > 0;
+ }
+
+ input.readBlockletData(header);
+ columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
+ int rowNum = 0;
+ if (null == filter) {
+ while (input.hasNext()) {
+ readRowFromStream();
+ putRowToColumnBatch(rowNum++);
+ }
+ } else {
+ try {
+ while (input.hasNext()) {
+ readRowFromStream();
+ if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+ putRowToColumnBatch(rowNum++);
+ }
+ }
+ } catch (FilterUnsupportedException e) {
+ throw new IOException("Failed to filter row in vector reader", e);
+ }
+ }
+ columnarBatch.setNumRows(rowNum);
+ return rowNum > 0;
+ }
+
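+ /**
+ * Decodes one row from the blocklet buffer into the filter and output rows.
+ * Row layout: a 2-byte null-bitmap length and the bitmap itself, then the
+ * dimensions in storage order (no-dictionary values as length-prefixed
+ * bytes, dictionary/direct-dictionary values as 4-byte surrogates), complex
+ * dimensions as length-prefixed bytes, and finally the measure values.
+ */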
+ private void readRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
+ storageColumns[colCount].getDataType());
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ } else if (null != directDictionaryGenerators[colCount]) {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ if (isRequired[colCount]) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = input.copy(4);
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = input.readInt();
+ } else {
+ input.skipBytes(4);
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ short v = input.readShort();
+ if (isRequired[colCount]) {
+ byte[] b = input.readBytes(v);
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = b;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = queryTypes[colCount]
+ .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
+ }
+ } else {
+ input.skipBytes(v);
+ }
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = null;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = null;
+ }
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ if (isRequired[colCount]) {
+ boolean v = input.readBoolean();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(1);
+ }
+ } else if (dataType == DataTypes.SHORT) {
+ if (isRequired[colCount]) {
+ short v = input.readShort();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(2);
+ }
+ } else if (dataType == DataTypes.INT) {
+ if (isRequired[colCount]) {
+ int v = input.readInt();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(4);
+ }
+ } else if (dataType == DataTypes.LONG) {
+ if (isRequired[colCount]) {
+ long v = input.readLong();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (dataType == DataTypes.DOUBLE) {
+ if (isRequired[colCount]) {
+ double v = input.readDouble();
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] = v;
+ }
+ } else {
+ input.skipBytes(8);
+ }
+ } else if (DataTypes.isDecimal(dataType)) {
+ int len = input.readShort();
+ if (isRequired[colCount]) {
+ BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ if (isFilterRequired[colCount]) {
+ filterValues[filterMap[colCount]] = v;
+ }
+ if (isProjectionRequired[colCount]) {
+ outputValues[projectionMap[colCount]] =
+ DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+ }
+ } else {
+ input.skipBytes(len);
+ }
+ }
+ }
+ }
+ }
+
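+ /**
+ * Reads one row without decoding dictionary surrogates or converting data
+ * types; used by the streaming handoff, which works on the raw row.
+ */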
+ private void readRawRowFromStream() {
+ input.nextRow();
+ short nullLen = input.readShort();
+ BitSet nullBitSet = allNonNull;
+ if (nullLen > 0) {
+ nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+ }
+ int colCount = 0;
+ // primitive type dimension
+ for (; colCount < isNoDictColumn.length; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+ } else {
+ if (isNoDictColumn[colCount]) {
+ int v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ } else {
+ outputValues[colCount] = input.readInt();
+ }
+ }
+ }
+ // complex type dimension
+ for (; colCount < dimensionCount; colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ short v = input.readShort();
+ outputValues[colCount] = input.readBytes(v);
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+ if (nullBitSet.get(colCount)) {
+ outputValues[colCount] = null;
+ } else {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ outputValues[colCount] = input.readBoolean();
+ } else if (dataType == DataTypes.SHORT) {
+ outputValues[colCount] = input.readShort();
+ } else if (dataType == DataTypes.INT) {
+ outputValues[colCount] = input.readInt();
+ } else if (dataType == DataTypes.LONG) {
+ outputValues[colCount] = input.readLong();
+ } else if (dataType == DataTypes.DOUBLE) {
+ outputValues[colCount] = input.readDouble();
+ } else if (DataTypes.isDecimal(dataType)) {
+ int len = input.readShort();
+ outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+ }
+ }
+ }
+ }
+
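+ /**
+ * Copies the decoded output values of one row into the Spark ColumnarBatch,
+ * dispatching on the Spark data type of each projected column.
+ */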
+ private void putRowToColumnBatch(int rowId) {
+ for (int i = 0; i < projection.length; i++) {
+ Object value = outputValues[i];
+ ColumnVector col = columnarBatch.column(i);
+ org.apache.spark.sql.types.DataType t = col.dataType();
+ if (null == value) {
+ col.putNull(rowId);
+ } else {
+ if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+ col.putBoolean(rowId, (boolean)value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+ col.putByte(rowId, (byte) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+ col.putShort(rowId, (short) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+ col.putInt(rowId, (int) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+ col.putLong(rowId, (long) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+ col.putFloat(rowId, (float) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+ col.putDouble(rowId, (double) value);
+ } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+ UTF8String v = (UTF8String) value;
+ col.putByteArray(rowId, v.getBytes());
+ } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+ DecimalType dt = (DecimalType)t;
+ Decimal d = Decimal.fromDecimal(value);
+ if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+ col.putInt(rowId, (int)d.toUnscaledLong());
+ } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+ col.putLong(rowId, d.toUnscaledLong());
+ } else {
+ final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+ byte[] bytes = integer.toByteArray();
+ col.putByteArray(rowId, bytes, 0, bytes.length);
+ }
+ } else if (t instanceof CalendarIntervalType) {
+ CalendarInterval c = (CalendarInterval) value;
+ col.getChildColumn(0).putInt(rowId, c.months);
+ col.getChildColumn(1).putLong(rowId, c.microseconds);
+ } else if (t instanceof org.apache.spark.sql.types.DateType) {
+ col.putInt(rowId, (int) value);
+ } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+ col.putLong(rowId, (long) value);
+ }
+ }
+ }
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ public void setVectorReader(boolean isVectorReader) {
+ this.isVectorReader = isVectorReader;
+ }
+
+ public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+ this.inputMetricsStats = inputMetricsStats;
+ }
+
+ @Override public void close() throws IOException {
+ if (null != input) {
+ input.close();
+ }
+ if (null != columnarBatch) {
+ columnarBatch.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
new file mode 100644
index 0000000..4e555d3
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Stream record writer
+ */
+public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
+
+ // basic info
+ private Configuration hadoopConf;
+ private CarbonLoadModel carbonLoadModel;
+ private CarbonDataLoadConfiguration configuration;
+ private CarbonTable carbonTable;
+ private int maxRowNums;
+ private int maxCacheSize;
+
+ // parser and converter
+ private RowParser rowParser;
+ private BadRecordsLogger badRecordLogger;
+ private RowConverter converter;
+ private CarbonRow currentRow = new CarbonRow(null);
+
+ // encoder
+ private DataField[] dataFields;
+ private BitSet nullBitSet;
+ private boolean[] isNoDictionaryDimensionColumn;
+ private int dimensionWithComplexCount;
+ private int measureCount;
+ private DataType[] measureDataTypes;
+ private StreamBlockletWriter output = null;
+
+ // data write
+ private String segmentDir;
+ private String fileName;
+ private DataOutputStream outputStream;
+ private boolean isFirstRow = true;
+ private boolean hasException = false;
+
+ CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
+ initialize(job);
+ }
+
+ public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel)
+ throws IOException {
+ this.carbonLoadModel = carbonLoadModel;
+ initialize(job);
+ }
+
+ private void initialize(TaskAttemptContext job) throws IOException {
+ // set basic information
+ hadoopConf = job.getConfiguration();
+ if (carbonLoadModel == null) {
+ carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+ if (carbonLoadModel == null) {
+ throw new IOException(
+ "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
+ }
+ }
+ String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
+ carbonLoadModel.setSegmentId(segmentId);
+ carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+ long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
+ carbonLoadModel.setTaskNo("" + taskNo);
+ configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
+ maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
+ CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
+ maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
+ CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
+
+ segmentDir = CarbonTablePath.getSegmentPath(
+ carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
+ fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+ }
+
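+ /**
+ * Lazily creates the row parser, converter, blocklet buffer and output
+ * stream on the first written row, so a task that receives no rows never
+ * creates or appends to a data file.
+ */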
+ private void initializeAtFirstRow() throws IOException, InterruptedException {
+
+ // initialize metadata
+ isNoDictionaryDimensionColumn =
+ CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+ dimensionWithComplexCount = configuration.getDimensionCount();
+ measureCount = configuration.getMeasureCount();
+ dataFields = configuration.getDataFields();
+ measureDataTypes = new DataType[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ measureDataTypes[i] =
+ dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
+ }
+
+ // initialize parser and converter
+ rowParser = new RowParserImpl(dataFields, configuration);
+ badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
+ converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
+ configuration.setCardinalityFinder(converter);
+ converter.initialize();
+
+ // initialize encoder
+ nullBitSet = new BitSet(dataFields.length);
+ int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
+ CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
+ output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
+
+ // initialize data writer
+ String filePath = segmentDir + File.separator + fileName;
+ FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+ CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
+ if (carbonFile.exists()) {
+ // if the file already exists, use the append API
+ outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
+ } else {
+ // if the file does not exist, use the create API
+ outputStream = FileFactory.getDataOutputStream(filePath, fileType);
+ writeFileHeader();
+ }
+
+ isFirstRow = false;
+ }
+
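+ /**
+ * Parses and converts one input row, then encodes it into the blocklet
+ * buffer in the same layout the stream reader expects: null bitmap,
+ * dimensions in storage order, complex dimensions, then measures. When the
+ * buffer is full, it is flushed to the data file as a blocklet.
+ */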
+ @Override public void write(Void key, Object value) throws IOException, InterruptedException {
+ if (isFirstRow) {
+ initializeAtFirstRow();
+ }
+
+ // parse and convert row
+ currentRow.setData(rowParser.parseRow((Object[]) value));
+ converter.convert(currentRow);
+
+ // null bit set
+ nullBitSet.clear();
+ for (int i = 0; i < dataFields.length; i++) {
+ if (null == currentRow.getObject(i)) {
+ nullBitSet.set(i);
+ }
+ }
+ output.nextRow();
+ byte[] b = nullBitSet.toByteArray();
+ output.writeShort(b.length);
+ if (b.length > 0) {
+ output.writeBytes(b);
+ }
+ int dimCount = 0;
+ Object columnValue;
+
+ // primitive type dimension
+ for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
+ if (isNoDictionaryDimensionColumn[dimCount]) {
+ byte[] col = (byte[]) columnValue;
+ output.writeShort(col.length);
+ output.writeBytes(col);
+ } else {
+ output.writeInt((int) columnValue);
+ }
+ }
+ }
+ // complex type dimension
+ for (; dimCount < dimensionWithComplexCount; dimCount++) {
+ columnValue = currentRow.getObject(dimCount);
+ if (null != columnValue) {
+ byte[] col = (byte[]) columnValue;
+ output.writeShort(col.length);
+ output.writeBytes(col);
+ }
+ }
+ // measure
+ DataType dataType;
+ for (int msrCount = 0; msrCount < measureCount; msrCount++) {
+ columnValue = currentRow.getObject(dimCount + msrCount);
+ if (null != columnValue) {
+ dataType = measureDataTypes[msrCount];
+ if (dataType == DataTypes.BOOLEAN) {
+ output.writeBoolean((boolean) columnValue);
+ } else if (dataType == DataTypes.SHORT) {
+ output.writeShort((short) columnValue);
+ } else if (dataType == DataTypes.INT) {
+ output.writeInt((int) columnValue);
+ } else if (dataType == DataTypes.LONG) {
+ output.writeLong((long) columnValue);
+ } else if (dataType == DataTypes.DOUBLE) {
+ output.writeDouble((double) columnValue);
+ } else if (DataTypes.isDecimal(dataType)) {
+ BigDecimal val = (BigDecimal) columnValue;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ output.writeShort(bigDecimalInBytes.length);
+ output.writeBytes(bigDecimalInBytes);
+ } else {
+ String msg =
+ "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
+ .getName();
+ LOGGER.error(msg);
+ throw new IOException(msg);
+ }
+ }
+ }
+
+ if (output.isFull()) {
+ appendBlockletToDataFile();
+ }
+ }
+
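+ /**
+ * Writes the Thrift FileHeader, including the column schemas and the sync
+ * marker, at the start of a new stream data file.
+ */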
+ private void writeFileHeader() throws IOException {
+ List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
+ .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+ carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+ int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
+ for (int i = 0; i < dimLensWithComplex.length; i++) {
+ dimLensWithComplex[i] = Integer.MAX_VALUE;
+ }
+ int[] dictionaryColumnCardinality =
+ CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
+ List<Integer> cardinality = new ArrayList<>();
+ List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
+ .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
+ wrapperColumnSchemaList);
+ FileHeader fileHeader =
+ CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
+ fileHeader.setIs_footer_present(false);
+ fileHeader.setIs_splitable(true);
+ fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+ outputStream.write(CarbonUtil.getByteArray(fileHeader));
+ }
+
+ /**
+ * write a blocklet to file
+ */
+ private void appendBlockletToDataFile() throws IOException {
+ if (output.getRowIndex() == -1) {
+ return;
+ }
+ output.appendBlocklet(outputStream);
+ outputStream.flush();
+ // reset data
+ output.reset();
+ }
+
+ @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ // append remaining buffered data
+ if (!hasException && !isFirstRow) {
+ appendBlockletToDataFile();
+ converter.finish();
+ }
+ } finally {
+ // close resource
+ CarbonUtil.closeStreams(outputStream);
+ if (output != null) {
+ output.close();
+ }
+ if (badRecordLogger != null) {
+ badRecordLogger.closeStreams();
+ }
+ }
+ }
+
+ public String getSegmentDir() {
+ return segmentDir;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setHasException(boolean hasException) {
+ this.hasException = hasException;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
new file mode 100644
index 0000000..43fe6ed
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
+/**
+ * stream blocklet reader
+ */
+public class StreamBlockletReader {
+
+ private byte[] buffer;
+ private int offset;
+ private final byte[] syncMarker;
+ private final byte[] syncBuffer;
+ private final int syncLen;
+ private long pos = 0;
+ private final InputStream in;
+ private final long limitStart;
+ private final long limitEnd;
+ private boolean isAlreadySync = false;
+ private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ private int rowNums = 0;
+ private int rowIndex = 0;
+ private boolean isHeaderPresent;
+
+ StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
+ this.syncMarker = syncMarker;
+ syncLen = syncMarker.length;
+ syncBuffer = new byte[syncLen];
+ this.in = in;
+ limitStart = limit;
+ limitEnd = limitStart + syncLen;
+ this.isHeaderPresent = isHeaderPresent;
+ }
+
+ private void ensureCapacity(int capacity) {
+ if (buffer == null || capacity > buffer.length) {
+ buffer = new byte[capacity];
+ }
+ }
+
+ /**
+ * find the first occurrence of the sync marker in the input stream
+ */
+ private boolean sync() throws IOException {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+ return false;
+ }
+ boolean skipHeader = false;
+ for (int i = 0; i < limitStart; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+ }
+ if (syncLen == j) {
+ if (isHeaderPresent) {
+ if (skipHeader) {
+ return true;
+ } else {
+ skipHeader = true;
+ }
+ } else {
+ return true;
+ }
+ }
+ int value = in.read();
+ if (-1 == value) {
+ return false;
+ }
+ syncBuffer[i % syncLen] = (byte) value;
+ pos++;
+ }
+ return false;
+ }
+
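+ /**
+ * Reads the length-prefixed Thrift BlockletHeader that follows the sync
+ * marker and resets the row cursor for the new blocklet.
+ */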
+ BlockletHeader readBlockletHeader() throws IOException {
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet header");
+ }
+ BlockletHeader header = CarbonUtil.readBlockletHeader(b);
+ rowNums = header.getBlocklet_info().getNum_rows();
+ rowIndex = 0;
+ return header;
+ }
+
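+ /**
+ * Reads the length-prefixed compressed blocklet body from the stream and
+ * decompresses it into the internal row buffer.
+ */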
+ void readBlockletData(BlockletHeader header) throws IOException {
+ ensureCapacity(header.getBlocklet_length());
+ offset = 0;
+ int len = readIntFromStream();
+ byte[] b = new byte[len];
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet data");
+ }
+ compressor.rawUncompress(b, buffer);
+ }
+
+ void skipBlockletData(boolean reset) throws IOException {
+ int len = readIntFromStream();
+ skip(len);
+ pos += len;
+ if (reset) {
+ this.rowNums = 0;
+ this.rowIndex = 0;
+ }
+ }
+
+ private void skip(int len) throws IOException {
+ long remaining = len;
+ do {
+ long skipLen = in.skip(remaining);
+ remaining -= skipLen;
+ } while (remaining > 0);
+ }
+
+ /**
+ * find the next blocklet
+ */
+ boolean nextBlocklet() throws IOException {
+ if (pos >= limitStart) {
+ return false;
+ }
+ if (isAlreadySync) {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+ return false;
+ }
+ } else {
+ isAlreadySync = true;
+ if (!sync()) {
+ return false;
+ }
+ }
+
+ return pos < limitEnd;
+ }
+
+ boolean hasNext() throws IOException {
+ return rowIndex < rowNums;
+ }
+
+ void nextRow() {
+ rowIndex++;
+ }
+
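+ // reads a big-endian int directly from the underlying stream (used for the
+ // length prefixes of the blocklet header and data), unlike readInt which
+ // reads from the decompressed buffer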
+ int readIntFromStream() throws IOException {
+ int ch1 = in.read();
+ int ch2 = in.read();
+ int ch3 = in.read();
+ int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
+ pos += 4;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ /**
+ * Reads <code>len</code> bytes of data from the input stream into
+ * an array of bytes.
+ * @return <code>true</code> if the data was read successfully, or
+ * <code>false</code> if the end of the stream was reached before <code>len</code> bytes could be read.
+ */
+ boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+ int readLen = in.read(b, offset, len);
+ if (readLen < 0) {
+ return false;
+ }
+ pos += readLen;
+ if (readLen < len) {
+ return readBytesFromStream(b, offset + readLen, len - readLen);
+ } else {
+ return true;
+ }
+ }
+
+ boolean readBoolean() {
+ return (buffer[offset++]) != 0;
+ }
+
+ short readShort() {
+ short v = (short) ((buffer[offset + 1] & 255) +
+ ((buffer[offset]) << 8));
+ offset += 2;
+ return v;
+ }
+
+ byte[] copy(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ return b;
+ }
+
+ int readInt() {
+ int v = ((buffer[offset + 3] & 255) +
+ ((buffer[offset + 2] & 255) << 8) +
+ ((buffer[offset + 1] & 255) << 16) +
+ ((buffer[offset]) << 24));
+ offset += 4;
+ return v;
+ }
+
+ long readLong() {
+ long v = ((long)(buffer[offset + 7] & 255)) +
+ ((long) (buffer[offset + 6] & 255) << 8) +
+ ((long) (buffer[offset + 5] & 255) << 16) +
+ ((long) (buffer[offset + 4] & 255) << 24) +
+ ((long) (buffer[offset + 3] & 255) << 32) +
+ ((long) (buffer[offset + 2] & 255) << 40) +
+ ((long) (buffer[offset + 1] & 255) << 48) +
+ ((long) (buffer[offset]) << 56);
+ offset += 8;
+ return v;
+ }
+
+ double readDouble() {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ byte[] readBytes(int len) {
+ byte[] b = new byte[len];
+ System.arraycopy(buffer, offset, b, 0, len);
+ offset += len;
+ return b;
+ }
+
+ void skipBytes(int len) {
+ offset += len;
+ }
+
+ int getRowNums() {
+ return rowNums;
+ }
+
+ void close() {
+ CarbonUtil.closeStreams(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
new file mode 100644
index 0000000..509e2aa
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.MutationType;
+
+/**
+ * stream blocklet writer
+ */
+public class StreamBlockletWriter {
+ private byte[] buffer;
+ private int maxSize;
+ private int maxRowNum;
+ private int rowSize;
+ private int count = 0;
+ private int rowIndex = -1;
+ private Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+ StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
+ buffer = new byte[maxSize];
+ this.maxSize = maxSize;
+ this.maxRowNum = maxRowNum;
+ this.rowSize = rowSize;
+ }
+
+ private void ensureCapacity(int space) {
+ int newcount = space + count;
+ if (newcount > buffer.length) {
+ byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
+ System.arraycopy(buffer, 0, newbuf, 0, count);
+ buffer = newbuf;
+ }
+ }
+
+ void reset() {
+ count = 0;
+ rowIndex = -1;
+ }
+
+ byte[] getBytes() {
+ return buffer;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ int getRowIndex() {
+ return rowIndex;
+ }
+
+ void nextRow() {
+ rowIndex++;
+ }
+
+ boolean isFull() {
+ return rowIndex == maxRowNum || count >= maxSize;
+ }
+
+ void writeBoolean(boolean val) {
+ ensureCapacity(1);
+ buffer[count] = (byte) (val ? 1 : 0);
+ count += 1;
+ }
+
+ void writeShort(int val) {
+ ensureCapacity(2);
+ buffer[count + 1] = (byte) (val);
+ buffer[count] = (byte) (val >>> 8);
+ count += 2;
+ }
+
+ void writeInt(int val) {
+ ensureCapacity(4);
+ buffer[count + 3] = (byte) (val);
+ buffer[count + 2] = (byte) (val >>> 8);
+ buffer[count + 1] = (byte) (val >>> 16);
+ buffer[count] = (byte) (val >>> 24);
+ count += 4;
+ }
+
+ void writeLong(long val) {
+ ensureCapacity(8);
+ buffer[count + 7] = (byte) (val);
+ buffer[count + 6] = (byte) (val >>> 8);
+ buffer[count + 5] = (byte) (val >>> 16);
+ buffer[count + 4] = (byte) (val >>> 24);
+ buffer[count + 3] = (byte) (val >>> 32);
+ buffer[count + 2] = (byte) (val >>> 40);
+ buffer[count + 1] = (byte) (val >>> 48);
+ buffer[count] = (byte) (val >>> 56);
+ count += 8;
+ }
+
+ void writeDouble(double val) {
+ writeLong(Double.doubleToLongBits(val));
+ }
+
+ void writeBytes(byte[] b) {
+ writeBytes(b, 0, b.length);
+ }
+
+ void writeBytes(byte[] b, int off, int len) {
+ ensureCapacity(len);
+ System.arraycopy(b, off, buffer, count, len);
+ count += len;
+ }
+
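+ /**
+ * Appends the buffered rows to the output stream as one blocklet: the sync
+ * marker, a length-prefixed Thrift BlockletHeader, then the length-prefixed
+ * compressed row data.
+ */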
+ void appendBlocklet(DataOutputStream outputStream) throws IOException {
+ outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
+
+ BlockletInfo blockletInfo = new BlockletInfo();
+ blockletInfo.setNum_rows(getRowIndex() + 1);
+ BlockletHeader blockletHeader = new BlockletHeader();
+ blockletHeader.setBlocklet_length(getCount());
+ blockletHeader.setMutation(MutationType.INSERT);
+ blockletHeader.setBlocklet_info(blockletInfo);
+ byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
+ outputStream.writeInt(headerBytes.length);
+ outputStream.write(headerBytes);
+
+ byte[] compressed = compressor.compressByte(getBytes(), getCount());
+ outputStream.writeInt(compressed.length);
+ outputStream.write(compressed);
+ }
+
+ void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index 8c9889d..9e83924 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -42,8 +42,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletIndex;
-import org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.streaming.CarbonStreamRecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
deleted file mode 100644
index 6d1fa45..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.scheduler.SparkListenerApplicationEnd
-
-class CarbonSparkStreamingListener extends SparkListener {
-
- /**
- * When Spark Streaming App stops, remove all locks for stream table.
- */
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
- CarbonStreamSparkStreaming.cleanAllLockAfterStop()
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
deleted file mode 100644
index 4aa1517..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
-import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.Time
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-
-/**
- * Interface used to write stream data to stream table
- * when integrate with Spark Streaming.
- *
- * NOTE: Current integration with Spark Streaming is an alpha feature.
- */
-class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
- val carbonTable: CarbonTable,
- val configuration: Configuration) {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- private var isInitialize: Boolean = false
-
- private var lock: ICarbonLock = null
- private var carbonAppendableStreamSink: Sink = null
-
- /**
- * Acquired the lock for stream table
- */
- def lockStreamTable(): Unit = {
- lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- LockUsage.STREAMING_LOCK)
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the lock for stream table: " +
- carbonTable.getDatabaseName + "." +
- carbonTable.getTableName)
- } else {
- LOGGER.error("Not able to acquire the lock for stream table:" +
- carbonTable.getDatabaseName + "." + carbonTable.getTableName)
- throw new InterruptedException(
- "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
- carbonTable.getTableName)
- }
- }
-
- /**
- * unlock for stream table
- */
- def unLockStreamTable(): Unit = {
- if (null != lock) {
- lock.unlock()
- LOGGER.info("unlock for stream table: " +
- carbonTable.getDatabaseName + "." +
- carbonTable.getTableName)
- }
- }
-
- def initialize(): Unit = {
- carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
- sparkSession,
- configuration,
- carbonTable,
- extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
-
- lockStreamTable()
-
- isInitialize = true
- }
-
- def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
- if (!isInitialize) {
- initialize()
- }
- carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
- }
-
- private val extraOptions = new scala.collection.mutable.HashMap[String, String]
- private var mode: SaveMode = SaveMode.ErrorIfExists
-
- this.option("dbName", carbonTable.getDatabaseName)
- this.option("tableName", carbonTable.getTableName)
-
- /**
- * Specifies the behavior when data or table already exists. Options include:
- * - `SaveMode.Overwrite`: overwrite the existing data.
- * - `SaveMode.Append`: append the data.
- * - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
- * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
- */
- def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
- if (mode == SaveMode.ErrorIfExists) {
- mode = saveMode
- }
- this
- }
-
- /**
- * Specifies the behavior when data or table already exists. Options include:
- * - `overwrite`: overwrite the existing data.
- * - `append`: append the data.
- * - `ignore`: ignore the operation (i.e. no-op).
- * - `error` or `default`: default option, throw an exception at runtime.
- */
- def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
- if (mode == SaveMode.ErrorIfExists) {
- mode = saveMode.toLowerCase(util.Locale.ROOT) match {
- case "overwrite" => SaveMode.Overwrite
- case "append" => SaveMode.Append
- case "ignore" => SaveMode.Ignore
- case "error" | "default" => SaveMode.ErrorIfExists
- case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
- "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
- }
- }
- this
- }
-
- /**
- * Adds an output option
- */
- def option(key: String, value: String): CarbonStreamSparkStreamingWriter = {
- if (!extraOptions.contains(key)) {
- extraOptions += (key -> value)
- }
- this
- }
-
- /**
- * Adds an output option
- */
- def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter =
- option(key, value.toString)
-
- /**
- * Adds an output option
- */
- def option(key: String, value: Long): CarbonStreamSparkStreamingWriter =
- option(key, value.toString)
-
- /**
- * Adds an output option
- */
- def option(key: String, value: Double): CarbonStreamSparkStreamingWriter =
- option(key, value.toString)
-}
-
-object CarbonStreamSparkStreaming {
-
- @transient private val tableMap =
- new util.HashMap[String, CarbonStreamSparkStreamingWriter]()
-
- def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap
-
- /**
- * Remove all stream table locks.
- */
- def cleanAllLockAfterStop(): Unit = {
- tableMap.asScala.values.foreach { writer => writer.unLockStreamTable() }
- tableMap.clear()
- }
-}
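
For context, a minimal sketch of how the removed CarbonStreamSparkStreamingWriter was typically driven from a DStream. The input stream, the column names, and the key used to register the writer in CarbonStreamSparkStreaming.getTableMap are illustrative assumptions, not taken from this patch.

// Hedged usage sketch; the CarbonTable handle, input DStream and map key are assumed.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.streaming.{CarbonStreamSparkStreaming, CarbonStreamSparkStreamingWriter}

def writeToStreamTable(
    spark: SparkSession,
    carbonTable: CarbonTable,
    stream: DStream[(Int, String)]): Unit = {
  val writer = new CarbonStreamSparkStreamingWriter(
    spark, carbonTable, spark.sparkContext.hadoopConfiguration)
    .mode(SaveMode.Append)
  // register the writer so CarbonStreamSparkStreaming.cleanAllLockAfterStop()
  // can release the streaming lock when the application stops (key format assumed)
  CarbonStreamSparkStreaming.getTableMap
    .put(carbonTable.getDatabaseName + "." + carbonTable.getTableName, writer)

  stream.foreachRDD { (rdd, time) =>
    import spark.implicits._
    val batch: DataFrame = rdd.toDF("id", "name") // column names are illustrative
    // initialize() is invoked lazily on the first batch and acquires the stream table lock
    writer.writeStreamData(batch, time)
  }
}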
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
deleted file mode 100644
index 4df04b9..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
-import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.util.CommonUtil
-
-
-/**
- * partition of the handoff segment
- */
-class HandoffPartition(
- val rddId: Int,
- val idx: Int,
- @transient val inputSplit: CarbonInputSplit
-) extends Partition {
-
- val split = new SerializableWritable[CarbonInputSplit](inputSplit)
-
- override val index: Int = idx
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * wrap the record reader of the handoff segment in a RawResultIterator
- */
-class StreamingRawResultIterator(
- recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null) {
-
- override def hasNext: Boolean = {
- recordReader.nextKeyValue()
- }
-
- override def next(): Array[Object] = {
- val rowTmp = recordReader
- .getCurrentValue
- .asInstanceOf[GenericInternalRow]
- .values
- .asInstanceOf[Array[Object]]
- val row = new Array[Object](rowTmp.length)
- System.arraycopy(rowTmp, 0, row, 0, rowTmp.length)
- row
- }
-}
-
-/**
- * execute streaming segment handoff
- */
-class StreamHandoffRDD[K, V](
- sc: SparkContext,
- result: HandoffResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
-
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
-
- override def internalCompute(
- split: Partition,
- context: TaskContext
- ): Iterator[(K, V)] = {
- carbonLoadModel.setTaskNo("" + split.index)
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- CarbonMetadata.getInstance().addCarbonTable(carbonTable)
- // the input iterator uses raw rows
- val iteratorList = prepareInputIterator(split, carbonTable)
-
- CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
- // use CompactionResultSortProcessor to sort data and write to columnar files
- val processor = prepareHandoffProcessor(carbonTable)
- val status = processor.execute(iteratorList)
-
- new Iterator[(K, V)] {
- private var finished = false
-
- override def hasNext: Boolean = {
- !finished
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey("" + split.index, status)
- }
- }
- }
-
- /**
- * prepare the input iterator based on CarbonStreamRecordReader
- */
- private def prepareInputIterator(
- split: Partition,
- carbonTable: CarbonTable
- ): util.ArrayList[RawResultIterator] = {
- val inputSplit = split.asInstanceOf[HandoffPartition].split.value
- val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val hadoopConf = new Configuration()
- CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
- CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
- CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
- val projection = new CarbonProjection
- val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
- (0 until dataFields.size()).foreach { index =>
- projection.addColumn(dataFields.get(index).getColName)
- }
- CarbonInputFormat.setColumnProjection(hadoopConf, projection)
- CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
- val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
- val format = new CarbonTableInputFormat[Array[Object]]()
- val model = format.createQueryModel(inputSplit, attemptContext)
- val inputFormat = new CarbonStreamInputFormat
- val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
- .asInstanceOf[CarbonStreamRecordReader]
- streamReader.setVectorReader(false)
- streamReader.setQueryModel(model)
- streamReader.setUseRawRow(true)
- streamReader.initialize(inputSplit, attemptContext)
- val iteratorList = new util.ArrayList[RawResultIterator](1)
- iteratorList.add(new StreamingRawResultIterator(streamReader))
- iteratorList
- }
-
- private def prepareHandoffProcessor(
- carbonTable: CarbonTable
- ): CompactionResultSortProcessor = {
- val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
- carbonTable.getDimensionByTableName(carbonTable.getTableName),
- carbonTable.getMeasureByTableName(carbonTable.getTableName))
- val dimLensWithComplex =
- (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
- val dictionaryColumnCardinality =
- CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
- val segmentProperties =
- new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)
-
- new CompactionResultSortProcessor(
- carbonLoadModel,
- carbonTable,
- segmentProperties,
- CompactionType.STREAMING,
- carbonTable.getTableName,
- null
- )
- }
-
- /**
- * get the partitions of the handoff segment
- */
- override protected def getPartitions: Array[Partition] = {
- val job = Job.getInstance(FileFactory.getConfiguration)
- val inputFormat = new CarbonTableInputFormat[Array[Object]]()
- val segmentList = new util.ArrayList[Segment](1)
- segmentList.add(Segment.toSegment(handOffSegmentId))
- val splits = inputFormat.getSplitsOfStreaming(
- job,
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
- segmentList
- )
-
- (0 until splits.size()).map { index =>
- new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
- }.toArray[Partition]
- }
-}
-
-object StreamHandoffRDD {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def iterateStreamingHandoff(
- carbonLoadModel: CarbonLoadModel,
- sparkSession: SparkSession
- ): Unit = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val identifier = carbonTable.getAbsoluteTableIdentifier
- var continueHandoff = false
- // acquire the handoff lock on the table
- val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
- try {
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the handoff lock for table" +
- s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
- // hand off streaming segments one by one
- do {
- val segmentStatusManager = new SegmentStatusManager(identifier)
- var loadMetadataDetails: Array[LoadMetadataDetails] = null
- // lock table to read table status file
- val statusLock = segmentStatusManager.getTableStatusLock
- try {
- if (statusLock.lockWithRetries()) {
- loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
- CarbonTablePath.getMetadataPath(identifier.getTablePath))
- }
- } finally {
- if (null != statusLock) {
- statusLock.unlock()
- }
- }
- if (null != loadMetadataDetails) {
- val streamSegments =
- loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
-
- continueHandoff = streamSegments.length > 0
- if (continueHandoff) {
- // hand off a streaming segment
- val loadMetadataDetail = streamSegments(0)
- executeStreamingHandoff(
- carbonLoadModel,
- sparkSession,
- loadMetadataDetail.getLoadName
- )
- }
- } else {
- continueHandoff = false
- }
- } while (continueHandoff)
- }
- } finally {
- if (null != lock) {
- lock.unlock()
- }
- }
- }
-
- /**
- * execute streaming segment handoff: synchronously for DDL, otherwise in a new thread
- */
- def startStreamingHandoffThread(
- carbonLoadModel: CarbonLoadModel,
- sparkSession: SparkSession,
- isDDL: Boolean
- ): Unit = {
- if (isDDL) {
- iterateStreamingHandoff(carbonLoadModel, sparkSession)
- } else {
- // start a new thread to execute streaming segment handoff
- val handoffThread = new Thread() {
- override def run(): Unit = {
- iterateStreamingHandoff(carbonLoadModel, sparkSession)
- }
- }
- handoffThread.start()
- }
- }
-
- /**
- * invoke StreamHandoffRDD to hand off a streaming segment to a columnar segment
- */
- def executeStreamingHandoff(
- carbonLoadModel: CarbonLoadModel,
- sparkSession: SparkSession,
- handoffSegmenId: String
- ): Unit = {
- var loadStatus = SegmentStatus.SUCCESS
- var errorMessage: String = "Handoff failure"
- try {
- // generate new columnar segment
- val newMetaEntry = new LoadMetadataDetails
- carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
- CarbonLoaderUtil.populateNewLoadMetaEntry(
- newMetaEntry,
- SegmentStatus.INSERT_IN_PROGRESS,
- carbonLoadModel.getFactTimeStamp,
- false)
- val operationContext = new OperationContext()
- val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
- new LoadTablePreStatusUpdateEvent(
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
- CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
- val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
- new LoadTablePostStatusUpdateEvent(carbonLoadModel)
- OperationListenerBus.getInstance()
- .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
- // convert a streaming segment to columnar segment
- val status = new StreamHandoffRDD(
- sparkSession.sparkContext,
- new HandoffResultImpl(),
- carbonLoadModel,
- handoffSegmenId).collect()
-
- status.foreach { x =>
- if (!x._2) {
- loadStatus = SegmentStatus.LOAD_FAILURE
- }
- }
- } catch {
- case ex: Exception =>
- loadStatus = SegmentStatus.LOAD_FAILURE
- errorMessage = errorMessage + ": " + ex.getCause.getMessage
- LOGGER.error(errorMessage)
- LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
- }
-
- if (loadStatus == SegmentStatus.LOAD_FAILURE) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
- LOGGER.info("********starting clean up**********")
- CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
- LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Handoff is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- LOGGER.error("Cannot write load metadata file as handoff failed")
- throw new Exception(errorMessage)
- }
-
- if (loadStatus == SegmentStatus.SUCCESS) {
- val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel)
- if (!done) {
- val errorMessage = "Handoff failed due to failure in table status updation."
- LOGGER.audit("Handoff is failed for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- LOGGER.error("Handoff failed due to failure in table status updation.")
- throw new Exception(errorMessage)
- }
- done
- }
-
- }
-
- /**
- * update the status of the streaming segment and the new columnar segment
- */
- private def updateLoadMetadata(
- handoffSegmentId: String,
- loadModel: CarbonLoadModel
- ): Boolean = {
- var status = false
- val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
- val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
- val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath)
- val fileType = FileFactory.getFileType(metadataPath)
- if (!FileFactory.isFileExist(metadataPath, fileType)) {
- FileFactory.mkdirs(metadataPath, fileType)
- }
- val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath)
- val segmentStatusManager = new SegmentStatusManager(identifier)
- val carbonLock = segmentStatusManager.getTableStatusLock
- try {
- if (carbonLock.lockWithRetries()) {
- LOGGER.info(
- "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
- + " for table status updation")
- val listOfLoadFolderDetailsArray =
- SegmentStatusManager.readLoadMetadata(metaDataFilepath)
-
- // update new columnar segment to success status
- val newSegment =
- listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId))
- if (newSegment.isEmpty) {
- throw new Exception("Failed to update table status for new segment")
- } else {
- newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS)
- newSegment.get.setLoadEndTime(System.currentTimeMillis())
- }
-
- // update streaming segment to compacted status
- val streamSegment =
- listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId))
- if (streamSegment.isEmpty) {
- throw new Exception("Failed to update table status for streaming segment")
- } else {
- streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED)
- streamSegment.get.setMergedLoadName(loadModel.getSegmentId)
- }
-
- // refresh table status file
- SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray)
- status = true
- } else {
- LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
- .getDatabaseName() + "." + loadModel.getTableName())
- }
- } finally {
- if (carbonLock.unlock()) {
- LOGGER.info("Table unlocked successfully after table status updation" +
- loadModel.getDatabaseName() + "." + loadModel.getTableName())
- } else {
- LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() +
- "." + loadModel.getTableName() + " during table status updation")
- }
- }
- status
- }
-}
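
For context, a minimal sketch of how the removed handoff entry point was invoked. Building a fully populated CarbonLoadModel is elided and assumed to be done by the caller (in the real flow it is prepared by the load/alter-table command), so this only illustrates the API surface shown above.

// Hedged sketch: triggering handoff of finished streaming segments.
import org.apache.spark.sql.SparkSession

import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.streaming.StreamHandoffRDD

def triggerHandoff(spark: SparkSession, loadModel: CarbonLoadModel): Unit = {
  // isDDL = true runs iterateStreamingHandoff synchronously in the caller's thread;
  // false would perform the handoff in a background thread instead
  StreamHandoffRDD.startStreamingHandoffThread(loadModel, spark, isDDL = true)
}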