You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/05 13:39:40 UTC
[11/13] incubator-carbondata git commit: refactor write step
refactor write step
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8cca0afc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8cca0afc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8cca0afc
Branch: refs/heads/12-dev
Commit: 8cca0afc5db16557146dfaa33e14c2823d895966
Parents: bd044c2
Author: jackylk <ja...@huawei.com>
Authored: Thu Mar 30 11:21:21 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Apr 5 14:36:47 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 2 +-
.../core/datastore/NodeMeasureDataStore.java | 32 --
...ractHeavyCompressedDoubleArrayDataStore.java | 89 -----
...yCompressedDoubleArrayDataInMemoryStore.java | 28 --
.../HeavyCompressedDoubleArrayDataStore.java | 57 ++++
.../core/util/CarbonMetadataUtil.java | 2 +-
.../apache/carbondata/core/util/CarbonUtil.java | 2 +-
.../carbondata/core/util/DataTypeUtil.java | 5 +-
.../sort/unsafe/UnsafeCarbonRowPage.java | 6 +-
.../holder/UnsafeSortTempFileChunkHolder.java | 2 +-
.../merger/UnsafeIntermediateFileMerger.java | 2 +-
.../sortdata/IntermediateFileMerger.java | 2 +-
.../sortandgroupby/sortdata/SortDataRows.java | 2 +-
.../sortdata/SortTempFileChunkHolder.java | 2 +-
.../store/CarbonFactDataHandlerColumnar.java | 328 +++++++++++--------
.../processing/store/StoreFactory.java | 33 --
.../store/writer/CarbonFactDataWriter.java | 4 +-
.../writer/v1/CarbonFactDataWriterImplV1.java | 13 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 14 +-
19 files changed, 287 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 789c321..b82d53c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -631,7 +631,7 @@ public final class CarbonCommonConstants {
/**
* DOUBLE_VALUE_MEASURE
*/
- public static final char SUM_COUNT_VALUE_MEASURE = 'n';
+ public static final char DOUBLE_MEASURE = 'n';
/**
* BYTE_VALUE_MEASURE
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
deleted file mode 100644
index 2f54847..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/NodeMeasureDataStore.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore;
-
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-
-public interface NodeMeasureDataStore {
- /**
- * This method will be used to get the writable key array.
- * writable measure data array will hold below information:
- * <size of measure data array><measure data array>
- * total length will be 4 bytes for size + measure data array length
- *
- * @return writable array (compressed or normal)
- */
- byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolderArray);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index b274b21..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/AbstractHeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public abstract class AbstractHeavyCompressedDoubleArrayDataStore
- implements NodeMeasureDataStore //NodeMeasureDataStore<double[]>
-{
-
- private LogService LOGGER =
- LogServiceFactory.getLogService(AbstractHeavyCompressedDoubleArrayDataStore.class.getName());
-
- /**
- * values.
- */
- protected ValueCompressionHolder[] values;
-
- /**
- * compressionModel.
- */
- protected WriterCompressModel compressionModel;
-
- /**
- * type
- */
- private char[] type;
-
- /**
- * AbstractHeavyCompressedDoubleArrayDataStore constructor.
- *
- * @param compressionModel
- */
- public AbstractHeavyCompressedDoubleArrayDataStore(WriterCompressModel compressionModel) {
- this.compressionModel = compressionModel;
- if (null != compressionModel) {
- this.type = compressionModel.getType();
- values =
- new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
- }
- }
-
- // this method first invokes encoding routine to encode the data chunk,
- // followed by invoking compression routine for preparing the data chunk for writing.
- @Override public byte[][] getWritableMeasureDataArray(CarbonWriteDataHolder[] dataHolder) {
- byte[][] returnValue = new byte[values.length][];
- for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
- values[i] = compressionModel.getValueCompressionHolder()[i];
- if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
- && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- // first perform encoding of the data chunk
- values[i].setValue(
- ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
- .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
- compressionModel.getMaxValue()[i],
- compressionModel.getMantissa()[i]));
- } else {
- values[i].setValue(dataHolder[i].getWritableByteArrayValues());
- }
- values[i].compress();
- returnValue[i] = values[i].getCompressedData();
- }
-
- return returnValue;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
deleted file mode 100644
index a484b8f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataInMemoryStore.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-
-public class HeavyCompressedDoubleArrayDataInMemoryStore
- extends AbstractHeavyCompressedDoubleArrayDataStore {
-
- public HeavyCompressedDoubleArrayDataInMemoryStore(WriterCompressModel compressionModel) {
- super(compressionModel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
new file mode 100644
index 0000000..d3d67fd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.impl.data.compressed;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public class HeavyCompressedDoubleArrayDataStore {
+
+ // this method first invokes encoding routine to encode the data chunk,
+ // followed by invoking compression routine for preparing the data chunk for writing.
+ public static byte[][] encodeMeasureDataArray(
+ WriterCompressModel compressionModel,
+ CarbonWriteDataHolder[] dataHolder) {
+ char[] type = compressionModel.getType();
+ ValueCompressionHolder[] values =
+ new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
+ byte[][] returnValue = new byte[values.length][];
+ for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
+ values[i] = compressionModel.getValueCompressionHolder()[i];
+ if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
+ && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ // first perform encoding of the data chunk
+ values[i].setValue(
+ ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
+ .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
+ compressionModel.getMaxValue()[i],
+ compressionModel.getMantissa()[i]));
+ } else {
+ values[i].setValue(dataHolder[i].getWritableByteArrayValues());
+ }
+ values[i].compress();
+ returnValue[i] = values[i].getCompressedData();
+ }
+
+ return returnValue;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index f134f0c..e60d675 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -824,7 +824,7 @@ public class CarbonMetadataUtil {
public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
ByteBuffer buffer = null;
- if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) {
buffer = ByteBuffer.allocate(
(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ 3);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index bf8c03b..a442087 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1427,7 +1427,7 @@ public final class CarbonUtil {
ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
valueEncoderMeta.setType(measureType);
switch (measureType) {
- case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
+ case CarbonCommonConstants.DOUBLE_MEASURE:
valueEncoderMeta.setMaxValue(buffer.getDouble());
valueEncoderMeta.setMinValue(buffer.getDouble());
valueEncoderMeta.setUniqueValue(buffer.getDouble());
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 10411b0..e437405 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -152,10 +152,13 @@ public final class DataTypeUtil {
case LONG:
return CarbonCommonConstants.BIG_INT_MEASURE;
default:
- return CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE;
+ return CarbonCommonConstants.DOUBLE_MEASURE;
}
}
+ // bytes of 0 in BigDecimal
+ public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0));
+
/**
* This method will convert a big decimal value to bytes
*
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index e468028..e682263 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -112,7 +112,7 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
size += 8;
@@ -183,7 +183,7 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
size += 8;
rowToFill[dimensionSize + mesCount] = val;
@@ -254,7 +254,7 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
size += 8;
stream.writeDouble(val);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 60f259e..de2b874 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -324,7 +324,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
for (int mesCount = 0; mesCount < measureCount; mesCount++) {
if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
row[dimensionCount + mesCount] = stream.readDouble();
} else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
row[dimensionCount + mesCount] = stream.readLong();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 7862a95..e52dc8a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -310,7 +310,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
rowData.putDouble(size, val);
size += 8;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 0ac2d5c..5487593 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -341,7 +341,7 @@ public class IntermediateFileMerger implements Callable<Void> {
if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
stream.writeDouble(val);
- } else if (aggType[counter] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
stream.writeDouble(val);
} else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 9b5a850..3a7a579 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -285,7 +285,7 @@ public class SortDataRows {
Object value = row[mesCount + dimColCount];
if (null != value) {
stream.write((byte) 1);
- if (aggType[mesCount] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
Double val = (Double) value;
stream.writeDouble(val);
} else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index ae01404..b4ccc6f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -332,7 +332,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
// read measure values
for (int i = 0; i < this.measureCount; i++) {
if (stream.readByte() == 1) {
- if (aggType[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
measures[index++] = stream.readDouble();
} else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
measures[index++] = stream.readLong();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2affa03..da75428 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataStore;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -478,155 +479,224 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
- private NodeHolder processDataRows(List<Object[]> dataRows)
- throws CarbonDataWriterException {
- Object[] max = new Object[measureCount];
- Object[] min = new Object[measureCount];
- int[] decimal = new int[measureCount];
- Object[] uniqueValue = new Object[measureCount];
- // to store index of the measure columns which are null
- BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
- for (int i = 0; i < max.length; i++) {
- if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- max[i] = Long.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- max[i] = -Double.MAX_VALUE;
- } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- max[i] = new BigDecimal(0.0);
- } else {
- max[i] = 0.0;
+ /** statics for one blocklet/page */
+ class Statistics {
+ /** min and max value of the measures */
+ Object[] min, max;
+
+ /**
+ * the unique value is the non-exist value in the row,
+ * and will be used as storage key for null values of measures
+ */
+ Object[] uniqueValue;
+
+ /** decimal count of the measures */
+ int[] decimal;
+
+ Statistics(int measureCount) {
+ max = new Object[measureCount];
+ min = new Object[measureCount];
+ uniqueValue = new Object[measureCount];
+ decimal = new int[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ max[i] = Long.MIN_VALUE;
+ min[i] = Long.MAX_VALUE;
+ uniqueValue[i] = Long.MIN_VALUE;
+ } else if (type[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
+ max[i] = Double.MIN_VALUE;
+ min[i] = Double.MAX_VALUE;
+ uniqueValue[i] = Double.MIN_VALUE;
+ } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ max[i] = new BigDecimal(Double.MIN_VALUE);
+ min[i] = new BigDecimal(Double.MAX_VALUE);
+ uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
+ } else {
+ max[i] = 0.0;
+ min[i] = 0.0;
+ uniqueValue[i] = 0.0;
+ }
+ decimal[i] = 0;
}
}
- for (int i = 0; i < min.length; i++) {
- if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- min[i] = Long.MAX_VALUE;
- uniqueValue[i] = Long.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
- min[i] = Double.MAX_VALUE;
- uniqueValue[i] = Double.MIN_VALUE;
- } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- min[i] = new BigDecimal(Double.MAX_VALUE);
- uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
- } else {
- min[i] = 0.0;
- uniqueValue[i] = 0.0;
+
+ /**
+ * update the statistics for the input row
+ */
+ void update(int[] msrIndex, Object[] row, boolean compactionFlow) {
+ // Update row level min max
+ for (int i = 0; i < msrIndex.length; i++) {
+ int count = msrIndex[i];
+ if (row[count] != null) {
+ if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
+ double value = (double) row[count];
+ double maxVal = (double) max[count];
+ double minVal = (double) min[count];
+ max[count] = (maxVal > value ? max[count] : value);
+ min[count] = (minVal < value ? min[count] : value);
+ int num = getDecimalCount(value);
+ decimal[count] = (decimal[count] > num ? decimal[count] : num);
+ uniqueValue[count] = (double) min[count] - 1;
+ } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ long value = (long) row[count];
+ long maxVal = (long) max[count];
+ long minVal = (long) min[count];
+ max[count] = (maxVal > value ? max[count] : value);
+ min[count] = (minVal < value ? min[count] : value);
+ uniqueValue[count] = (long) min[count] - 1;
+ } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ byte[] buff = null;
+ // in compaction flow the measure with decimal type will come as spark decimal.
+ // need to convert it to byte array.
+ if (compactionFlow) {
+ BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
+ buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
+ } else {
+ buff = (byte[]) row[count];
+ }
+ BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
+ decimal[count] = value.scale();
+ BigDecimal val = (BigDecimal) min[count];
+ uniqueValue[count] = (val.subtract(new BigDecimal(1.0)));
+ }
+ }
}
}
- for (int i = 0; i < decimal.length; i++) {
- decimal[i] = 0;
- }
+ }
+ class IndexKey {
+ byte[] currentMDKey = null;
+ byte[][] currentNoDictionaryKey = null;
byte[] startKey = null;
byte[] endKey = null;
byte[][] noDictStartKey = null;
byte[][] noDictEndKey = null;
- CarbonWriteDataHolder[] dataHolder = initialiseDataHolder(dataRows.size());
- CarbonWriteDataHolder keyDataHolder = initialiseKeyBlockHolder(dataRows.size());
- CarbonWriteDataHolder noDictionaryKeyDataHolder = null;
+
+ /** update all keys based on the input row */
+ void update(Object[] row, boolean firstRow) {
+ currentMDKey = (byte[]) row[mdKeyIndex];
+ if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
+ currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
+ }
+ if (firstRow) {
+ startKey = currentMDKey;
+ noDictStartKey = currentNoDictionaryKey;
+ }
+ endKey = currentMDKey;
+ noDictEndKey = currentNoDictionaryKey;
+ }
+ }
+
+ /** generate the NodeHolder from the input rows */
+ private NodeHolder processDataRows(List<Object[]> dataRows)
+ throws CarbonDataWriterException {
+ // to store index of the measure columns which are null
+ BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
+ // statistics for one blocklet/page
+ Statistics stats = new Statistics(measureCount);
+ IndexKey keys = new IndexKey();
+
+ // initialize measureHolder, mdKeyHolder and noDictionaryHolder, these three Holders
+ // are the input for final encoding
+ CarbonWriteDataHolder[] measureHolder = initialiseDataHolder(dataRows.size());
+ CarbonWriteDataHolder mdKeyHolder = initialiseKeyBlockHolder(dataRows.size());
+ CarbonWriteDataHolder noDictionaryHolder = null;
if ((noDictionaryCount + complexColCount) > 0) {
- noDictionaryKeyDataHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
+ noDictionaryHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
}
+ // loop on the input rows, fill measureHolder, mdKeyHolder and noDictionaryHolder
for (int count = 0; count < dataRows.size(); count++) {
Object[] row = dataRows.get(count);
- byte[] mdKey = (byte[]) row[this.mdKeyIndex];
- byte[][] noDictionaryKey = null;
- if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryKey = (byte[][]) row[this.mdKeyIndex - 1];
- }
- ByteBuffer byteBuffer = null;
- byte[] b = null;
- if (count == 0) {
- startKey = mdKey;
- noDictStartKey = noDictionaryKey;
- }
- endKey = mdKey;
- noDictEndKey = noDictionaryKey;
- // add to key store
- if (mdKey.length > 0) {
- keyDataHolder.setWritableByteArrayValueByIndex(count, mdKey);
+ keys.update(row, (count == 0));
+ if (keys.currentMDKey.length > 0) {
+ mdKeyHolder.setWritableByteArrayValueByIndex(count, keys.currentMDKey);
}
- // for storing the byte [] for high card.
if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryKeyDataHolder.setWritableNonDictByteArrayValueByIndex(count, noDictionaryKey);
+ noDictionaryHolder.setWritableNonDictByteArrayValueByIndex(count,
+ keys.currentNoDictionaryKey);
}
+ fillMeasureHolder(row, count, measureHolder, nullValueIndexBitSet);
+ stats.update(otherMeasureIndex, row, compactionFlow);
+ stats.update(customMeasureIndex, row, compactionFlow);
+ }
- for (int k = 0; k < otherMeasureIndex.length; k++) {
- if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
- if (null == row[otherMeasureIndex[k]]) {
- nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
- dataHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
- } else {
- dataHolder[otherMeasureIndex[k]]
- .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
- }
+ // generate encoded byte array for 3 holders
+ // for measure columns: encode and compress the measureHolder
+ WriterCompressModel compressionModel =
+ ValueCompressionUtil.getWriterCompressModel(
+ stats.max, stats.min, stats.decimal, stats.uniqueValue, type, new byte[measureCount]);
+ byte[][] encodedMeasureArray =
+ HeavyCompressedDoubleArrayDataStore.encodeMeasureDataArray(
+ compressionModel, measureHolder);
+
+ // for mdkey and noDictionary, it is already in bytes, just get the array from holder
+ byte[][] mdKeyArray = mdKeyHolder.getByteArrayValues();
+ byte[][][] noDictionaryArray = null;
+ if ((noDictionaryCount + complexColCount) > 0) {
+ noDictionaryArray = noDictionaryHolder.getNonDictByteArrayValues();
+ }
+
+ // create NodeHolder using these encoded byte arrays
+ NodeHolder nodeHolder =
+ createNodeHolderObjectWithOutKettle(
+ encodedMeasureArray, mdKeyArray, noDictionaryArray, dataRows.size(),
+ keys.startKey, keys.endKey, compressionModel, keys.noDictStartKey, keys.noDictEndKey,
+ nullValueIndexBitSet);
+ LOGGER.info("Number Of records processed: " + dataRows.size());
+ return nodeHolder;
+ }
+
+ private void fillMeasureHolder(Object[] row, int count, CarbonWriteDataHolder[] measureHolder,
+ BitSet[] nullValueIndexBitSet) {
+ for (int k = 0; k < otherMeasureIndex.length; k++) {
+ if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
+ if (null == row[otherMeasureIndex[k]]) {
+ nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+ measureHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
} else {
- if (null == row[otherMeasureIndex[k]]) {
- nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
- dataHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
- } else {
- dataHolder[otherMeasureIndex[k]]
- .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
- }
+ measureHolder[otherMeasureIndex[k]]
+ .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
}
- }
- calculateMaxMin(max, min, decimal, otherMeasureIndex, row);
- for (int i = 0; i < customMeasureIndex.length; i++) {
- if (null == row[customMeasureIndex[i]]
- && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- BigDecimal val = BigDecimal.valueOf(0);
- b = DataTypeUtil.bigDecimalToByte(val);
- nullValueIndexBitSet[customMeasureIndex[i]].set(count);
+ } else {
+ if (null == row[otherMeasureIndex[k]]) {
+ nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
+ measureHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
} else {
- if (this.compactionFlow) {
- BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
- b = DataTypeUtil.bigDecimalToByte(bigDecimal);
- } else {
- b = (byte[]) row[customMeasureIndex[i]];
- }
+ measureHolder[otherMeasureIndex[k]]
+ .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
}
- BigDecimal value = DataTypeUtil.byteToBigDecimal(b);
- String[] bigdVals = value.toPlainString().split("\\.");
- long[] bigDvalue = new long[2];
- if (bigdVals.length == 2) {
- bigDvalue[0] = Long.parseLong(bigdVals[0]);
- BigDecimal bd = new BigDecimal(CarbonCommonConstants.POINT + bigdVals[1]);
- bigDvalue[1] = (long) (bd.doubleValue() * Math.pow(10, value.scale()));
+ }
+ }
+ ByteBuffer byteBuffer = null;
+ byte[] measureBytes = null;
+ for (int i = 0; i < customMeasureIndex.length; i++) {
+ if (null == row[customMeasureIndex[i]]
+ && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ measureBytes = DataTypeUtil.zeroBigDecimalBytes;
+ nullValueIndexBitSet[customMeasureIndex[i]].set(count);
+ } else {
+ if (this.compactionFlow) {
+ BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+ measureBytes = DataTypeUtil.bigDecimalToByte(bigDecimal);
} else {
- bigDvalue[0] = Long.parseLong(bigdVals[0]);
+ measureBytes = (byte[]) row[customMeasureIndex[i]];
}
- byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
- byteBuffer.putInt(b.length);
- byteBuffer.put(b);
- byteBuffer.flip();
- b = byteBuffer.array();
- dataHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, b);
}
- calculateMaxMin(max, min, decimal, customMeasureIndex, row);
- }
- calculateUniqueValue(min, uniqueValue);
- byte[][] byteArrayValues = keyDataHolder.getByteArrayValues().clone();
- byte[][][] noDictionaryValueHolder = null;
- if ((noDictionaryCount + complexColCount) > 0) {
- noDictionaryValueHolder = noDictionaryKeyDataHolder.getNonDictByteArrayValues();
+ byteBuffer = ByteBuffer.allocate(measureBytes.length +
+ CarbonCommonConstants.INT_SIZE_IN_BYTE);
+ byteBuffer.putInt(measureBytes.length);
+ byteBuffer.put(measureBytes);
+ byteBuffer.flip();
+ measureBytes = byteBuffer.array();
+ measureHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, measureBytes);
}
- WriterCompressModel compressionModel = ValueCompressionUtil
- .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[max.length]);
- byte[][] writableMeasureDataArray =
- StoreFactory.createDataStore(compressionModel).getWritableMeasureDataArray(dataHolder)
- .clone();
- NodeHolder nodeHolder =
- getNodeHolderObject(writableMeasureDataArray, byteArrayValues, dataRows.size(),
- startKey, endKey, compressionModel, noDictionaryValueHolder, noDictStartKey,
- noDictEndKey, nullValueIndexBitSet);
- LOGGER.info("Number Of records processed: " + dataRows.size());
- return nodeHolder;
}
- private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal,
- byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
- WriterCompressModel compressionModel, byte[][][] noDictionaryData,
- byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
+ private NodeHolder createNodeHolderObjectWithOutKettle(byte[][] measureArray, byte[][] mdKeyArray,
+ byte[][][] noDictionaryArray, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
+ WriterCompressModel compressionModel, byte[][] noDictionaryStartKey,
+ byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
throws CarbonDataWriterException {
byte[][][] noDictionaryColumnsData = null;
List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
@@ -636,19 +706,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
colsAndValues.add(new ArrayList<byte[]>());
}
int noOfColumn = colGrpModel.getNoOfColumnStore();
- DataHolder[] dataHolders = getDataHolders(noOfColumn, byteArrayValues.length);
- for (int i = 0; i < byteArrayValues.length; i++) {
- byte[][] splitKey = columnarSplitter.splitKey(byteArrayValues[i]);
+ DataHolder[] dataHolders = getDataHolders(noOfColumn, mdKeyArray.length);
+ for (int i = 0; i < mdKeyArray.length; i++) {
+ byte[][] splitKey = columnarSplitter.splitKey(mdKeyArray[i]);
for (int j = 0; j < splitKey.length; j++) {
dataHolders[j].addData(splitKey[j], i);
}
}
if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryData.length][];
- for (int i = 0; i < noDictionaryData.length; i++) {
+ noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryArray.length][];
+ for (int i = 0; i < noDictionaryArray.length; i++) {
int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
- byte[][] splitKey = noDictionaryData[i];
+ byte[][] splitKey = noDictionaryArray[i];
int complexTypeIndex = 0;
for (int j = 0; j < splitKey.length; j++) {
@@ -754,7 +824,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey);
}
return this.dataWriter
- .buildDataNodeHolder(blockStorage, dataHolderLocal, entryCountLocal, startkeyLocal,
+ .buildDataNodeHolder(blockStorage, measureArray, entryCountLocal, startkeyLocal,
endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey,
nullValueIndexBitSet);
}
@@ -914,7 +984,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
for (int i = 0; i < msrIndex.length; i++) {
int count = msrIndex[i];
if (row[count] != null) {
- if (type[count] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
double value = (double) row[count];
double maxVal = (double) max[count];
double minVal = (double) min[count];
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
deleted file mode 100644
index 0097483..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/StoreFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import org.apache.carbondata.core.datastore.NodeMeasureDataStore;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataInMemoryStore;
-
-public final class StoreFactory {
-
- private StoreFactory() {
- }
-
- public static NodeMeasureDataStore createDataStore(WriterCompressModel compressionModel) {
- return new HeavyCompressedDoubleArrayDataInMemoryStore(compressionModel);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index 227f92b..c8f740b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -31,7 +31,7 @@ public interface CarbonFactDataWriter<T> {
* file format
* <key><measure1><measure2>....
*
- * @param dataArray measure array
+ * @param measureArray measure array
* @param entryCount number of entries
* @param startKey start key of leaf
* @param endKey end key of leaf
@@ -40,7 +40,7 @@ public interface CarbonFactDataWriter<T> {
* @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
*/
- NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] dataArray,
+ NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] measureArray,
int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
throws CarbonDataWriterException;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index cf5311c..64077e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -47,9 +47,10 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
}
@Override
- public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
- int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
- byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
+ public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray,
+ byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey,
+ WriterCompressModel compressionModel, byte[] noDictionaryStartKey,
+ byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
throws CarbonDataWriterException {
// if there are no NO-Dictionary column present in the table then
// set the empty byte array
@@ -143,13 +144,13 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
int[] msrLength = new int[dataWriterVo.getMeasureCount()];
// calculate the total size required for all the measure and get the
// each measure size
- for (int i = 0; i < dataArray.length; i++) {
- currentMsrLenght = dataArray[i].length;
+ for (int i = 0; i < measureArray.length; i++) {
+ currentMsrLenght = measureArray[i].length;
totalMsrArrySize += currentMsrLenght;
msrLength[i] = currentMsrLenght;
}
NodeHolder holder = new NodeHolder();
- holder.setDataArray(dataArray);
+ holder.setDataArray(measureArray);
holder.setKeyArray(keyBlockData);
holder.setMeasureNullValueIndex(nullValueIndexBitSet);
// end key format will be <length of dictionary key><length of no
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8cca0afc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 3b7c763..aa54a4b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -83,7 +83,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
* be written in carbon data file
*/
@Override public NodeHolder buildDataNodeHolder(IndexStorage<short[]>[] keyStorageArray,
- byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey,
+ byte[][] measureArray, int entryCount, byte[] startKey, byte[] endKey,
WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey,
BitSet[] nullValueIndexBitSet) throws CarbonDataWriterException {
// if there are no NO-Dictionary column present in the table then
@@ -113,8 +113,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];
- byte[][] measureMinValue = new byte[dataArray.length][];
- byte[][] measureMaxValue = new byte[dataArray.length][];
+ byte[][] measureMinValue = new byte[measureArray.length][];
+ byte[][] measureMaxValue = new byte[measureArray.length][];
byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
boolean[] colGrpBlock = new boolean[keyStorageArray.length];
@@ -137,7 +137,7 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
colGrpBlock[i] = true;
}
}
- for (int i = 0; i < dataArray.length; i++) {
+ for (int i = 0; i < measureArray.length; i++) {
measureMaxValue[i] = CarbonMetadataUtil
.getByteValueForMeasure(compressionModel.getMaxValue()[i],
dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
@@ -176,13 +176,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
int[] msrLength = new int[dataWriterVo.getMeasureCount()];
// calculate the total size required for all the measure and get the
// each measure size
- for (int i = 0; i < dataArray.length; i++) {
- currentMsrLenght = dataArray[i].length;
+ for (int i = 0; i < measureArray.length; i++) {
+ currentMsrLenght = measureArray[i].length;
totalMsrArrySize += currentMsrLenght;
msrLength[i] = currentMsrLenght;
}
NodeHolder holder = new NodeHolder();
- holder.setDataArray(dataArray);
+ holder.setDataArray(measureArray);
holder.setKeyArray(keyBlockData);
holder.setMeasureNullValueIndex(nullValueIndexBitSet);
// end key format will be <length of dictionary key><length of no