Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/28 03:19:46 UTC
[5/5] carbondata git commit: [CARBONDATA-2165] Remove Spark dependency from carbon-hadoop module
[CARBONDATA-2165] Remove Spark dependency from carbon-hadoop module
1. The streaming-related RecordReader classes are moved to the carbon-streaming module.
2. The RDD-related classes are moved to the carbon-spark2 module.
This closes #2074
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c723947a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c723947a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c723947a
Branch: refs/heads/master
Commit: c723947a79332c66175f5a33cf57f08fe70fe1a9
Parents: 2e1ddb5
Author: Jacky Li <ja...@qq.com>
Authored: Sat Mar 17 18:13:08 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Mar 28 11:19:23 2018 +0800
----------------------------------------------------------------------
.../carbondata/core/util/DataTypeConverter.java | 3 +
.../core/util/DataTypeConverterImpl.java | 5 +
hadoop/CARBON_HADOOPLogResource.properties | 18 -
hadoop/pom.xml | 7 -
.../readsupport/impl/RawDataReadSupport.java | 42 -
.../streaming/CarbonStreamInputFormat.java | 115 ---
.../streaming/CarbonStreamOutputFormat.java | 87 ---
.../streaming/CarbonStreamRecordReader.java | 759 ------------------
.../streaming/CarbonStreamRecordWriter.java | 325 --------
.../hadoop/streaming/StreamBlockletReader.java | 259 -------
.../hadoop/streaming/StreamBlockletWriter.java | 152 ----
.../hadoop/testutil/StoreCreator.java | 495 ++++++++++++
.../carbondata/hadoop/util/CarbonTypeUtil.java | 101 ---
.../hadoop/ft/CarbonTableInputFormatTest.java | 6 +-
.../hadoop/ft/CarbonTableOutputFormatTest.java | 2 +-
.../streaming/CarbonStreamInputFormatTest.java | 99 ---
.../streaming/CarbonStreamOutputFormatTest.java | 121 ---
.../hadoop/test/util/StoreCreator.java | 492 ------------
integration/spark-common/pom.xml | 2 +-
.../spark/util/SparkDataTypeConverterImpl.java | 81 ++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 3 +-
.../carbondata/spark/rdd/StreamHandoffRDD.scala | 435 +++++++++++
.../carbondata/spark/util/CarbonScalaUtil.scala | 52 +-
.../CarbonSparkStreamingListener.scala | 30 +
.../streaming/CarbonStreamSparkStreaming.scala | 184 +++++
.../CarbonStreamingQueryListener.scala | 77 ++
.../streaming/StreamSinkFactory.scala | 236 ++++++
.../streaming/CarbonAppendableStreamSink.scala | 362 +++++++++
.../spark/sql/test/TestQueryExecutor.scala | 4 +-
integration/spark2/pom.xml | 2 +-
.../org/apache/spark/sql/CarbonSession.scala | 2 +-
.../CarbonAlterTableCompactionCommand.scala | 3 +-
streaming/pom.xml | 9 +-
.../streaming/CarbonStreamInputFormat.java | 115 +++
.../streaming/CarbonStreamOutputFormat.java | 87 +++
.../streaming/CarbonStreamRecordReader.java | 761 +++++++++++++++++++
.../streaming/CarbonStreamRecordWriter.java | 325 ++++++++
.../streaming/StreamBlockletReader.java | 259 +++++++
.../streaming/StreamBlockletWriter.java | 152 ++++
.../streaming/segment/StreamSegment.java | 2 +-
.../CarbonSparkStreamingListener.scala | 31 -
.../streaming/CarbonStreamSparkStreaming.scala | 187 -----
.../carbondata/streaming/StreamHandoffRDD.scala | 436 -----------
.../streaming/StreamSinkFactory.scala | 236 ------
.../streaming/parser/FieldConverter.scala | 95 +++
.../streaming/parser/RowStreamParserImp.scala | 7 +-
.../streaming/CarbonAppendableStreamSink.scala | 362 ---------
.../CarbonStreamingQueryListener.scala | 77 --
.../streaming/CarbonStreamInputFormatTest.java | 99 +++
.../streaming/CarbonStreamOutputFormatTest.java | 121 +++
50 files changed, 3948 insertions(+), 3974 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
index 7c63860..474493a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverter.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.util;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+
public interface DataTypeConverter {
Object convertFromStringToDecimal(Object data);
@@ -31,4 +33,5 @@ public interface DataTypeConverter {
Object wrapWithGenericArrayData(Object data);
Object wrapWithGenericRow(Object[] fields);
+ Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index ea5740d..a4f571e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.math.BigDecimal;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
@@ -91,4 +92,8 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
return fields;
}
+ @Override
+ public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/CARBON_HADOOPLogResource.properties
----------------------------------------------------------------------
diff --git a/hadoop/CARBON_HADOOPLogResource.properties b/hadoop/CARBON_HADOOPLogResource.properties
deleted file mode 100644
index 135a578..0000000
--- a/hadoop/CARBON_HADOOPLogResource.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-carbon.hadoop = {0}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index 916b9db..41e2822 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -40,10 +40,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -55,9 +51,6 @@
<resources>
<resource>
<directory>.</directory>
- <includes>
- <include>CARBON_HADOOPLogResource.properties</include>
- </includes>
</resource>
</resources>
<plugins>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
deleted file mode 100644
index b2cd450..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-
-public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
-
- @Override
- public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) { }
-
- /**
- * return column data as InternalRow
- *
- * @param data column data
- */
- @Override
- public InternalRow readRow(Object[] data) {
- return new GenericInternalRow(data);
- }
-
- @Override public void close() { }
-}
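
RawDataReadSupport is removed here because its InternalRow return type is what ties carbon-hadoop to Spark. A minimal sketch of a Spark-free equivalent that could stay in carbon-hadoop, assuming the CarbonReadSupport interface shown above; the class name is illustrative and it simply hands back the decoded row:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;

    public class RawObjectReadSupport implements CarbonReadSupport<Object[]> {

      @Override public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) { }

      // return the decoded column values unchanged, with no Spark types involved
      @Override public Object[] readRow(Object[] data) {
        return data;
      }

      @Override public void close() { }
    }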
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
deleted file mode 100644
index a6e9563..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormat.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
-import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
-import org.apache.carbondata.core.scan.complextypes.StructQueryType;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Stream input format
- */
-public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
-
- public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
- public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-
- @Override public RecordReader<Void, Object> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new CarbonStreamRecordReader();
- }
-
- public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
- CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
- throws IOException {
- GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
- for (int i = 0; i < carbonColumns.length; i++) {
- if (carbonColumns[i].isComplex()) {
- if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
- queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
- carbonColumns[i].getColName(), i);
- } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
- queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
- carbonColumns[i].getColName(), i);
- } else {
- throw new UnsupportedOperationException(
- carbonColumns[i].getDataType().getName() + " is not supported");
- }
-
- fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
- }
- }
-
- return queryTypes;
- }
-
- private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
- CarbonDimension dimension, int parentBlockIndex,
- Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
- for (int i = 0; i < dimension.getNumberOfChild(); i++) {
- CarbonDimension child = dimension.getListOfChildDimensions().get(i);
- DataType dataType = child.getDataType();
- GenericQueryType queryType = null;
- if (DataTypes.isArrayType(dataType)) {
- queryType =
- new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-
- } else if (DataTypes.isStructType(dataType)) {
- queryType =
- new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
- parentQueryType.addChildren(queryType);
- } else {
- boolean isDirectDictionary =
- CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
- String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
- .get(CarbonCommonConstants.DICTIONARY_PATH);
- DictionaryColumnUniqueIdentifier dictionarIdentifier =
- new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
- child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
-
- queryType =
- new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
- child.getDataType(), 4, cache.get(dictionarIdentifier),
- isDirectDictionary);
- }
- parentQueryType.addChildren(queryType);
- if (child.getNumberOfChild() > 0) {
- fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
- }
- }
- }
-}
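
The READ_BUFFER_SIZE constant above is the only tuning knob this input format exposes. A minimal usage sketch, assuming a Hadoop Configuration named conf is already at hand; the value is illustrative (the default shown above is 65536 bytes):

    // double the stream read buffer from the 64 KB default
    conf.set(CarbonStreamInputFormat.READ_BUFFER_SIZE, "131072");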
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
deleted file mode 100644
index 2599fa7..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormat.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Stream output format
- */
-public class CarbonStreamOutputFormat extends FileOutputFormat<Void, Object> {
-
- static final byte[] CARBON_SYNC_MARKER =
- "@carbondata_sync".getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-
- public static final String CARBON_ENCODER_ROW_BUFFER_SIZE = "carbon.stream.row.buffer.size";
-
- public static final int CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT = 1024;
-
- public static final String CARBON_STREAM_BLOCKLET_ROW_NUMS = "carbon.stream.blocklet.row.nums";
-
- public static final int CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT = 32000;
-
- public static final String CARBON_STREAM_CACHE_SIZE = "carbon.stream.cache.size";
-
- public static final int CARBON_STREAM_CACHE_SIZE_DEFAULT = 32 * 1024 * 1024;
-
- private static final String LOAD_Model = "mapreduce.output.carbon.load.model";
-
- private static final String SEGMENT_ID = "carbon.segment.id";
-
- @Override public RecordWriter<Void, Object> getRecordWriter(TaskAttemptContext job)
- throws IOException, InterruptedException {
- return new CarbonStreamRecordWriter(job);
- }
-
- public static void setCarbonLoadModel(Configuration hadoopConf, CarbonLoadModel carbonLoadModel)
- throws IOException {
- if (carbonLoadModel != null) {
- hadoopConf.set(LOAD_Model, ObjectSerializationUtil.convertObjectToString(carbonLoadModel));
- }
- }
-
- public static CarbonLoadModel getCarbonLoadModel(Configuration hadoopConf) throws IOException {
- String value = hadoopConf.get(LOAD_Model);
- if (value == null) {
- return null;
- } else {
- return (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(value);
- }
- }
-
- public static void setSegmentId(Configuration hadoopConf, String segmentId) throws IOException {
- if (segmentId != null) {
- hadoopConf.set(SEGMENT_ID, segmentId);
- }
- }
-
- public static String getSegmentId(Configuration hadoopConf) throws IOException {
- return hadoopConf.get(SEGMENT_ID);
- }
-
-}
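
The static setters above are how a streaming write job is wired up before the record writer is created. A minimal sketch, assuming a mapreduce Job named job and a CarbonLoadModel named loadModel built elsewhere; the segment id "0" is illustrative:

    Configuration conf = job.getConfiguration();
    CarbonStreamOutputFormat.setCarbonLoadModel(conf, loadModel);  // serialized into the job conf
    CarbonStreamOutputFormat.setSegmentId(conf, "0");              // target streaming segment
    job.setOutputFormatClass(CarbonStreamOutputFormat.class);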
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
deleted file mode 100644
index 1e227c4..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordReader.java
+++ /dev/null
@@ -1,759 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
-import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.intf.RowImpl;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.hadoop.util.CarbonTypeUtil;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.CalendarInterval;
-import org.apache.spark.unsafe.types.UTF8String;
-
-/**
- * Stream record reader
- */
-public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
- // vector reader
- private boolean isVectorReader;
-
- // metadata
- private CarbonTable carbonTable;
- private CarbonColumn[] storageColumns;
- private boolean[] isRequired;
- private DataType[] measureDataTypes;
- private int dimensionCount;
- private int measureCount;
-
- // input
- private FileSplit fileSplit;
- private Configuration hadoopConf;
- private StreamBlockletReader input;
- private boolean isFirstRow = true;
- private QueryModel model;
-
- // decode data
- private BitSet allNonNull;
- private boolean[] isNoDictColumn;
- private DirectDictionaryGenerator[] directDictionaryGenerators;
- private CacheProvider cacheProvider;
- private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
- private GenericQueryType[] queryTypes;
-
- // vectorized reader
- private StructType outputSchema;
- private ColumnarBatch columnarBatch;
- private boolean isFinished = false;
-
- // filter
- private FilterExecuter filter;
- private boolean[] isFilterRequired;
- private Object[] filterValues;
- private RowIntf filterRow;
- private int[] filterMap;
-
- // output
- private CarbonColumn[] projection;
- private boolean[] isProjectionRequired;
- private int[] projectionMap;
- private Object[] outputValues;
- private InternalRow outputRow;
-
- // empty project, null filter
- private boolean skipScanData;
-
- // return raw row for handoff
- private boolean useRawRow = false;
-
- // InputMetricsStats
- private InputMetricsStats inputMetricsStats;
-
- @Override public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- // input
- if (split instanceof CarbonInputSplit) {
- fileSplit = (CarbonInputSplit) split;
- } else if (split instanceof CarbonMultiBlockSplit) {
- fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
- } else {
- fileSplit = (FileSplit) split;
- }
-
- // metadata
- hadoopConf = context.getConfiguration();
- if (model == null) {
- CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
- model = format.createQueryModel(split, context);
- }
- carbonTable = model.getTable();
- List<CarbonDimension> dimensions =
- carbonTable.getDimensionByTableName(carbonTable.getTableName());
- dimensionCount = dimensions.size();
- List<CarbonMeasure> measures =
- carbonTable.getMeasureByTableName(carbonTable.getTableName());
- measureCount = measures.size();
- List<CarbonColumn> carbonColumnList =
- carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
- storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
- isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
- directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
- for (int i = 0; i < storageColumns.length; i++) {
- if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(storageColumns[i].getDataType());
- }
- }
- measureDataTypes = new DataType[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
- }
-
- // decode data
- allNonNull = new BitSet(storageColumns.length);
- projection = model.getProjectionColumns();
-
- isRequired = new boolean[storageColumns.length];
- boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
- boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
- isFilterRequired = new boolean[storageColumns.length];
- filterMap = new int[storageColumns.length];
- for (int i = 0; i < storageColumns.length; i++) {
- if (storageColumns[i].isDimension()) {
- if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
- isRequired[i] = true;
- isFilterRequired[i] = true;
- filterMap[i] = storageColumns[i].getOrdinal();
- }
- } else {
- if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
- isRequired[i] = true;
- isFilterRequired[i] = true;
- filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
- }
- }
- }
-
- isProjectionRequired = new boolean[storageColumns.length];
- projectionMap = new int[storageColumns.length];
- for (int i = 0; i < storageColumns.length; i++) {
- for (int j = 0; j < projection.length; j++) {
- if (storageColumns[i].getColName().equals(projection[j].getColName())) {
- isRequired[i] = true;
- isProjectionRequired[i] = true;
- projectionMap[i] = j;
- break;
- }
- }
- }
-
- // initialize filter
- if (null != model.getFilterExpressionResolverTree()) {
- initializeFilter();
- } else if (projection.length == 0) {
- skipScanData = true;
- }
-
- }
-
- private void initializeFilter() {
-
- List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
- carbonTable.getMeasureByTableName(carbonTable.getTableName()));
- int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
- for (int i = 0; i < dimLensWithComplex.length; i++) {
- dimLensWithComplex[i] = Integer.MAX_VALUE;
- }
-
- int[] dictionaryColumnCardinality =
- CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
- SegmentProperties segmentProperties =
- new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
- Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
-
- FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
- filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
- complexDimensionInfoMap);
- // for row filter, we need update column index
- FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
- carbonTable.getDimensionOrdinalMax());
-
- }
-
- public void setQueryModel(QueryModel model) {
- this.model = model;
- }
-
- private byte[] getSyncMarker(String filePath) throws IOException {
- CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
- FileHeader header = headerReader.readHeader();
- return header.getSync_marker();
- }
-
- public void setUseRawRow(boolean useRawRow) {
- this.useRawRow = useRawRow;
- }
-
- private void initializeAtFirstRow() throws IOException {
- filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
- filterRow = new RowImpl();
- filterRow.setValues(filterValues);
-
- outputValues = new Object[projection.length];
- outputRow = new GenericInternalRow(outputValues);
-
- Path file = fileSplit.getPath();
-
- byte[] syncMarker = getSyncMarker(file.toString());
-
- FileSystem fs = file.getFileSystem(hadoopConf);
-
- int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
- CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
-
- FSDataInputStream fileIn = fs.open(file, bufferSize);
- fileIn.seek(fileSplit.getStart());
- input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
- fileSplit.getStart() == 0);
-
- cacheProvider = CacheProvider.getInstance();
- cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
- queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
-
- outputSchema = new StructType(CarbonTypeUtil.convertCarbonSchemaToSparkSchema(projection));
- }
-
- @Override public boolean nextKeyValue() throws IOException, InterruptedException {
- if (isFirstRow) {
- isFirstRow = false;
- initializeAtFirstRow();
- }
- if (isFinished) {
- return false;
- }
-
- if (isVectorReader) {
- return nextColumnarBatch();
- }
-
- return nextRow();
- }
-
- /**
- * for vector reader, check next columnar batch
- */
- private boolean nextColumnarBatch() throws IOException {
- boolean hasNext;
- boolean scanMore = false;
- do {
- // move to the next blocklet
- hasNext = input.nextBlocklet();
- if (hasNext) {
- // read blocklet header
- BlockletHeader header = input.readBlockletHeader();
- if (isScanRequired(header)) {
- scanMore = !scanBlockletAndFillVector(header);
- } else {
- input.skipBlockletData(true);
- scanMore = true;
- }
- } else {
- isFinished = true;
- scanMore = false;
- }
- } while (scanMore);
- return hasNext;
- }
-
- /**
- * check next Row
- */
- private boolean nextRow() throws IOException {
- // read row one by one
- try {
- boolean hasNext;
- boolean scanMore = false;
- do {
- hasNext = input.hasNext();
- if (hasNext) {
- if (skipScanData) {
- input.nextRow();
- scanMore = false;
- } else {
- if (useRawRow) {
- // read raw row for streaming handoff which does not require decode raw row
- readRawRowFromStream();
- } else {
- readRowFromStream();
- }
- if (null != filter) {
- scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
- } else {
- scanMore = false;
- }
- }
- } else {
- if (input.nextBlocklet()) {
- BlockletHeader header = input.readBlockletHeader();
- if (isScanRequired(header)) {
- if (skipScanData) {
- input.skipBlockletData(false);
- } else {
- input.readBlockletData(header);
- }
- } else {
- input.skipBlockletData(true);
- }
- scanMore = true;
- } else {
- isFinished = true;
- scanMore = false;
- }
- }
- } while (scanMore);
- return hasNext;
- } catch (FilterUnsupportedException e) {
- throw new IOException("Failed to filter row in detail reader", e);
- }
- }
-
- @Override public Void getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- @Override public Object getCurrentValue() throws IOException, InterruptedException {
- if (isVectorReader) {
- int value = columnarBatch.numValidRows();
- if (inputMetricsStats != null) {
- inputMetricsStats.incrementRecordRead((long) value);
- }
-
- return columnarBatch;
- }
-
- if (inputMetricsStats != null) {
- inputMetricsStats.incrementRecordRead(1L);
- }
-
- return outputRow;
- }
-
- private boolean isScanRequired(BlockletHeader header) {
- // TODO require to implement min-max index
- if (null == filter) {
- return true;
- }
- return true;
- }
-
- private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
- // if filter is null and output projection is empty, use the row number of blocklet header
- if (skipScanData) {
- int rowNums = header.getBlocklet_info().getNum_rows();
- columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums);
- columnarBatch.setNumRows(rowNums);
- input.skipBlockletData(true);
- return rowNums > 0;
- }
-
- input.readBlockletData(header);
- columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums());
- int rowNum = 0;
- if (null == filter) {
- while (input.hasNext()) {
- readRowFromStream();
- putRowToColumnBatch(rowNum++);
- }
- } else {
- try {
- while (input.hasNext()) {
- readRowFromStream();
- if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
- putRowToColumnBatch(rowNum++);
- }
- }
- } catch (FilterUnsupportedException e) {
- throw new IOException("Failed to filter row in vector reader", e);
- }
- }
- columnarBatch.setNumRows(rowNum);
- return rowNum > 0;
- }
-
- private void readRowFromStream() {
- input.nextRow();
- short nullLen = input.readShort();
- BitSet nullBitSet = allNonNull;
- if (nullLen > 0) {
- nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
- }
- int colCount = 0;
- // primitive type dimension
- for (; colCount < isNoDictColumn.length; colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- if (isNoDictColumn[colCount]) {
- int v = input.readShort();
- if (isRequired[colCount]) {
- byte[] b = input.readBytes(v);
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = b;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] =
- DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
- storageColumns[colCount].getDataType());
- }
- } else {
- input.skipBytes(v);
- }
- } else if (null != directDictionaryGenerators[colCount]) {
- if (isRequired[colCount]) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = input.copy(4);
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] =
- directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
- } else {
- input.skipBytes(4);
- }
- } else {
- input.skipBytes(4);
- }
- } else {
- if (isRequired[colCount]) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = input.copy(4);
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = input.readInt();
- } else {
- input.skipBytes(4);
- }
- } else {
- input.skipBytes(4);
- }
- }
- }
- }
- // complex type dimension
- for (; colCount < dimensionCount; colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = null;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- short v = input.readShort();
- if (isRequired[colCount]) {
- byte[] b = input.readBytes(v);
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = b;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = queryTypes[colCount]
- .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b));
- }
- } else {
- input.skipBytes(v);
- }
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
- if (nullBitSet.get(colCount)) {
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = null;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = null;
- }
- } else {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- if (isRequired[colCount]) {
- boolean v = input.readBoolean();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(1);
- }
- } else if (dataType == DataTypes.SHORT) {
- if (isRequired[colCount]) {
- short v = input.readShort();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(2);
- }
- } else if (dataType == DataTypes.INT) {
- if (isRequired[colCount]) {
- int v = input.readInt();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(4);
- }
- } else if (dataType == DataTypes.LONG) {
- if (isRequired[colCount]) {
- long v = input.readLong();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(8);
- }
- } else if (dataType == DataTypes.DOUBLE) {
- if (isRequired[colCount]) {
- double v = input.readDouble();
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = v;
- }
- } else {
- input.skipBytes(8);
- }
- } else if (DataTypes.isDecimal(dataType)) {
- int len = input.readShort();
- if (isRequired[colCount]) {
- BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
- if (isFilterRequired[colCount]) {
- filterValues[filterMap[colCount]] = v;
- }
- if (isProjectionRequired[colCount]) {
- outputValues[projectionMap[colCount]] = Decimal.apply(v);
- }
- } else {
- input.skipBytes(len);
- }
- }
- }
- }
- }
-
- private void readRawRowFromStream() {
- input.nextRow();
- short nullLen = input.readShort();
- BitSet nullBitSet = allNonNull;
- if (nullLen > 0) {
- nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
- }
- int colCount = 0;
- // primitive type dimension
- for (; colCount < isNoDictColumn.length; colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
- } else {
- if (isNoDictColumn[colCount]) {
- int v = input.readShort();
- outputValues[colCount] = input.readBytes(v);
- } else {
- outputValues[colCount] = input.readInt();
- }
- }
- }
- // complex type dimension
- for (; colCount < dimensionCount; colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = null;
- } else {
- short v = input.readShort();
- outputValues[colCount] = input.readBytes(v);
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
- if (nullBitSet.get(colCount)) {
- outputValues[colCount] = null;
- } else {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- outputValues[colCount] = input.readBoolean();
- } else if (dataType == DataTypes.SHORT) {
- outputValues[colCount] = input.readShort();
- } else if (dataType == DataTypes.INT) {
- outputValues[colCount] = input.readInt();
- } else if (dataType == DataTypes.LONG) {
- outputValues[colCount] = input.readLong();
- } else if (dataType == DataTypes.DOUBLE) {
- outputValues[colCount] = input.readDouble();
- } else if (DataTypes.isDecimal(dataType)) {
- int len = input.readShort();
- outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
- }
- }
- }
- }
-
- private void putRowToColumnBatch(int rowId) {
- for (int i = 0; i < projection.length; i++) {
- Object value = outputValues[i];
- ColumnVector col = columnarBatch.column(i);
- org.apache.spark.sql.types.DataType t = col.dataType();
- if (null == value) {
- col.putNull(rowId);
- } else {
- if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
- col.putBoolean(rowId, (boolean)value);
- } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
- col.putByte(rowId, (byte) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
- col.putShort(rowId, (short) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
- col.putInt(rowId, (int) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
- col.putLong(rowId, (long) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
- col.putFloat(rowId, (float) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
- col.putDouble(rowId, (double) value);
- } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
- UTF8String v = (UTF8String) value;
- col.putByteArray(rowId, v.getBytes());
- } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
- DecimalType dt = (DecimalType)t;
- Decimal d = Decimal.fromDecimal(value);
- if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- col.putInt(rowId, (int)d.toUnscaledLong());
- } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- col.putLong(rowId, d.toUnscaledLong());
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- col.putByteArray(rowId, bytes, 0, bytes.length);
- }
- } else if (t instanceof CalendarIntervalType) {
- CalendarInterval c = (CalendarInterval) value;
- col.getChildColumn(0).putInt(rowId, c.months);
- col.getChildColumn(1).putLong(rowId, c.microseconds);
- } else if (t instanceof org.apache.spark.sql.types.DateType) {
- col.putInt(rowId, (int) value);
- } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
- col.putLong(rowId, (long) value);
- }
- }
- }
- }
-
- @Override public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- public void setVectorReader(boolean isVectorReader) {
- this.isVectorReader = isVectorReader;
- }
-
- public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
- this.inputMetricsStats = inputMetricsStats;
- }
-
- @Override public void close() throws IOException {
- if (null != input) {
- input.close();
- }
- if (null != columnarBatch) {
- columnarBatch.close();
- }
- }
-}
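
Driving the reader above directly follows the standard Hadoop RecordReader protocol. A minimal sketch, assuming an InputSplit and TaskAttemptContext prepared by CarbonTableInputFormat; split and context are illustrative names:

    CarbonStreamRecordReader reader = new CarbonStreamRecordReader();
    reader.setVectorReader(false);             // row-at-a-time mode; true returns a ColumnarBatch
    reader.initialize(split, context);         // builds the QueryModel if none was set
    while (reader.nextKeyValue()) {
      Object row = reader.getCurrentValue();   // GenericInternalRow in row mode
      // process row ...
    }
    reader.close();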
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
deleted file mode 100644
index a4b3be8..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.processing.loading.BadRecordsLogger;
-import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
-import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.loading.DataField;
-import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
-import org.apache.carbondata.processing.loading.converter.RowConverter;
-import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.parser.RowParser;
-import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-
-/**
- * Stream record writer
- */
-public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName());
-
- // basic info
- private Configuration hadoopConf;
- private CarbonLoadModel carbonLoadModel;
- private CarbonDataLoadConfiguration configuration;
- private CarbonTable carbonTable;
- private int maxRowNums;
- private int maxCacheSize;
-
- // parser and converter
- private RowParser rowParser;
- private BadRecordsLogger badRecordLogger;
- private RowConverter converter;
- private CarbonRow currentRow = new CarbonRow(null);
-
- // encoder
- private DataField[] dataFields;
- private BitSet nullBitSet;
- private boolean[] isNoDictionaryDimensionColumn;
- private int dimensionWithComplexCount;
- private int measureCount;
- private DataType[] measureDataTypes;
- private StreamBlockletWriter output = null;
-
- // data write
- private String segmentDir;
- private String fileName;
- private DataOutputStream outputStream;
- private boolean isFirstRow = true;
- private boolean hasException = false;
-
- CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException {
- initialize(job);
- }
-
- public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel)
- throws IOException {
- this.carbonLoadModel = carbonLoadModel;
- initialize(job);
- }
-
- private void initialize(TaskAttemptContext job) throws IOException {
- // set basic information
- hadoopConf = job.getConfiguration();
- if (carbonLoadModel == null) {
- carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
- if (carbonLoadModel == null) {
- throw new IOException(
- "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model");
- }
- }
- String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf);
- carbonLoadModel.setSegmentId(segmentId);
- carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
- long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId();
- carbonLoadModel.setTaskNo("" + taskNo);
- configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel);
- maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS,
- CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1;
- maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE,
- CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT);
-
- segmentDir = CarbonTablePath.getSegmentPath(
- carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
- fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
- }
-
- private void initializeAtFirstRow() throws IOException, InterruptedException {
-
- // initialize metadata
- isNoDictionaryDimensionColumn =
- CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
- dimensionWithComplexCount = configuration.getDimensionCount();
- measureCount = configuration.getMeasureCount();
- dataFields = configuration.getDataFields();
- measureDataTypes = new DataType[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureDataTypes[i] =
- dataFields[dimensionWithComplexCount + i].getColumn().getDataType();
- }
-
- // initialize parser and converter
- rowParser = new RowParserImpl(dataFields, configuration);
- badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration);
- converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger);
- configuration.setCardinalityFinder(converter);
- converter.initialize();
-
- // initialize encoder
- nullBitSet = new BitSet(dataFields.length);
- int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE,
- CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT);
- output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize);
-
- // initialize data writer
- String filePath = segmentDir + File.separator + fileName;
- FileFactory.FileType fileType = FileFactory.getFileType(filePath);
- CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType);
- if (carbonFile.exists()) {
- // if the file is existed, use the append api
- outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType);
- } else {
- // IF the file is not existed, use the create api
- outputStream = FileFactory.getDataOutputStream(filePath, fileType);
- writeFileHeader();
- }
-
- isFirstRow = false;
- }
-
- @Override public void write(Void key, Object value) throws IOException, InterruptedException {
- if (isFirstRow) {
- initializeAtFirstRow();
- }
-
- // parse and convert row
- currentRow.setData(rowParser.parseRow((Object[]) value));
- converter.convert(currentRow);
-
- // null bit set
- nullBitSet.clear();
- for (int i = 0; i < dataFields.length; i++) {
- if (null == currentRow.getObject(i)) {
- nullBitSet.set(i);
- }
- }
- output.nextRow();
- byte[] b = nullBitSet.toByteArray();
- output.writeShort(b.length);
- if (b.length > 0) {
- output.writeBytes(b);
- }
- int dimCount = 0;
- Object columnValue;
-
- // primitive type dimension
- for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
- columnValue = currentRow.getObject(dimCount);
- if (null != columnValue) {
- if (isNoDictionaryDimensionColumn[dimCount]) {
- byte[] col = (byte[]) columnValue;
- output.writeShort(col.length);
- output.writeBytes(col);
- } else {
- output.writeInt((int) columnValue);
- }
- }
- }
- // complex type dimension
- for (; dimCount < dimensionWithComplexCount; dimCount++) {
- columnValue = currentRow.getObject(dimCount);
- if (null != columnValue) {
- byte[] col = (byte[]) columnValue;
- output.writeShort(col.length);
- output.writeBytes(col);
- }
- }
- // measure
- DataType dataType;
- for (int msrCount = 0; msrCount < measureCount; msrCount++) {
- columnValue = currentRow.getObject(dimCount + msrCount);
- if (null != columnValue) {
- dataType = measureDataTypes[msrCount];
- if (dataType == DataTypes.BOOLEAN) {
- output.writeBoolean((boolean) columnValue);
- } else if (dataType == DataTypes.SHORT) {
- output.writeShort((short) columnValue);
- } else if (dataType == DataTypes.INT) {
- output.writeInt((int) columnValue);
- } else if (dataType == DataTypes.LONG) {
- output.writeLong((long) columnValue);
- } else if (dataType == DataTypes.DOUBLE) {
- output.writeDouble((double) columnValue);
- } else if (DataTypes.isDecimal(dataType)) {
- BigDecimal val = (BigDecimal) columnValue;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- output.writeShort(bigDecimalInBytes.length);
- output.writeBytes(bigDecimalInBytes);
- } else {
- String msg =
- "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType()
- .getName();
- LOGGER.error(msg);
- throw new IOException(msg);
- }
- }
- }
-
- if (output.isFull()) {
- appendBlockletToDataFile();
- }
- }
-
- private void writeFileHeader() throws IOException {
- List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
- .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
- carbonTable.getMeasureByTableName(carbonTable.getTableName()));
- int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
- for (int i = 0; i < dimLensWithComplex.length; i++) {
- dimLensWithComplex[i] = Integer.MAX_VALUE;
- }
- int[] dictionaryColumnCardinality =
- CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
- List<Integer> cardinality = new ArrayList<>();
- List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter
- .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality,
- wrapperColumnSchemaList);
- FileHeader fileHeader =
- CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis());
- fileHeader.setIs_footer_present(false);
- fileHeader.setIs_splitable(true);
- fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
- outputStream.write(CarbonUtil.getByteArray(fileHeader));
- }
-
- /**
- * write a blocklet to file
- */
- private void appendBlockletToDataFile() throws IOException {
- if (output.getRowIndex() == -1) {
- return;
- }
- output.apppendBlocklet(outputStream);
- outputStream.flush();
- // reset data
- output.reset();
- }
-
- @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- try {
- // append remaining buffer data
- if (!hasException && !isFirstRow) {
- appendBlockletToDataFile();
- converter.finish();
- }
- } finally {
- // close resource
- CarbonUtil.closeStreams(outputStream);
- if (output != null) {
- output.close();
- }
- if (badRecordLogger != null) {
- badRecordLogger.closeStreams();
- }
- }
- }
-
- public String getSegmentDir() {
- return segmentDir;
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public void setHasException(boolean hasException) {
- this.hasException = hasException;
- }
-}
\ No newline at end of file
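
For anyone tracing the moved writer, the row layout it produces inside a blocklet is: a null bitset
(2-byte length plus the bitset bytes), no-dictionary dimensions as length-prefixed byte arrays,
dictionary dimensions as 4-byte surrogate keys, complex dimensions as length-prefixed byte arrays,
and finally the measures encoded by data type (decimals go through DataTypeUtil.bigDecimalToByte).
The sketch below reproduces that layout with plain java.io for a hypothetical
one-dimension/two-measure row; the class and method names are illustrative and not part of the
moved code.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.BitSet;

    // Illustrative only: one no-dictionary string dimension, one int measure, one double measure.
    public class StreamRowLayoutSketch {
      static byte[] encodeRow(String noDictDim, Integer intMeasure, Double doubleMeasure)
          throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        // 1. null bitset: 2-byte length followed by the bitset bytes
        BitSet nullBitSet = new BitSet(3);
        if (noDictDim == null) nullBitSet.set(0);
        if (intMeasure == null) nullBitSet.set(1);
        if (doubleMeasure == null) nullBitSet.set(2);
        byte[] bits = nullBitSet.toByteArray();
        out.writeShort(bits.length);
        out.write(bits);

        // 2. no-dictionary dimension: 2-byte length + UTF-8 bytes (skipped when null)
        if (noDictDim != null) {
          byte[] col = noDictDim.getBytes(StandardCharsets.UTF_8);
          out.writeShort(col.length);
          out.write(col);
        }

        // 3. measures, encoded by data type (skipped when null)
        if (intMeasure != null) out.writeInt(intMeasure);
        if (doubleMeasure != null) out.writeDouble(doubleMeasure);

        return bos.toByteArray();
      }
    }
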
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
deleted file mode 100644
index 1989198..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-
-/**
- * stream blocklet reader
- */
-public class StreamBlockletReader {
-
- private byte[] buffer;
- private int offset;
- private final byte[] syncMarker;
- private final byte[] syncBuffer;
- private final int syncLen;
- private long pos = 0;
- private final InputStream in;
- private final long limitStart;
- private final long limitEnd;
- private boolean isAlreadySync = false;
- private Compressor compressor = CompressorFactory.getInstance().getCompressor();
- private int rowNums = 0;
- private int rowIndex = 0;
- private boolean isHeaderPresent;
-
- StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
- this.syncMarker = syncMarker;
- syncLen = syncMarker.length;
- syncBuffer = new byte[syncLen];
- this.in = in;
- limitStart = limit;
- limitEnd = limitStart + syncLen;
- this.isHeaderPresent = isHeaderPresent;
- }
-
- private void ensureCapacity(int capacity) {
- if (buffer == null || capacity > buffer.length) {
- buffer = new byte[capacity];
- }
- }
-
- /**
- * find the first position of the sync marker in the input stream
- */
- private boolean sync() throws IOException {
- if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
- return false;
- }
- boolean skipHeader = false;
- for (int i = 0; i < limitStart; i++) {
- int j = 0;
- for (; j < syncLen; j++) {
- if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
- }
- if (syncLen == j) {
- if (isHeaderPresent) {
- if (skipHeader) {
- return true;
- } else {
- skipHeader = true;
- }
- } else {
- return true;
- }
- }
- int value = in.read();
- if (-1 == value) {
- return false;
- }
- syncBuffer[i % syncLen] = (byte) value;
- pos++;
- }
- return false;
- }
-
- BlockletHeader readBlockletHeader() throws IOException {
- int len = readIntFromStream();
- byte[] b = new byte[len];
- if (!readBytesFromStream(b, 0, len)) {
- throw new EOFException("Failed to read blocklet header");
- }
- BlockletHeader header = CarbonUtil.readBlockletHeader(b);
- rowNums = header.getBlocklet_info().getNum_rows();
- rowIndex = 0;
- return header;
- }
-
- void readBlockletData(BlockletHeader header) throws IOException {
- ensureCapacity(header.getBlocklet_length());
- offset = 0;
- int len = readIntFromStream();
- byte[] b = new byte[len];
- if (!readBytesFromStream(b, 0, len)) {
- throw new EOFException("Failed to read blocklet data");
- }
- compressor.rawUncompress(b, buffer);
- }
-
- void skipBlockletData(boolean reset) throws IOException {
- int len = readIntFromStream();
- skip(len);
- pos += len;
- if (reset) {
- this.rowNums = 0;
- this.rowIndex = 0;
- }
- }
-
- private void skip(int len) throws IOException {
- long remaining = len;
- do {
- long skipLen = in.skip(remaining);
- remaining -= skipLen;
- } while (remaining > 0);
- }
-
- /**
- * find the next blocklet
- */
- boolean nextBlocklet() throws IOException {
- if (pos >= limitStart) {
- return false;
- }
- if (isAlreadySync) {
- if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
- return false;
- }
- } else {
- isAlreadySync = true;
- if (!sync()) {
- return false;
- }
- }
-
- return pos < limitEnd;
- }
-
- boolean hasNext() throws IOException {
- return rowIndex < rowNums;
- }
-
- void nextRow() {
- rowIndex++;
- }
-
- int readIntFromStream() throws IOException {
- int ch1 = in.read();
- int ch2 = in.read();
- int ch3 = in.read();
- int ch4 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
- pos += 4;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
- /**
- * Reads <code>len</code> bytes of data from the input stream into
- * an array of bytes.
- * @return <code>true</code> if the requested bytes were read successfully, or
- * <code>false</code> if the end of the stream was reached before <code>len</code> bytes could be read.
- */
- boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
- int readLen = in.read(b, offset, len);
- if (readLen < 0) {
- return false;
- }
- pos += readLen;
- if (readLen < len) {
- return readBytesFromStream(b, offset + readLen, len - readLen);
- } else {
- return true;
- }
- }
-
- boolean readBoolean() {
- return (buffer[offset++]) != 0;
- }
-
- short readShort() {
- short v = (short) ((buffer[offset + 1] & 255) +
- ((buffer[offset]) << 8));
- offset += 2;
- return v;
- }
-
- byte[] copy(int len) {
- byte[] b = new byte[len];
- System.arraycopy(buffer, offset, b, 0, len);
- return b;
- }
-
- int readInt() {
- int v = ((buffer[offset + 3] & 255) +
- ((buffer[offset + 2] & 255) << 8) +
- ((buffer[offset + 1] & 255) << 16) +
- ((buffer[offset]) << 24));
- offset += 4;
- return v;
- }
-
- long readLong() {
- long v = ((long)(buffer[offset + 7] & 255)) +
- ((long) (buffer[offset + 6] & 255) << 8) +
- ((long) (buffer[offset + 5] & 255) << 16) +
- ((long) (buffer[offset + 4] & 255) << 24) +
- ((long) (buffer[offset + 3] & 255) << 32) +
- ((long) (buffer[offset + 2] & 255) << 40) +
- ((long) (buffer[offset + 1] & 255) << 48) +
- ((long) (buffer[offset]) << 56);
- offset += 8;
- return v;
- }
-
- double readDouble() {
- return Double.longBitsToDouble(readLong());
- }
-
- byte[] readBytes(int len) {
- byte[] b = new byte[len];
- System.arraycopy(buffer, offset, b, 0, len);
- offset += len;
- return b;
- }
-
- void skipBytes(int len) {
- offset += len;
- }
-
- int getRowNums() {
- return rowNums;
- }
-
- void close() {
- CarbonUtil.closeStreams(in);
- }
-}
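
StreamBlockletReader is the mirror image of StreamBlockletWriter (next hunk): it syncs to a
blocklet boundary, reads the Thrift BlockletHeader to learn the row count, uncompresses the
blocklet body into its internal buffer, and then serves fields row by row. A rough sketch of the
call sequence a record reader drives, assuming same-package access (the methods are
package-private) and a single nullable int column for illustration:

    import java.io.IOException;
    import java.util.BitSet;

    import org.apache.carbondata.format.BlockletHeader;

    // Illustrative only: scans a stream file split through StreamBlockletReader.
    class StreamScanSketch {
      static void scan(StreamBlockletReader reader) throws IOException {
        try {
          while (reader.nextBlocklet()) {                        // sync to the next blocklet
            BlockletHeader header = reader.readBlockletHeader();  // carries the row count
            reader.readBlockletData(header);                      // uncompress into the buffer
            while (reader.hasNext()) {
              short bitSetLen = reader.readShort();               // per-row null bitset prefix
              BitSet nullBits = BitSet.valueOf(reader.readBytes(bitSetLen));
              Integer value = nullBits.get(0) ? null : reader.readInt();  // one int column
              reader.nextRow();
            }
          }
        } finally {
          reader.close();
        }
      }
    }
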
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
deleted file mode 100644
index a0328b3..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.streaming;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.BlockletInfo;
-import org.apache.carbondata.format.MutationType;
-
-/**
- * stream blocklet writer
- */
-public class StreamBlockletWriter {
- private byte[] buffer;
- private int maxSize;
- private int maxRowNum;
- private int rowSize;
- private int count = 0;
- private int rowIndex = -1;
- private Compressor compressor = CompressorFactory.getInstance().getCompressor();
-
- StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) {
- buffer = new byte[maxSize];
- this.maxSize = maxSize;
- this.maxRowNum = maxRowNum;
- this.rowSize = rowSize;
- }
-
- private void ensureCapacity(int space) {
- int newcount = space + count;
- if (newcount > buffer.length) {
- byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
- System.arraycopy(buffer, 0, newbuf, 0, count);
- buffer = newbuf;
- }
- }
-
- void reset() {
- count = 0;
- rowIndex = -1;
- }
-
- byte[] getBytes() {
- return buffer;
- }
-
- int getCount() {
- return count;
- }
-
- int getRowIndex() {
- return rowIndex;
- }
-
- void nextRow() {
- rowIndex++;
- }
-
- boolean isFull() {
- return rowIndex == maxRowNum || count >= maxSize;
- }
-
- void writeBoolean(boolean val) {
- ensureCapacity(1);
- buffer[count] = (byte) (val ? 1 : 0);
- count += 1;
- }
-
- void writeShort(int val) {
- ensureCapacity(2);
- buffer[count + 1] = (byte) (val);
- buffer[count] = (byte) (val >>> 8);
- count += 2;
- }
-
- void writeInt(int val) {
- ensureCapacity(4);
- buffer[count + 3] = (byte) (val);
- buffer[count + 2] = (byte) (val >>> 8);
- buffer[count + 1] = (byte) (val >>> 16);
- buffer[count] = (byte) (val >>> 24);
- count += 4;
- }
-
- void writeLong(long val) {
- ensureCapacity(8);
- buffer[count + 7] = (byte) (val);
- buffer[count + 6] = (byte) (val >>> 8);
- buffer[count + 5] = (byte) (val >>> 16);
- buffer[count + 4] = (byte) (val >>> 24);
- buffer[count + 3] = (byte) (val >>> 32);
- buffer[count + 2] = (byte) (val >>> 40);
- buffer[count + 1] = (byte) (val >>> 48);
- buffer[count] = (byte) (val >>> 56);
- count += 8;
- }
-
- void writeDouble(double val) {
- writeLong(Double.doubleToLongBits(val));
- }
-
- void writeBytes(byte[] b) {
- writeBytes(b, 0, b.length);
- }
-
- void writeBytes(byte[] b, int off, int len) {
- ensureCapacity(len);
- System.arraycopy(b, off, buffer, count, len);
- count += len;
- }
-
- void apppendBlocklet(DataOutputStream outputStream) throws IOException {
- outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
-
- BlockletInfo blockletInfo = new BlockletInfo();
- blockletInfo.setNum_rows(getRowIndex() + 1);
- BlockletHeader blockletHeader = new BlockletHeader();
- blockletHeader.setBlocklet_length(getCount());
- blockletHeader.setMutation(MutationType.INSERT);
- blockletHeader.setBlocklet_info(blockletInfo);
- byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
- outputStream.writeInt(headerBytes.length);
- outputStream.write(headerBytes);
-
- byte[] compressed = compressor.compressByte(getBytes(), getCount());
- outputStream.writeInt(compressed.length);
- outputStream.write(compressed);
- }
-
- void close() {
- }
-}
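
On the write side, CarbonStreamRecordWriter (earlier in this diff) drives this class by calling
nextRow(), the type-specific write methods, and apppendBlocklet() whenever isFull() reports the
blocklet is complete. A minimal sketch of that loop, assuming same-package access (the constructor
is package-private); the sizes and the single int column are placeholders, not the defaults the
real record writer takes from its configuration:

    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative only: writes int rows as blocklets to an already-opened stream file.
    class StreamWriteSketch {
      static void writeRows(DataOutputStream outputStream, int[] values) throws IOException {
        StreamBlockletWriter output = new StreamBlockletWriter(
            2 * 1024 * 1024,  // max blocklet size in bytes (placeholder)
            32000,            // max rows per blocklet (placeholder)
            8);               // estimated row size, used when growing the buffer
        for (int value : values) {
          output.nextRow();
          output.writeShort(0);       // empty null bitset for this non-null column
          output.writeInt(value);
          if (output.isFull()) {
            output.apppendBlocklet(outputStream);  // sync marker + header + compressed body
            output.reset();
          }
        }
        if (output.getRowIndex() >= 0) {           // flush the last, partially filled blocklet
          output.apppendBlocklet(outputStream);
          output.reset();
        }
        output.close();
      }
    }
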