Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/28 06:10:02 UTC
[2/7] carbondata git commit: [CARBONDATA-1098] Change page statistics to use exact type and use column page in writer
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index 755308e..f5d7dc7 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -50,14 +50,6 @@ public class DataTypeUtilTest {
}
- @Test public void testGetAggType() {
- assertTrue(getAggType(DataType.DECIMAL) == 'b');
- assertTrue(getAggType(DataType.INT) == 'd');
- assertTrue(getAggType(DataType.LONG) == 'd');
- assertTrue(getAggType(DataType.NULL) == 'n');
-
- }
-
@Test public void testBigDecimalToByte() {
byte[] result = bigDecimalToByte(BigDecimal.ONE);
assertTrue(result == result);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
index 64651e5..5fc6df9 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
@@ -24,21 +24,26 @@ import java.util.BitSet;
import java.util.List;
import java.util.UUID;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.reader.CarbonFooterReader;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.ColumnSchema;
import junit.framework.TestCase;
-import org.apache.carbondata.format.ColumnSchema;
+import mockit.Mock;
+import mockit.MockUp;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -87,9 +92,7 @@ public class CarbonFooterWriterTest extends TestCase{
int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema);
SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, colCardinality);
writer.writeFooter(CarbonMetadataUtil.convertFileFooter(
- infoColumnars,
- 6,
- cardinalities,columnSchema, segmentProperties
+ infoColumnars, cardinalities,columnSchema, segmentProperties
), 0);
CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0);
@@ -125,41 +128,7 @@ public class CarbonFooterWriterTest extends TestCase{
return dimColumn;
}
- /**
- * test writing fact metadata.
- */
- @Test public void testReadFactMetadata() throws IOException {
- deleteFile();
- createFile();
- CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
- List<BlockletInfoColumnar> infoColumnars = getBlockletInfoColumnars();
- int[] cardinalities = new int[] { 2, 4, 5, 7, 9, 10};
- List<ColumnSchema> columnSchema = Arrays.asList(new ColumnSchema[]{getDimensionColumn("IMEI1"),
- getDimensionColumn("IMEI2"),
- getDimensionColumn("IMEI3"),
- getDimensionColumn("IMEI4"),
- getDimensionColumn("IMEI5"),
- getDimensionColumn("IMEI6")});
- List<org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema> wrapperColumnSchema = Arrays.asList(new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema[]{getWrapperDimensionColumn("IMEI1"),
- getWrapperDimensionColumn("IMEI2"),
- getWrapperDimensionColumn("IMEI3"),
- getWrapperDimensionColumn("IMEI4"),
- getWrapperDimensionColumn("IMEI5"),
- getWrapperDimensionColumn("IMEI6")});
- int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema);
- SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, cardinalities);
- writer.writeFooter(CarbonMetadataUtil
- .convertFileFooter(infoColumnars, 6, colCardinality,
- columnSchema,segmentProperties), 0);
-
- CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0);
- List<BlockletInfoColumnar> nodeInfoColumnars =
- CarbonMetadataUtil.convertBlockletInfo(metaDataReader.readFooter());
-
- assertTrue(nodeInfoColumnars.size() == infoColumnars.size());
- }
-
- private List<BlockletInfoColumnar> getBlockletInfoColumnars() {
+ private List<BlockletInfoColumnar> getBlockletInfoColumnars() throws IOException {
BlockletInfoColumnar infoColumnar = new BlockletInfoColumnar();
infoColumnar.setStartKey(new byte[] { 1, 2, 3 });
infoColumnar.setEndKey(new byte[] { 8, 9, 10 });
@@ -179,27 +148,47 @@ public class CarbonFooterWriterTest extends TestCase{
infoColumnar.setMeasureLength(new int[] { 6, 7 });
infoColumnar.setMeasureOffset(new long[] { 33, 99 });
infoColumnar.setAggKeyBlock(new boolean[] { true, true, true, true });
- infoColumnar.setColGrpBlocks(new boolean[] { false, false, false, false });
infoColumnar.setMeasureNullValueIndex(new BitSet[] {new BitSet(),new BitSet()});
+ infoColumnar.setEncodedTablePage(EncodedTablePage.newEmptyInstance());
+
+ final ValueEncoderMeta meta = CodecMetaFactory.createMeta();
+
+ new MockUp<ColumnPageCodecMeta>() {
+ @SuppressWarnings("unused") @Mock
+ public byte[] serialize() {
+ return new byte[]{1,2};
+ }
+ @SuppressWarnings("unused") @Mock
+ public byte[] getMaxAsBytes() {
+ return new byte[]{1,2};
+ }
+ @SuppressWarnings("unused") @Mock
+ public byte[] getMinAsBytes() {
+ return new byte[]{1,2};
+ }
+ @SuppressWarnings("unused") @Mock
+ public DataType getSrcDataType() {
+ return DataType.DOUBLE;
+ }
+
+ };
+
+ new MockUp<EncodedMeasurePage>() {
+ @SuppressWarnings("unused") @Mock
+ public ValueEncoderMeta getMetaData() {
+ return meta;
+ }
+ };
+
+ final EncodedMeasurePage measure = new EncodedMeasurePage(2, new byte[]{0,1}, meta,
+ new BitSet());
+ new MockUp<EncodedTablePage>() {
+ @SuppressWarnings("unused") @Mock
+ public EncodedMeasurePage getMeasure(int measureIndex) {
+ return measure;
+ }
+ };
- ValueEncoderMeta[] metas = new ValueEncoderMeta[2];
- metas[0] = new ValueEncoderMeta();
- metas[0].setMinValue(0);
- metas[0].setMaxValue(44d);
- metas[0].setUniqueValue(0d);
- metas[0].setDecimal(0);
- metas[0].setType(CarbonCommonConstants.DOUBLE_MEASURE);
- metas[0].setDataTypeSelected((byte)0);
- metas[1] = new ValueEncoderMeta();
- metas[1].setMinValue(0);
- metas[1].setMaxValue(55d);
- metas[1].setUniqueValue(0d);
- metas[1].setDecimal(0);
- metas[1].setType(CarbonCommonConstants.DOUBLE_MEASURE);
- metas[1].setDataTypeSelected((byte)0);
-
- MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas);
- infoColumnar.setStats(stats);
List<BlockletInfoColumnar> infoColumnars = new ArrayList<BlockletInfoColumnar>();
infoColumnars.add(infoColumnar);
return infoColumnars;
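The updated test above replaces the hand-built ValueEncoderMeta array with JMockit fakes for ColumnPageCodecMeta and EncodedMeasurePage. For readers not familiar with that API, the pattern reduces to roughly the sketch below; Clock is a hypothetical stand-in class (not part of CarbonData), and running it needs JMockit on the classpath since it installs a Java agent.

import static org.junit.Assert.assertEquals;

import mockit.Mock;
import mockit.MockUp;
import org.junit.Test;

public class MockUpSketchTest {

  // Hypothetical stand-in; the test above applies the same pattern to
  // ColumnPageCodecMeta and EncodedMeasurePage.
  static class Clock {
    long now() {
      return System.currentTimeMillis();
    }
  }

  @Test public void fakeAppliesToAllInstances() {
    // Registering a MockUp<T> redefines the @Mock-annotated methods of T for
    // the remainder of the test, so every caller sees the canned value.
    new MockUp<Clock>() {
      @SuppressWarnings("unused") @Mock
      long now() {
        return 42L;
      }
    };
    assertEquals(42L, new Clock().now());
  }
}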
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index b4cbc4e..8acd0b1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -119,7 +119,7 @@ struct DataChunk{
* in Row Major format.
*
* For V3, one data chunk is one page data of 32K rows.
- * For V2 & V1, one data chunk is one blocklet data.
+ * For V2, one data chunk is one blocklet data.
*/
struct DataChunk2{
1: required ChunkCompressionMeta chunk_meta; // The metadata of a chunk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 2980ad3..3ca8cf1 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -116,8 +116,8 @@ class CarbonHiveSerDe extends AbstractSerDe {
@Override public Writable serialize(Object obj, ObjectInspector objectInspector)
throws SerDeException {
if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) {
- throw new SerDeException("Cannot serialize " + objInspector.getCategory()
- + ". Can only serialize a struct");
+ throw new SerDeException("Cannot serializeStartKey " + objInspector.getCategory()
+ + ". Can only serializeStartKey a struct");
}
serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size();
status = LAST_OPERATION.SERIALIZE;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
index be17823..757a342 100644
--- a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
+++ b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
@@ -87,7 +87,7 @@
// assertEquals("deserialization gives the wrong object", t, row);
//
// // Serialize
-// final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
+// final ArrayWritable serializedArr = (ArrayWritable) serDe.serializeStartKey(row, oi);
// assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(),
// serializedArr.get().length);
// assertTrue("serialized object should be equal to starting object",
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
index 8c90d27..db4bca9 100644
--- a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
+++ b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
@@ -1,9 +1,9 @@
ID,date,country,name,phonetype,serialname,salary,complex
-1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33
-2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33
-4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33
-5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33
-6.5,2015/7/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33
-8,2015/7/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33
-9.1,2015/7/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33
-10,2015/7/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33
\ No newline at end of file
+1.2,2015/07/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33
+2,2015/07/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33
+4.3,2015/07/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33
+5,2015/07/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33
+6.5,2015/07/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33
+8,2015/07/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33
+9.1,2015/07/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33
+10,2015/07/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
index ae67723..e8c023b 100644
--- a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
+++ b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
@@ -1,4 +1,4 @@
-1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi
+1.2,2015-7-23 00:00:00,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi
2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123:imei$7.33:imsi
4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123:imei$56.33:imsi
5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133:imei$5.33:imsi
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
deleted file mode 100644
index 70ac3d9..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
-import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
-
-/**
- * it is holder of column group dataPage and also min max for colgroup block dataPage
- */
-public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
-
- private byte[][] dataPage;
-
- private ColGroupMinMax colGrpMinMax;
-
- public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex,
- byte[][] dataPage) {
- colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
- this.dataPage = dataPage;
- for (int i = 0; i < dataPage.length; i++) {
- colGrpMinMax.add(dataPage[i]);
- }
- }
-
- /**
- * sorting is not required for colgroup storage and hence return true
- */
- @Override public boolean isAlreadySorted() {
- return true;
- }
-
- /**
- * for column group storage its not required
- */
- public ColGroupDataHolder getRowIdPage() {
- //not required for column group storage
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- public ColGroupDataHolder getRowIdRlePage() {
- // not required for column group storage
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- public byte[][] getDataPage() {
- return dataPage;
- }
-
- /**
- * for column group storage its not required
- */
- public ColGroupDataHolder getDataRlePage() {
- //not required for column group
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public int getTotalSize() {
- return dataPage.length;
- }
-
- @Override public byte[] getMin() {
- return colGrpMinMax.getMin();
- }
-
- @Override public byte[] getMax() {
- return colGrpMinMax.getMax();
- }
-
- /**
- * return self
- */
- @Override public IndexStorage call() throws Exception {
- return this;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 50fb4c5..1bcbe54 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -17,8 +17,12 @@
package org.apache.carbondata.processing.newflow.sort;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
public class SortStepRowUtil {
public static Object[] convertRow(Object[] data, SortParameters parameters,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 912cd3a..aad874b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -177,7 +177,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
readCounter++;
}
} catch (Exception e) {
- throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+ throw new CarbonDataLoadingException(e);
}
rowCounter.getAndAdd(batch.getSize());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 6528d44..653da7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -31,8 +31,8 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
public class IntermediateFileMerger implements Callable<Void> {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
index bc6640d..11c42a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
@@ -22,7 +22,7 @@ import java.util.Comparator;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
public class RowComparator implements Comparator<Object[]> {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
index 8d914ea..be29bf8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
@@ -18,7 +18,7 @@ package org.apache.carbondata.processing.sortandgroupby.sortdata;
import java.util.Comparator;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
/**
* This class is used as comparator for comparing dims which are non high cardinality dims.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 1bcbaa8..b17c69a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -35,9 +35,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
index d629cdc..51b3964 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 3ed5888..6ed5d31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,7 +51,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -148,8 +147,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private ColumnarFormatVersion version;
- private TablePageEncoder encoder;
-
private SortScopeOptions.SortScope sortScope;
/**
@@ -201,7 +198,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
this.version = CarbonProperties.getInstance().getFormatVersion();
- this.encoder = new TablePageEncoder(model);
String noInvertedIdxCol = "";
for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
if (!cd.isUseInvertedIndex()) {
@@ -343,35 +339,26 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
/**
- * generate the NodeHolder from the input rows (one page in case of V3 format)
+ * generate the EncodedTablePage from the input rows (one page in case of V3 format)
*/
- private NodeHolder processDataRows(List<CarbonRow> dataRows)
+ private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
if (dataRows.size() == 0) {
- return new NodeHolder();
+ return EncodedTablePage.newEmptyInstance();
}
TablePage tablePage = new TablePage(model, dataRows.size());
- TablePageKey keys = new TablePageKey(model, dataRows.size());
int rowId = 0;
// convert row to columnar data
for (CarbonRow row : dataRows) {
- tablePage.addRow(rowId, row);
- keys.update(rowId, row);
- rowId++;
+ tablePage.addRow(rowId++, row);
}
- // apply and compress dimensions and measure
- EncodedData encodedData = encoder.encode(tablePage);
-
- TablePageStatistics tablePageStatistics = new TablePageStatistics(
- model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
-
- NodeHolder nodeHolder = dataWriter.buildDataNodeHolder(encodedData, tablePageStatistics, keys);
+ EncodedTablePage encoded = tablePage.encode();
tablePage.freeMemory();
LOGGER.info("Number Of records processed: " + dataRows.size());
- return nodeHolder;
+ return encoded;
}
/**
@@ -470,7 +457,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
consumerExecutorService.shutdownNow();
processWriteTaskSubmitList(consumerExecutorServiceTaskList);
- this.dataWriter.writeBlockletInfoToFile();
+ this.dataWriter.writeFooterToFile();
LOGGER.info("All blocklets have been finished writing");
// close all the open stream for both the files
this.dataWriter.closeWriter();
@@ -666,7 +653,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* array of blocklet data holder objects
*/
- private NodeHolder[] nodeHolders;
+ private EncodedTablePage[] encodedTablePages;
/**
* flag to check whether the producer has completed processing for holder
* object which is required to be picked form an index
@@ -678,7 +665,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private int currentIndex;
private BlockletDataHolder() {
- nodeHolders = new NodeHolder[numberOfCores];
+ encodedTablePages = new EncodedTablePage[numberOfCores];
available = new AtomicBoolean(false);
}
@@ -686,32 +673,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return a node holder object
* @throws InterruptedException if consumer thread is interrupted
*/
- public synchronized NodeHolder get() throws InterruptedException {
- NodeHolder nodeHolder = nodeHolders[currentIndex];
+ public synchronized EncodedTablePage get() throws InterruptedException {
+ EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
// if node holder is null means producer thread processing the data which has to
// be inserted at this current index has not completed yet
- if (null == nodeHolder && !processingComplete) {
+ if (null == encodedTablePage && !processingComplete) {
available.set(false);
}
while (!available.get()) {
wait();
}
- nodeHolder = nodeHolders[currentIndex];
- nodeHolders[currentIndex] = null;
+ encodedTablePage = encodedTablePages[currentIndex];
+ encodedTablePages[currentIndex] = null;
currentIndex++;
// reset current index when it reaches length of node holder array
- if (currentIndex >= nodeHolders.length) {
+ if (currentIndex >= encodedTablePages.length) {
currentIndex = 0;
}
- return nodeHolder;
+ return encodedTablePage;
}
/**
- * @param nodeHolder
+ * @param encodedTablePage
* @param index
*/
- public synchronized void put(NodeHolder nodeHolder, int index) {
- nodeHolders[index] = nodeHolder;
+ public synchronized void put(EncodedTablePage encodedTablePage, int index) {
+ encodedTablePages[index] = encodedTablePage;
// notify the consumer thread when index at which object is to be inserted
// becomes equal to current index from where data has to be picked for writing
if (index == currentIndex) {
@@ -729,14 +716,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private BlockletDataHolder blockletDataHolder;
private List<CarbonRow> dataRows;
private int sequenceNumber;
- private boolean isWriteAll;
+ private boolean isLastPage;
private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
- int sequenceNumber, boolean isWriteAll) {
+ int sequenceNumber, boolean isLastPage) {
this.blockletDataHolder = blockletDataHolder;
this.dataRows = dataRows;
this.sequenceNumber = sequenceNumber;
- this.isWriteAll = isWriteAll;
+ this.isLastPage = isLastPage;
}
/**
@@ -747,11 +734,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
try {
- NodeHolder nodeHolder = processDataRows(dataRows);
- nodeHolder.setWriteAll(isWriteAll);
+ EncodedTablePage encodedTablePage = processDataRows(dataRows);
+ encodedTablePage.setIsLastPage(isLastPage);
// insert the object in array according to sequence number
int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
- blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
+ blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
return null;
} catch (Throwable throwable) {
LOGGER.error(throwable, "Error in producer");
@@ -781,11 +768,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
@Override public Void call() throws Exception {
while (!processingComplete || blockletProcessingCount.get() > 0) {
- NodeHolder nodeHolder = null;
+ EncodedTablePage encodedTablePage = null;
try {
- nodeHolder = blockletDataHolder.get();
- if (null != nodeHolder) {
- dataWriter.writeBlockletData(nodeHolder);
+ encodedTablePage = blockletDataHolder.get();
+ if (null != encodedTablePage) {
+ dataWriter.writeTablePage(encodedTablePage);
}
blockletProcessingCount.decrementAndGet();
} catch (Throwable throwable) {
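The BlockletDataHolder changes above are essentially a type swap (NodeHolder to EncodedTablePage); the ordering scheme is unchanged: each Producer stores its page in slot (sequenceNumber - 1) % numberOfCores and the single Consumer drains the slots in order. A stripped-down, hypothetical sketch of that hand-off, with a plain Object payload instead of EncodedTablePage and without the processingComplete handling, looks roughly like this:

// Hypothetical, simplified version of the slot-based hand-off used by
// BlockletDataHolder: producers may finish out of order, but the consumer
// only advances once the slot at currentIndex has been filled.
class OrderedSlots {
  private final Object[] slots;
  private int currentIndex;

  OrderedSlots(int numberOfCores) {
    slots = new Object[numberOfCores];
  }

  synchronized void put(Object page, int sequenceNumber) {
    int index = (sequenceNumber - 1) % slots.length;
    slots[index] = page;
    if (index == currentIndex) {
      notifyAll();           // the consumer may be waiting on exactly this slot
    }
  }

  synchronized Object get() throws InterruptedException {
    while (slots[currentIndex] == null) {
      wait();                // producer for this slot has not finished yet
    }
    Object page = slots[currentIndex];
    slots[currentIndex] = null;
    currentIndex = (currentIndex + 1) % slots.length;
    return page;
  }
}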
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 7930763..c5a9bec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,23 +22,45 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import org.apache.carbondata.core.datastore.DimensionType;
import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.datastore.page.statistics.VarLengthPageStatsCollector;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.spark.sql.types.Decimal;
-
/**
* Represent a page data for all columns, we store its data in columnar layout, so that
* all processing apply to TablePage can be done in vectorized fashion.
@@ -55,24 +77,30 @@ public class TablePage {
private ComplexColumnPage[] complexDimensionPage;
private ColumnPage[] measurePage;
- private MeasurePageStatsVO measurePageStatistics;
-
// the num of rows in this page, it must be less than short value (65536)
private int pageSize;
private CarbonFactDataHandlerModel model;
+ private TablePageKey key;
+
+ private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+
TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
this.model = model;
this.pageSize = pageSize;
int numDictDimension = model.getMDKeyGenerator().getDimCount();
dictDimensionPage = new ColumnPage[numDictDimension];
for (int i = 0; i < dictDimensionPage.length; i++) {
- dictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+ ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+ page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
+ dictDimensionPage[i] = page;
}
noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
for (int i = 0; i < noDictDimensionPage.length; i++) {
- noDictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+ ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+ page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
+ noDictDimensionPage[i] = page;
}
complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -84,22 +112,33 @@ public class TablePage {
DataType[] dataTypes = model.getMeasureDataType();
TableSpec.MeasureSpec measureSpec = model.getTableSpec().getMeasureSpec();
for (int i = 0; i < measurePage.length; i++) {
- measurePage[i] = ColumnPage
+ ColumnPage page = ColumnPage
.newPage(dataTypes[i], pageSize, measureSpec.getScale(i), measureSpec.getPrecision(i));
+ page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i], pageSize));
+ measurePage[i] = page;
}
+ boolean hasNoDictionary = noDictDimensionPage.length > 0;
+ this.key = new TablePageKey(pageSize, model.getMDKeyGenerator(), model.getSegmentProperties(),
+ hasNoDictionary);
}
/**
- * Add one row to the internal store, it will be converted into columnar layout
+ * Add one row to the internal store
*
* @param rowId Id of the input row
* @param row row object
*/
public void addRow(int rowId, CarbonRow row) throws KeyGenException {
- // convert each column category
+ // convert each column category, update key and stats
+ byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+ convertToColumnarAndAddToPages(rowId, row, mdk);
+ key.update(rowId, row, mdk);
+ }
+ // convert the input row object to columnar data and add to column pages
+ private void convertToColumnarAndAddToPages(int rowId, CarbonRow row, byte[] mdk)
+ throws KeyGenException {
// 1. convert dictionary columns
- byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
for (int i = 0; i < dictDimensionPage.length; i++) {
dictDimensionPage[i].putData(rowId, keys[i]);
@@ -137,11 +176,6 @@ public class TablePage {
}
measurePage[i].putData(rowId, value);
}
-
- // update statistics if it is last row
- if (rowId + 1 == pageSize) {
- this.measurePageStatistics = new MeasurePageStatsVO(measurePage);
- }
}
/**
@@ -160,10 +194,10 @@ public class TablePage {
// initialize the page if first row
if (rowId == 0) {
int depthInComplexColumn = complexDataType.getColsCount();
- getComplexDimensionPage()[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+ complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
}
- int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
+ int depthInComplexColumn = complexDimensionPage[index].getDepth();
// this is the result columnar data which will be added to page,
// size of this list is the depth of complex column, we will fill it by input data
List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
@@ -187,8 +221,7 @@ public class TablePage {
}
for (int depth = 0; depth < depthInComplexColumn; depth++) {
- getComplexDimensionPage()[index]
- .putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+ complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
}
}
@@ -217,26 +250,161 @@ public class TablePage {
return output;
}
- ColumnPage[] getDictDimensionPage() {
- return dictDimensionPage;
+ EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+ // encode dimensions and measure
+ EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
+ EncodedMeasurePage[] measures = encodeAndCompressMeasures();
+ return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
}
- ColumnPage[] getNoDictDimensionPage() {
- return noDictDimensionPage;
+ private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+ // apply measure and set encodedData in `encodedData`
+ private EncodedMeasurePage[] encodeAndCompressMeasures()
+ throws MemoryException, IOException {
+ EncodedMeasurePage[] encodedMeasures = new EncodedMeasurePage[measurePage.length];
+ for (int i = 0; i < measurePage.length; i++) {
+ SimpleStatsResult stats = (SimpleStatsResult)(measurePage[i].getStatistics());
+ ColumnPageCodec encoder = encodingStrategy.createCodec(stats);
+ encodedMeasures[i] = (EncodedMeasurePage) encoder.encode(measurePage[i]);
+ }
+ return encodedMeasures;
+ }
+
+ private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+ }
+ }
}
- ComplexColumnPage[] getComplexDimensionPage() {
- return complexDimensionPage;
+ private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+ }
+ }
}
- ColumnPage[] getMeasurePage() {
- return measurePage;
+ private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, false, false);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, false, false);
+ }
}
- MeasurePageStatsVO getMeasureStats() {
- return measurePageStatistics;
+ private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex, boolean isRleApplicable) {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+ }
+ }
}
+ // apply and compress each dimension, set encoded data in `encodedData`
+ private EncodedDimensionPage[] encodeAndCompressDimensions()
+ throws KeyGenException {
+ TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+ int dictionaryColumnCount = -1;
+ int noDictionaryColumnCount = -1;
+ int indexStorageOffset = 0;
+ IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+ Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ EncodedDimensionPage[] compressedColumns = new EncodedDimensionPage[indexStorages.length];
+ boolean[] isUseInvertedIndex = model.getIsUseInvertedIndex();
+ for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+ ColumnPage page;
+ byte[] flattened;
+ boolean isSortColumn = model.isSortColumn(i);
+ switch (dimensionSpec.getType(i)) {
+ case GLOBAL_DICTIONARY:
+ // dictionary dimension
+ page = dictDimensionPage[++dictionaryColumnCount];
+ indexStorages[indexStorageOffset] = encodeAndCompressDictDimension(
+ page.getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn,
+ CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+ break;
+ case DIRECT_DICTIONARY:
+ // timestamp and date column
+ page = dictDimensionPage[++dictionaryColumnCount];
+ indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension(
+ page.getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn,
+ CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+ break;
+ case PLAIN_VALUE:
+ // high cardinality dimension, encoded as plain string
+ page = noDictDimensionPage[++noDictionaryColumnCount];
+ indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension(
+ page.getByteArrayPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn,
+ CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+ break;
+ case COMPLEX:
+ // we need to add complex column at last, so skipping it here
+ continue;
+ default:
+ throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+ }
+ byte[] compressedData = compressor.compressByte(flattened);
+ compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
+ pageSize, compressedData, indexStorages[indexStorageOffset], dimensionSpec.getType(i));
+ SimpleStatsResult stats = (SimpleStatsResult) page.getStatistics();
+ compressedColumns[indexStorageOffset].setNullBitSet(stats.getNullBits());
+ indexStorageOffset++;
+ }
+
+ // handle complex type column
+ for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+ Iterator<byte[][]> iterator = complexDimensionPage[i].iterator();
+ while (iterator.hasNext()) {
+ byte[][] data = iterator.next();
+ indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+ byte[] flattened = ByteUtil.flatten(data);
+ byte[] compressedData = compressor.compressByte(flattened);
+ compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
+ pageSize, compressedData, indexStorages[indexStorageOffset], DimensionType.COMPLEX);
+ indexStorageOffset++;
+ }
+ }
+ return compressedColumns;
+ }
}
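Pulling the measure-side pieces of the new TablePage together: each measure ColumnPage now carries a PrimitivePageStatsCollector, and the encoding strategy picks a codec from the exact-typed SimpleStatsResult when the page is encoded. The following is a condensed sketch assembled from the hunks above, not code from the patch; the DOUBLE data type, the sample values, and the -1 scale/precision (only meaningful for DECIMAL) are illustrative assumptions.

import java.io.IOException;

import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.datatype.DataType;

class MeasurePageSketch {

  // Condensed measure path: collect exact-typed statistics while filling the
  // page, then let the encoding strategy choose a codec from those statistics.
  static EncodedMeasurePage encodeDoublePage(double[] values)
      throws MemoryException, IOException {
    int pageSize = values.length;
    ColumnPage page = ColumnPage.newPage(DataType.DOUBLE, pageSize, -1, -1);
    page.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.DOUBLE, pageSize));
    for (int rowId = 0; rowId < pageSize; rowId++) {
      page.putData(rowId, values[rowId]);  // min/max/decimal stats update as rows arrive
    }
    SimpleStatsResult stats = (SimpleStatsResult) page.getStatistics();
    EncodingStrategy strategy = new DefaultEncodingStrategy();
    ColumnPageCodec codec = strategy.createCodec(stats);   // codec chosen from the stats
    return (EncodedMeasurePage) codec.encode(page);        // same cast as in TablePage above
  }
}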
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
deleted file mode 100644
index 8547845..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
-import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-class TablePageEncoder {
-
- private ColumnarFormatVersion version;
-
- private boolean[] isUseInvertedIndex;
-
- private CarbonFactDataHandlerModel model;
-
- private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
-
- TablePageEncoder(CarbonFactDataHandlerModel model) {
- this.version = CarbonProperties.getInstance().getFormatVersion();
- this.model = model;
- this.isUseInvertedIndex = model.getIsUseInvertedIndex();
- }
-
- // function to apply all columns in one table page
- EncodedData encode(TablePage tablePage)
- throws KeyGenException, MemoryException, IOException {
- EncodedData encodedData = new EncodedData();
- encodeAndCompressDimensions(tablePage, encodedData);
- encodeAndCompressMeasures(tablePage, encodedData);
- return encodedData;
- }
-
- // apply measure and set encodedData in `encodedData`
- private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData)
- throws MemoryException, IOException {
- ColumnPage[] measurePage = tablePage.getMeasurePage();
- byte[][] encodedMeasures = new byte[measurePage.length][];
- for (int i = 0; i < measurePage.length; i++) {
- ColumnPageCodec encoder = encodingStrategy.createCodec(measurePage[i].getStatistics());
- encodedMeasures[i] = encoder.encode(measurePage[i]);
- }
- encodedData.measures = encodedMeasures;
- }
-
- private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
- } else {
- return new BlockIndexerStorageForNoInvertedIndexForInt(data);
- }
- }
- }
-
- private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
- } else {
- return new BlockIndexerStorageForNoInvertedIndexForInt(data);
- }
- }
- }
-
- private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, false, false, false);
- } else {
- return new BlockIndexerStorageForInt(data, false, false, false);
- }
- }
-
- private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
- boolean isUseInvertedIndex, boolean isRleApplicable) {
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort);
- } else {
- return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
- } else {
- return new BlockIndexerStorageForNoInvertedIndexForInt(data);
- }
- }
- }
-
- // apply and compress each dimension, set encoded data in `encodedData`
- private void encodeAndCompressDimensions(TablePage tablePage, EncodedData encodedData)
- throws KeyGenException {
- TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
- int dictionaryColumnCount = -1;
- int noDictionaryColumnCount = -1;
- int indexStorageOffset = 0;
- IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
- byte[][] compressedColumns = new byte[indexStorages.length][];
- for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
- byte[] flattened;
- boolean isSortColumn = model.isSortColumn(i);
- switch (dimensionSpec.getType(i)) {
- case GLOBAL_DICTIONARY:
- // dictionary dimension
- indexStorages[indexStorageOffset] = encodeAndCompressDictDimension(
- tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
- isSortColumn, isUseInvertedIndex[i] & isSortColumn,
- CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
- break;
- case DIRECT_DICTIONARY:
- // timestamp and date column
- indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension(
- tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
- isSortColumn, isUseInvertedIndex[i] & isSortColumn,
- CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
- break;
- case PLAIN_VALUE:
- // high cardinality dimension, encoded as plain string
- indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension(
- tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
- isSortColumn, isUseInvertedIndex[i] & isSortColumn,
- CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
- flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
- break;
- case COMPLEX:
- // complex columns are appended at the end, so skip them here
- continue;
- default:
- throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
- }
- compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
- indexStorageOffset++;
- }
-
- // handle complex type column
- for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
- Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
- while (iterator.hasNext()) {
- byte[][] data = iterator.next();
- indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
- byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
- compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
- indexStorageOffset++;
- }
- }
-
- encodedData.indexStorages = indexStorages;
- encodedData.dimensions = compressedColumns;
- }
-
-}
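
The encodeAndCompress* helpers above differ only in the flags they pass to the
BlockIndexerStorage variants; the per-column loop then flattens the selected storage's
data page with ByteUtil.flatten and compresses the result before it is handed to the
writer. A minimal self-contained sketch of that flatten-and-compress step follows
(java.util.zip.Deflater stands in for the pluggable Compressor obtained from
CompressorFactory; that substitution is an assumption made only for illustration):

import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

public final class FlattenAndCompressSketch {

  // Concatenate the rows of a column page into one contiguous byte array,
  // mirroring what ByteUtil.flatten(...) does for IndexStorage.getDataPage().
  static byte[] flatten(byte[][] dataPage) {
    int total = 0;
    for (byte[] row : dataPage) {
      total += row.length;
    }
    byte[] flattened = new byte[total];
    int offset = 0;
    for (byte[] row : dataPage) {
      System.arraycopy(row, 0, flattened, offset, row.length);
      offset += row.length;
    }
    return flattened;
  }

  // Compress the flattened page (stand-in for compressor.compressByte(flattened)).
  static byte[] compress(byte[] flattened) {
    Deflater deflater = new Deflater();
    deflater.setInput(flattened);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buffer = new byte[4096];
    while (!deflater.finished()) {
      out.write(buffer, 0, deflater.deflate(buffer));
    }
    deflater.end();
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[][] page = { {1, 2, 3}, {1, 2, 3}, {4, 5, 6} };
    System.out.println("compressed to " + compress(flatten(page)).length + " bytes");
  }
}
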
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
deleted file mode 100644
index a66575e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
-
-public class TablePageKey {
- private int pageSize;
-
- private byte[][] currentNoDictionaryKey;
-
- // MDK start key
- private byte[] startKey;
-
- // MDK end key
- private byte[] endKey;
-
- // startkey for no dictionary columns
- private byte[][] noDictStartKey;
-
- // endkey for no dictionary columns
- private byte[][] noDictEndKey;
-
- // startkey for no dictionary columns after packing into one column
- private byte[] packedNoDictStartKey;
-
- // endkey for no dictionary columns after packing into one column
- private byte[] packedNoDictEndKey;
-
- private CarbonFactDataHandlerModel model;
-
- TablePageKey(CarbonFactDataHandlerModel model, int pageSize) {
- this.model = model;
- this.pageSize = pageSize;
- }
-
- /** update all keys based on the input row */
- void update(int rowId, CarbonRow row) throws KeyGenException {
- if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
- currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
- }
- if (rowId == 0) {
- startKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
- noDictStartKey = currentNoDictionaryKey;
- }
- noDictEndKey = currentNoDictionaryKey;
- if (rowId == pageSize - 1) {
- endKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
- finalizeKeys();
- }
- }
-
- /** update all keys if SORT_COLUMNS option is used when creating table */
- private void finalizeKeys() {
- // If SORT_COLUMNS is used, the start/end keys may need to be updated since they may
- // contain dictionary columns that are not in SORT_COLUMNS, which need to be removed from
- // the start/end key
- int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
- if (numberOfDictSortColumns > 0) {
- // if SORT_COLUMNS contain dictionary columns
- int[] keySize = model.getSegmentProperties().getFixedLengthKeySplitter().getBlockKeySize();
- if (keySize.length > numberOfDictSortColumns) {
- // if there are dictionary columns that are not in SORT_COLUMNS, execution reaches here
- int newMdkLength = 0;
- for (int i = 0; i < numberOfDictSortColumns; i++) {
- newMdkLength += keySize[i];
- }
- byte[] newStartKeyOfSortKey = new byte[newMdkLength];
- byte[] newEndKeyOfSortKey = new byte[newMdkLength];
- System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
- System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
- startKey = newStartKeyOfSortKey;
- endKey = newEndKeyOfSortKey;
- }
- } else {
- startKey = new byte[0];
- endKey = new byte[0];
- }
-
- // Do the same update for noDictionary start/end Key
- int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
- if (numberOfNoDictSortColumns > 0) {
- // if sort_columns contain no-dictionary columns
- if (noDictStartKey.length > numberOfNoDictSortColumns) {
- byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
- byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
- System.arraycopy(
- noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
- System.arraycopy(
- noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
- noDictStartKey = newNoDictionaryStartKey;
- noDictEndKey = newNoDictionaryEndKey;
- }
- packedNoDictStartKey =
- NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
- packedNoDictEndKey =
- NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
- }
- }
-
- public byte[] getStartKey() {
- return startKey;
- }
-
- public byte[] getEndKey() {
- return endKey;
- }
-
- public byte[] getNoDictStartKey() {
- return packedNoDictStartKey;
- }
-
- public byte[] getNoDictEndKey() {
- return packedNoDictEndKey;
- }
-
- public int getPageSize() {
- return pageSize;
- }
-}
\ No newline at end of file
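
The SORT_COLUMNS handling in finalizeKeys() above trims the MDK start/end keys down to
the leading key blocks that belong to SORT_COLUMNS. A hedged, self-contained sketch of
that trimming step (the key block sizes and the sample key below are hypothetical values
chosen only to show the arithmetic):

import java.util.Arrays;

public final class SortColumnsKeyTrimSketch {

  // Keep only the leading key blocks that belong to SORT_COLUMNS, mirroring the
  // trimming done in finalizeKeys(). keySize holds the byte length of each dictionary
  // key block; the first numberOfDictSortColumns blocks are kept.
  static byte[] trimToSortColumns(byte[] mdk, int[] keySize, int numberOfDictSortColumns) {
    if (numberOfDictSortColumns == 0) {
      return new byte[0];
    }
    if (keySize.length <= numberOfDictSortColumns) {
      return mdk; // every dictionary column is a sort column, nothing to trim
    }
    int newMdkLength = 0;
    for (int i = 0; i < numberOfDictSortColumns; i++) {
      newMdkLength += keySize[i];
    }
    return Arrays.copyOf(mdk, newMdkLength);
  }

  public static void main(String[] args) {
    // hypothetical page key: three dictionary columns of 2, 1 and 4 bytes,
    // of which only the first two are SORT_COLUMNS
    byte[] mdk = {1, 2, 3, 4, 5, 6, 7};
    byte[] trimmed = trimToSortColumns(mdk, new int[] {2, 1, 4}, 2);
    System.out.println(Arrays.toString(trimmed)); // prints [1, 2, 3]
  }
}
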
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
deleted file mode 100644
index 13eaac9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-
-// Statistics of dimension and measure column in a TablePage
-public class TablePageStatistics {
-
- // number of dimensions after complex columns are expanded
- private int numDimensionsExpanded;
-
- // min of each dimension column
- private byte[][] dimensionMinValue;
-
- // max of each dimension column
- private byte[][] dimensionMaxValue;
-
- // min of each measure column
- private byte[][] measureMinValue;
-
- // max of each measure column
- private byte[][] measureMaxValue;
-
- // null bit set for each measure column
- private BitSet[] nullBitSet;
-
- // measure stats
- // TODO: there are redundant stats
- private MeasurePageStatsVO measurePageStatistics;
-
- private TableSpec tableSpec;
-
- TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
- EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
- this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
- int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
- this.dimensionMinValue = new byte[numDimensionsExpanded][];
- this.dimensionMaxValue = new byte[numDimensionsExpanded][];
- this.measureMinValue = new byte[numMeasures][];
- this.measureMaxValue = new byte[numMeasures][];
- this.nullBitSet = new BitSet[numMeasures];
- this.tableSpec = tableSpec;
- this.measurePageStatistics = measurePageStatistics;
- updateMinMax(tablePage, encodedData);
- updateNullBitSet(tablePage);
- }
-
- private void updateMinMax(TablePage tablePage, EncodedData encodedData) {
- IndexStorage[] keyStorageArray = encodedData.indexStorages;
- byte[][] measureArray = encodedData.measures;
-
- for (int i = 0; i < numDimensionsExpanded; i++) {
- switch (tableSpec.getDimensionSpec().getType(i)) {
- case GLOBAL_DICTIONARY:
- case DIRECT_DICTIONARY:
- case COLUMN_GROUP:
- case COMPLEX:
- dimensionMinValue[i] = keyStorageArray[i].getMin();
- dimensionMaxValue[i] = keyStorageArray[i].getMax();
- break;
- case PLAIN_VALUE:
- dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
- dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
- break;
- }
- }
- for (int i = 0; i < measureArray.length; i++) {
- ColumnPageStatsVO stats = tablePage.getMeasurePage()[i].getStatistics();
- measureMaxValue[i] = stats.maxBytes();
- measureMinValue[i] = stats.minBytes();
- }
- }
-
- private void updateNullBitSet(TablePage tablePage) {
- nullBitSet = new BitSet[tablePage.getMeasurePage().length];
- ColumnPage[] measurePages = tablePage.getMeasurePage();
- for (int i = 0; i < nullBitSet.length; i++) {
- nullBitSet[i] = measurePages[i].getNullBitSet();
- }
- }
-
- /**
- * Below method will be used to update the min or max value
- * by removing the length from it
- *
- * @return min max value without length
- */
- private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
- ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
- byte[] actualValue = new byte[buffer.getShort()];
- buffer.get(actualValue);
- return actualValue;
- }
-
- public byte[][] getDimensionMinValue() {
- return dimensionMinValue;
- }
-
- public byte[][] getDimensionMaxValue() {
- return dimensionMaxValue;
- }
-
- public byte[][] getMeasureMinValue() {
- return measureMinValue;
- }
-
- public byte[][] getMeasureMaxValue() {
- return measureMaxValue;
- }
-
- public BitSet[] getNullBitSet() {
- return nullBitSet;
- }
-
- public MeasurePageStatsVO getMeasurePageStatistics() {
- return measurePageStatistics;
- }
-}
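
The updateMinMaxForNoDictionary() helper above strips a 2-byte length prefix from each
no-dictionary min/max value before it is kept as a page statistic. A small
self-contained sketch of that prefix removal (the sample value is hypothetical):

import java.nio.ByteBuffer;
import java.util.Arrays;

public final class NoDictMinMaxSketch {

  // Strip the leading 2-byte (short) length prefix from a length-prefixed
  // no-dictionary value, as updateMinMaxForNoDictionary() does.
  static byte[] stripLengthPrefix(byte[] valueWithLength) {
    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
    byte[] actualValue = new byte[buffer.getShort()];
    buffer.get(actualValue);
    return actualValue;
  }

  public static void main(String[] args) {
    // a hypothetical no-dictionary value "abc", stored as [length][bytes]
    byte[] withLength = {0, 3, 'a', 'b', 'c'};
    System.out.println(Arrays.toString(stripLengthPrefix(withLength))); // [97, 98, 99]
  }
}
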
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 3756273..b83a82a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -583,7 +584,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*
* @throws CarbonDataWriterException
*/
- @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
+ @Override public void writeFooterToFile() throws CarbonDataWriterException {
if (this.blockletInfoList.size() > 0) {
writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
}
@@ -597,7 +598,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @throws CarbonDataWriterException
* @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
*/
- public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
+ public abstract void writeTablePage(EncodedTablePage encodedTablePage)
+ throws CarbonDataWriterException;
/**
* Below method will be used to update the min or max value
@@ -613,36 +615,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
}
/**
- * Below method will be used to update the no dictionary start and end key
- *
- * @param key key to be updated
- * @return no dictionary key with the length part removed
- */
- protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
- if (key.length == 0) {
- return key;
- }
- // wrap the key in a byte buffer, removing the length part of the data
- ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
- // create an output buffer without the length part
- ByteBuffer output = ByteBuffer.allocate(key.length - 2);
- short numberOfByteToStorLength = 2;
- // as the length part is removed, each no dictionary value index
- // needs to be shifted back by 2 bytes
- int numberOfNoDictSortColumns =
- dataWriterVo.getSegmentProperties().getNumberOfNoDictSortColumns();
- for (int i = 0; i < numberOfNoDictSortColumns; i++) {
- output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
- }
- // copy the data part
- while (buffer.hasRemaining()) {
- output.put(buffer.get());
- }
- output.rewind();
- return output.array();
- }
-
- /**
* This method will copy the carbon data file from local store location to
* carbon store location
*/
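
With this rename the writer contract becomes: one writeTablePage(...) call per encoded
page, then a single writeFooterToFile() when the accumulated blocklet info should be
persisted. A hedged sketch of that calling sequence, assuming the carbondata core and
processing classes are on the classpath and that an initialised writer plus a collection
of already-encoded pages are available (the producer/consumer machinery that drives the
real writer is omitted):

import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.page.EncodedTablePage;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;

public final class WriterFlowSketch {

  // Write each encoded table page, then ask the writer to persist the footer.
  // Writer initialisation, close and error handling are left out of this sketch.
  static void writeAll(CarbonFactDataWriter<?> writer, Iterable<EncodedTablePage> pages)
      throws CarbonDataWriterException {
    for (EncodedTablePage page : pages) {
      writer.writeTablePage(page);   // one call per table page
    }
    writer.writeFooterToFile();      // flush blocklet info / footer once at the end
  }
}
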
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index 56ee762..f194f74 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,35 +18,21 @@
package org.apache.carbondata.processing.store.writer;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.processing.store.TablePageKey;
-import org.apache.carbondata.processing.store.TablePageStatistics;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
public interface CarbonFactDataWriter<T> {
/**
- * This method will be used to create NodeHolder for a table page
+ * write an encoded table page
*/
-
- NodeHolder buildDataNodeHolder(EncodedData encoded, TablePageStatistics stats,
- TablePageKey key) throws CarbonDataWriterException;
-
- /**
- * If the node holder flag is enabled the object will be added to a list
- * and all the blocklets will be returned together. If disabled then this
- * method itself will call for writing the fact data
- *
- * @param holder
- */
- void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException;
+ void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
/**
* Below method will be used to write the leaf meta data to file
*
* @throws CarbonDataWriterException
*/
- void writeBlockletInfoToFile() throws CarbonDataWriterException;
+ void writeFooterToFile() throws CarbonDataWriterException;
/**
* Below method will be used to initialise the writer