You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/09/14 09:20:41 UTC
[48/54] [abbrv] carbondata git commit: [CARBONDATA-1400] Fix bug of
array column out of bound when writing carbondata file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
new file mode 100644
index 0000000..f08444b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Default factory that selects an encoding based on the column page data type and statistics
+ */
+public class DefaultEncodingFactory extends EncodingFactory {
+
+ private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
+ private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1;
+
+ private static final boolean newWay = false;
+
+ private static EncodingFactory encodingFactory = new DefaultEncodingFactory();
+
+ public static EncodingFactory getInstance() {
+ // TODO: make it configurable after adding a new encodingFactory
+ return encodingFactory;
+ }
+
+ @Override
+ public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
+ // TODO: add log
+ if (columnSpec instanceof TableSpec.MeasureSpec) {
+ return createEncoderForMeasure(inputPage);
+ } else {
+ if (newWay) {
+ return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
+ } else {
+ assert columnSpec instanceof TableSpec.DimensionSpec;
+ return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
+ }
+ }
+ }
+
+ private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
+ ColumnPage inputPage) {
+ Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ switch (columnSpec.getColumnType()) {
+ case GLOBAL_DICTIONARY:
+ case DIRECT_DICTIONARY:
+ case PLAIN_VALUE:
+ return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
+ case COMPLEX:
+ return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
+ default:
+ throw new RuntimeException("unsupported dimension type: " +
+ columnSpec.getColumnType());
+ }
+ }
+
+ private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec columnSpec) {
+ TableSpec.DimensionSpec dimensionSpec = columnSpec;
+ Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ switch (dimensionSpec.getColumnType()) {
+ case GLOBAL_DICTIONARY:
+ return new DictDimensionIndexCodec(
+ dimensionSpec.isInSortColumns(),
+ dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+ compressor).createEncoder(null);
+ case DIRECT_DICTIONARY:
+ return new DirectDictDimensionIndexCodec(
+ dimensionSpec.isInSortColumns(),
+ dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+ compressor).createEncoder(null);
+ case PLAIN_VALUE:
+ return new HighCardDictDimensionIndexCodec(
+ dimensionSpec.isInSortColumns(),
+ dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+ compressor).createEncoder(null);
+ default:
+ throw new RuntimeException("unsupported dimension type: " +
+ dimensionSpec.getColumnType());
+ }
+ }
+
+ private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
+ SimpleStatsResult stats = columnPage.getStatistics();
+ switch (stats.getDataType()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
+ case FLOAT:
+ case DOUBLE:
+ return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
+ case DECIMAL:
+ case BYTE_ARRAY:
+ return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
+ default:
+ throw new RuntimeException("unsupported data type: " + stats.getDataType());
+ }
+ }
+
+ private static DataType fitLongMinMax(long max, long min) {
+ if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
+ return DataType.BYTE;
+ } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
+ return DataType.SHORT;
+ } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
+ return DataType.SHORT_INT;
+ } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
+ return DataType.INT;
+ } else {
+ return DataType.LONG;
+ }
+ }
+
+ private static DataType fitMinMax(DataType dataType, Object max, Object min) {
+ switch (dataType) {
+ case BYTE:
+ return fitLongMinMax((byte) max, (byte) min);
+ case SHORT:
+ return fitLongMinMax((short) max, (short) min);
+ case INT:
+ return fitLongMinMax((int) max, (int) min);
+ case LONG:
+ return fitLongMinMax((long) max, (long) min);
+ case DOUBLE:
+ return fitLongMinMax((long) (double) max, (long) (double) min);
+ default:
+ throw new RuntimeException("internal error: " + dataType);
+ }
+ }
+
+ // fit the long input value into minimum data type
+ private static DataType fitDelta(DataType dataType, Object max, Object min) {
+ // use long data type to calculate delta to avoid overflow
+ long value;
+ switch (dataType) {
+ case BYTE:
+ value = (long)(byte) max - (long)(byte) min;
+ break;
+ case SHORT:
+ value = (long)(short) max - (long)(short) min;
+ break;
+ case INT:
+ value = (long)(int) max - (long)(int) min;
+ break;
+ case LONG:
+ // TODO: add overflow detection and return delta type
+ return DataType.LONG;
+ case DOUBLE:
+ return DataType.LONG;
+ default:
+ throw new RuntimeException("internal error: " + dataType);
+ }
+ if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+ return DataType.BYTE;
+ } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+ return DataType.SHORT;
+ } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
+ return DataType.SHORT_INT;
+ } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+ return DataType.INT;
+ } else {
+ return DataType.LONG;
+ }
+ }
+
+ /**
+ * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
+ * size is smaller
+ */
+ static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) {
+ DataType srcDataType = stats.getDataType();
+ DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
+ DataType deltaDataType;
+
+ if (adaptiveDataType == DataType.LONG) {
+ deltaDataType = DataType.LONG;
+ } else {
+ deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
+ }
+ if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
+ srcDataType.getSizeInBytes()) {
+ // no effect to use adaptive or delta, use compression only
+ return new DirectCompressCodec(stats.getDataType());
+ }
+ if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+ // choose adaptive encoding
+ return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
+ } else {
+ // choose delta adaptive encoding
+ return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
+ }
+ }
+
+ // choose between upscale adaptive encoder or upscale delta adaptive encoder,
+ // based on whose target data type size is smaller
+ static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) {
+ DataType srcDataType = stats.getDataType();
+ double maxValue = (double) stats.getMax();
+ double minValue = (double) stats.getMin();
+ int decimalCount = stats.getDecimalCount();
+
+ // Use the maximum absolute value to determine the data type: for -1 and -10000000,
+ // -1 is the max, but the chosen type must be wide enough to hold -10000000.
+ double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
+
+ if (decimalCount == 0) {
+ // short, int, long
+ return selectCodecByAlgorithmForIntegral(stats);
+ } else if (decimalCount < 0) {
+ return new DirectCompressCodec(DataType.DOUBLE);
+ } else {
+ // double
+ long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
+ DataType adaptiveDataType = fitLongMinMax(max, 0);
+ if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) {
+ return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
+ } else {
+ return new DirectCompressCodec(DataType.DOUBLE);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
deleted file mode 100644
index 04ca8a3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.ComplexDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.DirectDictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.encoding.dimension.legacy.HighCardDictDimensionIndexCodec;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Default strategy will select encoding base on column page data type and statistics
- */
-public class DefaultEncodingStrategy extends EncodingStrategy {
-
- private static final int THREE_BYTES_MAX = (int) Math.pow(2, 23) - 1;
- private static final int THREE_BYTES_MIN = - THREE_BYTES_MAX - 1;
-
- private static final boolean newWay = false;
-
- @Override
- public ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec, ColumnPage inputPage) {
- // TODO: add log
- if (columnSpec instanceof TableSpec.MeasureSpec) {
- return createEncoderForMeasure(inputPage);
- } else {
- if (newWay) {
- return createEncoderForDimension((TableSpec.DimensionSpec) columnSpec, inputPage);
- } else {
- assert columnSpec instanceof TableSpec.DimensionSpec;
- return createEncoderForDimensionLegacy((TableSpec.DimensionSpec) columnSpec);
- }
- }
- }
-
- private ColumnPageEncoder createEncoderForDimension(TableSpec.DimensionSpec columnSpec,
- ColumnPage inputPage) {
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
- switch (columnSpec.getDimensionType()) {
- case GLOBAL_DICTIONARY:
- case DIRECT_DICTIONARY:
- case PLAIN_VALUE:
- return new DirectCompressCodec(inputPage.getDataType()).createEncoder(null);
- case COMPLEX:
- return new ComplexDimensionIndexCodec(false, false, compressor).createEncoder(null);
- default:
- throw new RuntimeException("unsupported dimension type: " +
- columnSpec.getDimensionType());
- }
- }
-
- private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec columnSpec) {
- TableSpec.DimensionSpec dimensionSpec = columnSpec;
- Compressor compressor = CompressorFactory.getInstance().getCompressor();
- switch (dimensionSpec.getDimensionType()) {
- case GLOBAL_DICTIONARY:
- return new DictDimensionIndexCodec(
- dimensionSpec.isInSortColumns(),
- dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- compressor).createEncoder(null);
- case DIRECT_DICTIONARY:
- return new DirectDictDimensionIndexCodec(
- dimensionSpec.isInSortColumns(),
- dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- compressor).createEncoder(null);
- case PLAIN_VALUE:
- return new HighCardDictDimensionIndexCodec(
- dimensionSpec.isInSortColumns(),
- dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
- compressor).createEncoder(null);
- default:
- throw new RuntimeException("unsupported dimension type: " +
- dimensionSpec.getDimensionType());
- }
- }
-
- private ColumnPageEncoder createEncoderForMeasure(ColumnPage columnPage) {
- SimpleStatsResult stats = columnPage.getStatistics();
- switch (stats.getDataType()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
- case FLOAT:
- case DOUBLE:
- return selectCodecByAlgorithmForFloating(stats).createEncoder(null);
- case DECIMAL:
- case BYTE_ARRAY:
- return new DirectCompressCodec(columnPage.getDataType()).createEncoder(null);
- default:
- throw new RuntimeException("unsupported data type: " + stats.getDataType());
- }
- }
-
- private static DataType fitLongMinMax(long max, long min) {
- if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
- return DataType.BYTE;
- } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
- return DataType.SHORT;
- } else if (max <= THREE_BYTES_MAX && min >= THREE_BYTES_MIN) {
- return DataType.SHORT_INT;
- } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
- return DataType.INT;
- } else {
- return DataType.LONG;
- }
- }
-
- private static DataType fitMinMax(DataType dataType, Object max, Object min) {
- switch (dataType) {
- case BYTE:
- return fitLongMinMax((byte) max, (byte) min);
- case SHORT:
- return fitLongMinMax((short) max, (short) min);
- case INT:
- return fitLongMinMax((int) max, (int) min);
- case LONG:
- return fitLongMinMax((long) max, (long) min);
- case DOUBLE:
- return fitLongMinMax((long) (double) max, (long) (double) min);
- default:
- throw new RuntimeException("internal error: " + dataType);
- }
- }
-
- // fit the long input value into minimum data type
- private static DataType fitDelta(DataType dataType, Object max, Object min) {
- // use long data type to calculate delta to avoid overflow
- long value;
- switch (dataType) {
- case BYTE:
- value = (long)(byte) max - (long)(byte) min;
- break;
- case SHORT:
- value = (long)(short) max - (long)(short) min;
- break;
- case INT:
- value = (long)(int) max - (long)(int) min;
- break;
- case LONG:
- // TODO: add overflow detection and return delta type
- return DataType.LONG;
- case DOUBLE:
- return DataType.LONG;
- default:
- throw new RuntimeException("internal error: " + dataType);
- }
- if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
- return DataType.BYTE;
- } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
- return DataType.SHORT;
- } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
- return DataType.SHORT_INT;
- } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
- return DataType.INT;
- } else {
- return DataType.LONG;
- }
- }
-
- /**
- * choose between adaptive encoder or delta adaptive encoder, based on whose target data type
- * size is smaller
- */
- static ColumnPageCodec selectCodecByAlgorithmForIntegral(SimpleStatsResult stats) {
- DataType srcDataType = stats.getDataType();
- DataType adaptiveDataType = fitMinMax(stats.getDataType(), stats.getMax(), stats.getMin());
- DataType deltaDataType;
-
- if (adaptiveDataType == DataType.LONG) {
- deltaDataType = DataType.LONG;
- } else {
- deltaDataType = fitDelta(stats.getDataType(), stats.getMax(), stats.getMin());
- }
- if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
- srcDataType.getSizeInBytes()) {
- // no effect to use adaptive or delta, use compression only
- return new DirectCompressCodec(stats.getDataType());
- }
- if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
- // choose adaptive encoding
- return new AdaptiveIntegralCodec(stats.getDataType(), adaptiveDataType, stats);
- } else {
- // choose delta adaptive encoding
- return new AdaptiveDeltaIntegralCodec(stats.getDataType(), deltaDataType, stats);
- }
- }
-
- // choose between upscale adaptive encoder or upscale delta adaptive encoder,
- // based on whose target data type size is smaller
- static ColumnPageCodec selectCodecByAlgorithmForFloating(SimpleStatsResult stats) {
- DataType srcDataType = stats.getDataType();
- double maxValue = (double) stats.getMax();
- double minValue = (double) stats.getMin();
- int decimalCount = stats.getDecimalCount();
-
- //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
- //but we can't use -1 to getDatatype, we should use -10000000.
- double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
-
- if (decimalCount == 0) {
- // short, int, long
- return selectCodecByAlgorithmForIntegral(stats);
- } else if (decimalCount < 0) {
- return new DirectCompressCodec(DataType.DOUBLE);
- } else {
- // double
- long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
- DataType adaptiveDataType = fitLongMinMax(max, 0);
- if (adaptiveDataType.getSizeInBytes() < DataType.DOUBLE.getSizeInBytes()) {
- return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
- } else {
- return new DirectCompressCodec(DataType.DOUBLE);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
new file mode 100644
index 0000000..9a52183
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.Encoding;
+
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
+import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
+
+/**
+ * Base class for encoding factory implementation.
+ */
+public abstract class EncodingFactory {
+
+ /**
+ * Return new encoder for specified column
+ */
+ public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
+ ColumnPage inputPage);
+
+ /**
+ * Return new decoder based on encoder metadata read from file
+ */
+ public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
+ throws IOException {
+ assert (encodings.size() == 1);
+ assert (encoderMetas.size() == 1);
+ Encoding encoding = encodings.get(0);
+ byte[] encoderMeta = encoderMetas.get(0).array();
+ ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
+ DataInputStream in = new DataInputStream(stream);
+ if (encoding == DIRECT_COMPRESS) {
+ ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.readFields(in);
+ return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
+ } else if (encoding == ADAPTIVE_INTEGRAL) {
+ ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.readFields(in);
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
+ stats).createDecoder(metadata);
+ } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
+ ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.readFields(in);
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
+ metadata.getStoreDataType(), stats).createDecoder(metadata);
+ } else if (encoding == ADAPTIVE_FLOATING) {
+ ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+ metadata.readFields(in);
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
+ stats).createDecoder(metadata);
+ } else if (encoding == RLE_INTEGRAL) {
+ RLEEncoderMeta metadata = new RLEEncoderMeta();
+ metadata.readFields(in);
+ return new RLECodec().createDecoder(metadata);
+ } else {
+ // for backward compatibility
+ ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
+ return createDecoderLegacy(metadata);
+ }
+ }
+
+ /**
+ * Old way of creating decoder, based on algorithm
+ */
+ public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ TableSpec.ColumnSpec spec = new TableSpec.ColumnSpec("legacy", stats.getDataType(),
+ ColumnType.MEASURE);
+ String compressor = "snappy";
+ switch (metadata.getType()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ // create the codec based on algorithm and create decoder by recovering the metadata
+ ColumnPageCodec codec = DefaultEncodingFactory.selectCodecByAlgorithmForIntegral(stats);
+ if (codec instanceof AdaptiveIntegralCodec) {
+ AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ adaptiveCodec.getTargetDataType(), stats, compressor);
+ return codec.createDecoder(meta);
+ } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
+ AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ adaptiveCodec.getTargetDataType(), stats, compressor);
+ return codec.createDecoder(meta);
+ } else if (codec instanceof DirectCompressCodec) {
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ metadata.getType(), stats, compressor);
+ return codec.createDecoder(meta);
+ } else {
+ throw new RuntimeException("internal error");
+ }
+ case FLOAT:
+ case DOUBLE:
+ // create the codec based on algorithm and create decoder by recovering the metadata
+ codec = DefaultEncodingFactory.selectCodecByAlgorithmForFloating(stats);
+ if (codec instanceof AdaptiveFloatingCodec) {
+ AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ adaptiveCodec.getTargetDataType(), stats, compressor);
+ return codec.createDecoder(meta);
+ } else if (codec instanceof DirectCompressCodec) {
+ ColumnPageEncoderMeta meta = new ColumnPageEncoderMeta(spec,
+ metadata.getType(), stats, compressor);
+ return codec.createDecoder(meta);
+ } else {
+ throw new RuntimeException("internal error");
+ }
+ case DECIMAL:
+ case BYTE_ARRAY:
+ // no dictionary dimension
+ return new DirectCompressCodec(stats.getDataType()).createDecoder(
+ new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+ default:
+ throw new RuntimeException("unsupported data type: " + stats.getDataType());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
deleted file mode 100644
index 3b7b10c..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
-import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressorEncoderMeta;
-import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
-import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.Encoding;
-
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
-import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
-
-/**
- * Base class for encoding strategy implementation.
- */
-public abstract class EncodingStrategy {
-
- /**
- * Return new encoder for specified column
- */
- public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
- ColumnPage inputPage);
-
- /**
- * Return new decoder based on encoder metadata read from file
- */
- public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
- throws IOException {
- assert (encodings.size() == 1);
- assert (encoderMetas.size() == 1);
- Encoding encoding = encodings.get(0);
- byte[] encoderMeta = encoderMetas.get(0).array();
- ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
- DataInputStream in = new DataInputStream(stream);
- if (encoding == DIRECT_COMPRESS) {
- DirectCompressorEncoderMeta metadata = new DirectCompressorEncoderMeta();
- metadata.readFields(in);
- return new DirectCompressCodec(metadata.getDataType()).createDecoder(metadata);
- } else if (encoding == ADAPTIVE_INTEGRAL) {
- AdaptiveIntegralEncoderMeta metadata = new AdaptiveIntegralEncoderMeta();
- metadata.readFields(in);
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
- return new AdaptiveIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
- stats).createDecoder(metadata);
- } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
- AdaptiveDeltaIntegralEncoderMeta metadata = new AdaptiveDeltaIntegralEncoderMeta();
- metadata.readFields(in);
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
- return new AdaptiveDeltaIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
- stats).createDecoder(metadata);
- } else if (encoding == RLE_INTEGRAL) {
- RLEEncoderMeta metadata = new RLEEncoderMeta();
- metadata.readFields(in);
- return new RLECodec().createDecoder(metadata);
- } else if (encoding == ADAPTIVE_FLOATING) {
- AdaptiveFloatingEncoderMeta metadata = new AdaptiveFloatingEncoderMeta();
- metadata.readFields(in);
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
- return new AdaptiveFloatingCodec(metadata.getDataType(), metadata.getTargetDataType(),
- stats).createDecoder(metadata);
- } else {
- // for backward compatibility
- ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
- return createDecoderLegacy(metadata);
- }
- }
-
- /**
- * Old way of creating decoder, based on algorithm
- */
- public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
- switch (metadata.getType()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- // create the codec based on algorithm and create decoder by recovering the metadata
- ColumnPageCodec codec = DefaultEncodingStrategy.selectCodecByAlgorithmForIntegral(stats);
- if (codec instanceof AdaptiveIntegralCodec) {
- AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
- AdaptiveIntegralEncoderMeta meta = new AdaptiveIntegralEncoderMeta(
- "snappy", adaptiveCodec.getTargetDataType(), stats);
- return codec.createDecoder(meta);
- } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
- AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
- AdaptiveDeltaIntegralEncoderMeta meta = new AdaptiveDeltaIntegralEncoderMeta(
- "snappy", adaptiveCodec.getTargetDataType(), stats);
- return codec.createDecoder(meta);
- } else if (codec instanceof DirectCompressCodec) {
- DirectCompressorEncoderMeta meta = new DirectCompressorEncoderMeta(
- "snappy", metadata.getType(), stats);
- return codec.createDecoder(meta);
- } else {
- throw new RuntimeException("internal error");
- }
- case FLOAT:
- case DOUBLE:
- // create the codec based on algorithm and create decoder by recovering the metadata
- codec = DefaultEncodingStrategy.selectCodecByAlgorithmForFloating(stats);
- if (codec instanceof AdaptiveFloatingCodec) {
- AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
- AdaptiveFloatingEncoderMeta meta = new AdaptiveFloatingEncoderMeta(
- "snappy", adaptiveCodec.getTargetDataType(), stats);
- return codec.createDecoder(meta);
- } else if (codec instanceof DirectCompressCodec) {
- DirectCompressorEncoderMeta meta = new DirectCompressorEncoderMeta(
- "snappy", metadata.getType(), stats);
- return codec.createDecoder(meta);
- } else {
- throw new RuntimeException("internal error");
- }
- case DECIMAL:
- case BYTE_ARRAY:
- // no dictionary dimension
- return new DirectCompressCodec(stats.getDataType()).createDecoder(
- new DirectCompressorEncoderMeta("snappy", stats.getDataType(), stats));
- default:
- throw new RuntimeException("unsupported data type: " + stats.getDataType());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
deleted file mode 100644
index 56527cb..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-/**
- * Factory to create Encoding Strategy.
- * Now only a default strategy is supported which will choose encoding based on data type
- * and column data stats.
- */
-public class EncodingStrategyFactory {
-
- private static EncodingStrategy defaultStrategy = new DefaultEncodingStrategy();
-
- public static EncodingStrategy getStrategy() {
- // TODO: make it configurable after added new strategy
- return defaultStrategy;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
index 135c317..ece5cb6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -17,9 +17,7 @@
package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -47,10 +45,6 @@ public abstract class AdaptiveCodec implements ColumnPageCodec {
this.targetDataType = targetDataType;
}
- public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
- throw new UnsupportedOperationException("internal error");
- }
-
public DataType getTargetDataType() {
return targetDataType;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 60ff3ab..ad327f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -87,7 +87,8 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ input.getPageSize());
input.convertValue(converter);
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
@@ -96,8 +97,8 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new AdaptiveDeltaIntegralEncoderMeta(
- compressor.getName(), targetDataType, inputPage.getStatistics());
+ return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType,
+ inputPage.getStatistics(), compressor.getName());
}
@Override
@@ -111,16 +112,12 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
}
@Override
- public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
- assert meta instanceof AdaptiveDeltaIntegralEncoderMeta;
- AdaptiveDeltaIntegralEncoderMeta codecMeta = (AdaptiveDeltaIntegralEncoderMeta) meta;
- final Compressor compressor = CompressorFactory.getInstance().getCompressor(
- codecMeta.getCompressorName());
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
@Override
public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
- ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
return LazyColumnPage.newPage(page, converter);
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
deleted file mode 100644
index c2d86d9..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveDeltaIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
- public AdaptiveDeltaIntegralEncoderMeta() {
- }
-
- public AdaptiveDeltaIntegralEncoderMeta(String compressorName, DataType targetDataType,
- SimpleStatsResult stats) {
- super(targetDataType, stats, compressorName);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
deleted file mode 100644
index 3104dd6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-/**
- * Metadata for AdaptiveIntegralCodec and DeltaIntegralCodec
- */
-public class AdaptiveEncoderMeta extends ColumnPageEncoderMeta implements Writable {
-
- private DataType targetDataType;
- private String compressorName;
-
- AdaptiveEncoderMeta() {
-
- }
-
- AdaptiveEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
- String compressorName) {
- super(stats.getDataType(), stats);
- this.targetDataType = targetDataType;
- this.compressorName = compressorName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeByte(targetDataType.ordinal());
- out.writeUTF(compressorName);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- this.targetDataType = DataType.valueOf(in.readByte());
- this.compressorName = in.readUTF();
- }
-
- public DataType getTargetDataType() {
- return targetDataType;
- }
-
- public String getCompressorName() {
- return compressorName;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 789383c..c238245 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -71,7 +71,8 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ input.getPageSize());
input.convertValue(converter);
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
@@ -87,24 +88,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new AdaptiveFloatingEncoderMeta(compressor.getName(), targetDataType, stats);
+ return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
+ compressor.getName());
}
};
}
@Override
- public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
- assert meta instanceof AdaptiveFloatingEncoderMeta;
- AdaptiveFloatingEncoderMeta codecMeta = (AdaptiveFloatingEncoderMeta) meta;
- final Compressor compressor =
- CompressorFactory.getInstance().getCompressor(codecMeta.getCompressorName());
- final DataType targetDataType = codecMeta.getTargetDataType();
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
@Override
public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
- ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
return LazyColumnPage.newPage(page, converter);
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
deleted file mode 100644
index 085e751..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveFloatingEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
- public AdaptiveFloatingEncoderMeta() {
- }
-
- public AdaptiveFloatingEncoderMeta(String compressorName, DataType targetDataType,
- SimpleStatsResult stats) {
- super(targetDataType, stats, compressorName);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 543a86e..6df2e64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -62,7 +62,8 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
if (encodedPage != null) {
throw new IllegalStateException("already encoded");
}
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+ input.getPageSize());
input.convertValue(converter);
byte[] result = encodedPage.compress(compressor);
encodedPage.freeMemory();
@@ -78,24 +79,20 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new AdaptiveIntegralEncoderMeta(compressor.getName(), targetDataType, stats);
+ return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
+ compressor.getName());
}
};
}
@Override
- public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
- assert meta instanceof AdaptiveIntegralEncoderMeta;
- AdaptiveIntegralEncoderMeta codecMeta = (AdaptiveIntegralEncoderMeta) meta;
- final Compressor compressor = CompressorFactory.getInstance().getCompressor(
- codecMeta.getCompressorName());
- final DataType targetDataType = codecMeta.getTargetDataType();
+ public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
return new ColumnPageDecoder() {
@Override
public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
- ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
return LazyColumnPage.newPage(page, converter);
}
};
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
deleted file mode 100644
index 0a4f399..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.adaptive;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
-
- public AdaptiveIntegralEncoderMeta() {
- }
-
- public AdaptiveIntegralEncoderMeta(String compressorName, DataType targetDataType,
- SimpleStatsResult stats) {
- super(targetDataType, stats, compressorName);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index cb1508f..13879b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -60,10 +60,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
@Override
public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
- assert meta instanceof DirectCompressorEncoderMeta;
- DirectCompressorEncoderMeta codecMeta = (DirectCompressorEncoderMeta) meta;
- return new DirectDecompressor(codecMeta.getCompressorName(),
- codecMeta.getScale(), codecMeta.getPrecision());
+ return new DirectDecompressor(meta);
}
private static class DirectCompressor extends ColumnPageEncoder {
@@ -88,32 +85,27 @@ public class DirectCompressCodec implements ColumnPageCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new DirectCompressorEncoderMeta(compressor.getName(), inputPage.getDataType(),
- inputPage.getStatistics());
+ return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), inputPage.getDataType(),
+ inputPage.getStatistics(), compressor.getName());
}
}
private class DirectDecompressor implements ColumnPageDecoder {
- private Compressor compressor;
- private int scale;
- private int precision;
+ private ColumnPageEncoderMeta meta;
- DirectDecompressor(String compressorName, int scale, int precision) {
- this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
- this.scale = scale;
- this.precision = precision;
+ DirectDecompressor(ColumnPageEncoderMeta meta) {
+ this.meta = meta;
}
@Override
public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
ColumnPage decodedPage;
if (dataType == DataType.DECIMAL) {
- decodedPage = ColumnPage.decompressDecimalPage(compressor, input, offset, length,
- scale, precision);
+ decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
} else {
- decodedPage = ColumnPage.decompress(compressor, dataType, input, offset, length);
+ decodedPage = ColumnPage.decompress(meta, input, offset, length);
}
return LazyColumnPage.newPage(decodedPage, converter);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
deleted file mode 100644
index cf19259..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding.compress;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.Writable;
-
-public class DirectCompressorEncoderMeta extends ColumnPageEncoderMeta implements Writable {
- private String compressorName;
-
- public DirectCompressorEncoderMeta() {
- }
-
- public DirectCompressorEncoderMeta(String compressorName, final DataType dataType,
- SimpleStatsResult stats) {
- super(dataType, stats);
- this.compressorName = compressorName;
- }
-
- public String getCompressorName() {
- return compressorName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeUTF(compressorName);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- compressorName = in.readUTF();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index 12690a5..419b589 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -64,7 +65,7 @@ public class RLECodec implements ColumnPageCodec {
public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
assert meta instanceof RLEEncoderMeta;
RLEEncoderMeta codecMeta = (RLEEncoderMeta) meta;
- return new RLEDecoder(codecMeta.getDataType(), codecMeta.getPageSize());
+ return new RLEDecoder(meta.getColumnSpec(), codecMeta.getPageSize());
}
// This codec supports integral type only
@@ -157,7 +158,7 @@ public class RLECodec implements ColumnPageCodec {
@Override
protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
- return new RLEEncoderMeta(
+ return new RLEEncoderMeta(inputPage.getColumnSpec(),
inputPage.getDataType(), inputPage.getPageSize(), inputPage.getStatistics());
}
@@ -291,21 +292,21 @@ public class RLECodec implements ColumnPageCodec {
// TODO: add a on-the-fly decoder for filter query with high selectivity
private class RLEDecoder implements ColumnPageDecoder {
- // src data type
- private DataType dataType;
+ private TableSpec.ColumnSpec columnSpec;
private int pageSize;
- private RLEDecoder(DataType dataType, int pageSize) {
- validateDataType(dataType);
- this.dataType = dataType;
+ private RLEDecoder(TableSpec.ColumnSpec columnSpec, int pageSize) {
+ validateDataType(columnSpec.getSchemaDataType());
+ this.columnSpec = columnSpec;
this.pageSize = pageSize;
}
@Override
public ColumnPage decode(byte[] input, int offset, int length)
throws MemoryException, IOException {
+ DataType dataType = columnSpec.getSchemaDataType();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
- ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize);
+ ColumnPage resultPage = ColumnPage.newPage(columnSpec, dataType, pageSize);
switch (dataType) {
case BYTE:
decodeBytePage(in, resultPage);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
index 5d68872..8871671 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLEEncoderMeta.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -37,8 +38,9 @@ public class RLEEncoderMeta extends ColumnPageEncoderMeta implements Writable {
}
- public RLEEncoderMeta(DataType dataType, int pageSize, SimpleStatsResult stats) {
- super(dataType, stats);
+ public RLEEncoderMeta(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+ SimpleStatsResult stats) {
+ super(columnSpec, dataType, stats, "");
this.pageSize = pageSize;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index 4fb891f..2f6178b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -51,10 +51,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
// this is for decode flow, create stats from encoder meta in carbondata file
public static PrimitivePageStatsCollector newInstance(ColumnPageEncoderMeta meta) {
- PrimitivePageStatsCollector instance =
- new PrimitivePageStatsCollector(meta.getDataType(), meta.getScale(), meta.getPrecision());
+ PrimitivePageStatsCollector instance = new PrimitivePageStatsCollector(meta.getSchemaDataType(),
+ meta.getScale(), meta.getPrecision());
// set min max from meta
- switch (meta.getDataType()) {
+ switch (meta.getSchemaDataType()) {
case BYTE:
instance.minByte = (byte) meta.getMinValue();
instance.maxByte = (byte) meta.getMaxValue();
@@ -85,7 +85,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
break;
default:
throw new UnsupportedOperationException(
- "unsupported data type for stats collection: " + meta.getDataType());
+ "unsupported data type for stats collection: " + meta.getSchemaDataType());
}
return instance;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 4caa0b3..39227a3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -63,16 +63,15 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- byte[] input = new byte[8];
- copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, input);
+ byte[] input = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
ByteBuffer byteArray = ByteBuffer.wrap(input);
int dataLength = byteArray.getInt();
dataOutputStream.writeInt(dataLength);
if (dataLength > 0) {
- int columnIndex = byteArray.getInt();
+ int dataOffset = byteArray.getInt();
for (int i = 0; i < dataLength; i++) {
children
- .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, columnIndex++, pageNumber,
+ .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dataOffset++, pageNumber,
dataOutputStream);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
index 2274186..ee43a10 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -36,17 +36,16 @@ public class ComplexQueryType {
}
/**
- * Method will copy the block chunk holder data to the passed
- * byte[], this method is also used by child
- *
- * @param rowNumber
- * @param input
+ * Method will copy the block chunk holder data and return the cloned value.
+ * This method is also used by child.
*/
- protected void copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
- int rowNumber, int pageNumber, byte[] input) {
+ protected byte[] copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
+ int rowNumber, int pageNumber) {
byte[] data =
rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber).getChunkData(rowNumber);
- System.arraycopy(data, 0, input, 0, data.length);
+ byte[] output = new byte[data.length];
+ System.arraycopy(data, 0, output, 0, output.length);
+ return output;
}
/*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 9c9be86..56c265b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -46,8 +45,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
private int keySize;
- private int blockIndex;
-
private Dictionary dictionary;
private org.apache.carbondata.core.metadata.datatype.DataType dataType;
@@ -63,7 +60,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
this.dictionary = dictionary;
this.name = name;
this.parentname = parentname;
- this.blockIndex = blockIndex;
this.isDirectDictionary = isDirectDictionary;
}
@@ -95,10 +91,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
@Override public void parseBlocksAndReturnComplexColumnByteArray(
DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- DimensionColumnDataChunk dataChunk =
- rawColumnChunks[blockIndex].convertToDimColDataChunk(pageNumber);
- byte[] currentVal = new byte[dataChunk.getColumnValueSize()];
- copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber, currentVal);
+ byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
dataOutputStream.write(currentVal);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index bb64e92..23a9f81 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -84,8 +84,7 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
@Override public void parseBlocksAndReturnComplexColumnByteArray(
DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber,
int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- byte[] input = new byte[8];
- copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber, input);
+ byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
ByteBuffer byteArray = ByteBuffer.wrap(input);
int childElement = byteArray.getInt();
dataOutputStream.writeInt(childElement);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
index 9e17717..79d3388 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/page/encoding/RLECodecSuite.java
@@ -20,6 +20,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.carbondata.core.datastore.ColumnType;
+import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
@@ -42,7 +44,9 @@ public class RLECodecSuite {
TestData(byte[] inputByteData, byte[] expectedEncodedByteData) throws IOException, MemoryException {
this.inputByteData = inputByteData;
- inputBytePage = ColumnPage.newPage(DataType.BYTE, inputByteData.length);
+ inputBytePage = ColumnPage.newPage(
+ new TableSpec.ColumnSpec("test", DataType.BYTE, ColumnType.MEASURE),
+ DataType.BYTE, inputByteData.length);
inputBytePage.setStatsCollector(PrimitivePageStatsCollector.newInstance(DataType.BYTE, 0, 0));
for (int i = 0; i < inputByteData.length; i++) {
inputBytePage.putData(i, inputByteData[i]);
@@ -125,7 +129,9 @@ public class RLECodecSuite {
private void testBytePageDecode(byte[] inputBytes, byte[] expectedDecodedBytes) throws IOException, MemoryException {
RLECodec codec = new RLECodec();
- RLEEncoderMeta meta = new RLEEncoderMeta(DataType.BYTE, expectedDecodedBytes.length, null);
+ RLEEncoderMeta meta = new RLEEncoderMeta(
+ new TableSpec.ColumnSpec("test", DataType.BYTE, ColumnType.MEASURE),
+ DataType.BYTE, expectedDecodedBytes.length, null);
ColumnPageDecoder decoder = codec.createDecoder(meta);
ColumnPage page = decoder.decode(inputBytes, 0, inputBytes.length);
byte[] decoded = page.getBytePage();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3e1b63b..35b45ca 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -19,13 +19,12 @@ package org.apache.carbondata.core.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveEncoderMeta;
import org.apache.carbondata.core.datastore.page.key.TablePageKey;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -124,7 +123,7 @@ public class CarbonMetadataUtilTest {
meta.setDecimal(5);
meta.setMinValue(objMinArr);
meta.setMaxValue(objMaxArr);
- meta.setType(AdaptiveEncoderMeta.DOUBLE_MEASURE);
+ meta.setType(ColumnPageEncoderMeta.DOUBLE_MEASURE);
List<Encoding> encoders = new ArrayList<>();
encoders.add(Encoding.INVERTED_INDEX);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 3061ec7..a63fa65 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,3 +1,4 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData
1,10,1100,48.4,spark,2015/4/23 12:01:01,1.23,2015/4/23,aaa,2.5,'foo'#'bar'#'world'
5,17,1140,43.4,spark,2015/7/27 12:01:02,3.45,2015/7/27,bbb,2.5,'foo'#'bar'#'world'
1,11,1100,44.4,flink,2015/5/23 12:01:03,23.23,2015/5/23,ccc,2.5,'foo'#'bar'#'world'
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c1ddbf2/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 3b2094a..c0429b5 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -78,8 +78,7 @@ object CarbonSessionExample {
s"""
| LOAD DATA LOCAL INPATH '$path'
| INTO TABLE carbon_table
- | OPTIONS('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField,complexData',
- | 'COMPLEX_DELIMITER_LEVEL_1'='#')
+ | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#')
""".stripMargin)
// scalastyle:on