You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/08/23 07:24:07 UTC
[5/7] carbondata git commit: [CARBONDATA-1371] Support creating
decoder based on encoding metadata
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
index b122615..79c8101 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -17,11 +17,32 @@
package org.apache.carbondata.core.datastore.page.encoding;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
+import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressorEncoderMeta;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLECodec;
+import org.apache.carbondata.core.datastore.page.encoding.rle.RLEEncoderMeta;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.Encoding;
+
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
+import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
/**
* Base class for encoding strategy implementation.
@@ -29,87 +50,69 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
public abstract class EncodingStrategy {
/**
- * create codec based on the page data type and statistics
+ * Return new encoder for specified column
+ */
+ public abstract ColumnPageEncoder createEncoder(TableSpec.ColumnSpec columnSpec,
+ ColumnPage inputPage);
+
+ /**
+ * Return new decoder based on encoder metadata read from file
+ */
+ public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas)
+ throws IOException {
+ assert (encodings.size() == 1);
+ assert (encoderMetas.size() == 1);
+ Encoding encoding = encodings.get(0);
+ byte[] encoderMeta = encoderMetas.get(0).array();
+ ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
+ DataInputStream in = new DataInputStream(stream);
+ if (encoding == DIRECT_COMPRESS) {
+ DirectCompressorEncoderMeta metadata = new DirectCompressorEncoderMeta();
+ metadata.readFields(in);
+ return new DirectCompressCodec(metadata.getDataType()).createDecoder(metadata);
+ } else if (encoding == ADAPTIVE_INTEGRAL) {
+ AdaptiveIntegralEncoderMeta metadata = new AdaptiveIntegralEncoderMeta();
+ metadata.readFields(in);
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ return new AdaptiveIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
+ stats).createDecoder(metadata);
+ } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
+ AdaptiveDeltaIntegralEncoderMeta metadata = new AdaptiveDeltaIntegralEncoderMeta();
+ metadata.readFields(in);
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ return new AdaptiveDeltaIntegralCodec(metadata.getDataType(), metadata.getTargetDataType(),
+ stats).createDecoder(metadata);
+ } else if (encoding == RLE_INTEGRAL) {
+ RLEEncoderMeta metadata = new RLEEncoderMeta();
+ metadata.readFields(in);
+ return new RLECodec().createDecoder(metadata);
+ } else {
+ // for backward compatibility
+ ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
+ return createDecoderLegacy(metadata);
+ }
+ }
+
+ /**
+ * Old way of creating decoder, based on algorithm
*/
- public ColumnPageCodec newCodec(SimpleStatsResult stats) {
- switch (stats.getDataType()) {
+ public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
+ SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+ switch (metadata.getType()) {
case BYTE:
case SHORT:
case INT:
case LONG:
- return newCodecForIntegralType(stats);
+ return DefaultEncodingStrategy.selectCodecByAlgorithm(stats).createDecoder(null);
case FLOAT:
case DOUBLE:
- return newCodecForFloatingType(stats);
case DECIMAL:
- return newCodecForDecimalType(stats);
case BYTE_ARRAY:
// no dictionary dimension
- return newCodecForByteArrayType(stats);
+ return new DirectCompressCodec(stats.getDataType()).createDecoder(
+ new DirectCompressorEncoderMeta("snappy", stats.getDataType(), stats));
default:
throw new RuntimeException("unsupported data type: " + stats.getDataType());
}
}
-
- /**
- * create codec based on the page data type and statistics contained by ValueEncoderMeta
- */
- public ColumnPageCodec newCodec(ValueEncoderMeta meta) {
- if (meta instanceof ColumnPageCodecMeta) {
- ColumnPageCodecMeta codecMeta = (ColumnPageCodecMeta) meta;
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(codecMeta);
- switch (codecMeta.getSrcDataType()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- return newCodecForIntegralType(stats);
- case FLOAT:
- case DOUBLE:
- return newCodecForFloatingType(stats);
- case DECIMAL:
- return newCodecForDecimalType(stats);
- case BYTE_ARRAY:
- // no dictionary dimension
- return newCodecForByteArrayType(stats);
- default:
- throw new RuntimeException("unsupported data type: " + stats.getDataType());
- }
- } else {
- SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(meta);
- switch (meta.getType()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- return newCodecForIntegralType(stats);
- case FLOAT:
- case DOUBLE:
- return newCodecForFloatingType(stats);
- case DECIMAL:
- return newCodecForDecimalType(stats);
- case BYTE_ARRAY:
- // no dictionary dimension
- return newCodecForByteArrayType(stats);
- default:
- throw new RuntimeException("unsupported data type: " + stats.getDataType());
- }
- }
- }
-
- // for byte, short, int, long
- abstract ColumnPageCodec newCodecForIntegralType(SimpleStatsResult stats);
-
- // for float, double
- abstract ColumnPageCodec newCodecForFloatingType(SimpleStatsResult stats);
-
- // for decimal
- abstract ColumnPageCodec newCodecForDecimalType(SimpleStatsResult stats);
-
- // for byte array
- abstract ColumnPageCodec newCodecForByteArrayType(SimpleStatsResult stats);
-
- // for dimension column
- public abstract ColumnPageCodec newCodec(TableSpec.DimensionSpec dimensionSpec);
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
new file mode 100644
index 0000000..56527cb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+/**
+ * Factory to create Encoding Strategy.
+ * Now only a default strategy is supported which will choose encoding based on data type
+ * and column data stats.
+ */
+public class EncodingStrategyFactory {
+
+ private static EncodingStrategy defaultStrategy = new DefaultEncodingStrategy();
+
+ public static EncodingStrategy getStrategy() {
+ // TODO: make it configurable after added new strategy
+ return defaultStrategy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
deleted file mode 100644
index c1620c6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/HighCardDictDimensionIndexCodec.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.DimensionType;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.ByteUtil;
-
-public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
-
- HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
- super(isSort, isInvertedIndex, compressor);
- }
-
- @Override
- public String getName() {
- return "HighCardDictDimensionIndexCodec";
- }
-
- @Override
- public EncodedColumnPage encode(ColumnPage input) throws MemoryException {
- IndexStorage indexStorage;
- byte[][] data = input.getByteArrayPage();
- if (isInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- indexStorage = new BlockIndexerStorageForShort(data, false, true, isSort);
- } else {
- indexStorage = new BlockIndexerStorageForInt(data, false, true, isSort);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
- } else {
- indexStorage = new BlockIndexerStorageForNoInvertedIndexForInt(data);
- }
- }
- byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
- byte[] compressed = compressor.compressByte(flattened);
- return new EncodedDimensionPage(input.getPageSize(), compressed, indexStorage,
- DimensionType.PLAIN_VALUE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
deleted file mode 100644
index 3122b15..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/IndexStorageCodec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-public abstract class IndexStorageCodec implements ColumnPageCodec {
- protected ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
- protected Compressor compressor;
- protected boolean isSort;
- protected boolean isInvertedIndex;
-
- IndexStorageCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
- this.isSort = isSort;
- this.isInvertedIndex = isInvertedIndex;
- this.compressor = compressor;
- }
-
- @Override
- public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
- throw new UnsupportedOperationException("internal error");
- }
-
- @Override
- public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
- throw new UnsupportedOperationException("internal error");
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
deleted file mode 100644
index dda89e0..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/RLECodec.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.CodecMetaFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * RLE encoding implementation for integral column page.
- * This encoding keeps track of repeated-run and non-repeated-run, and make use
- * of the highest bit of the length field to indicate the type of run.
- * The length field is encoded as 16 bits value. (Page size must be less than 65535 rows)
- *
- * For example: input data {5, 5, 1, 2, 3, 3, 3, 3, 3} will be encoded to
- * {0x00, 0x02, 0x05, (repeated-run, 2 values of 5)
- * 0x80, 0x03, 0x01, 0x02, 0x03, (non-repeated-run, 3 values: 1, 2, 3)
- * 0x00, 0x04, 0x03} (repeated-run, 4 values of 3)
- */
-public class RLECodec implements ColumnPageCodec {
-
- enum RUN_STATE { INIT, START, REPEATED_RUN, NONREPEATED_RUN }
-
- private DataType dataType;
- private int pageSize;
-
- /**
- * New RLECodec
- * @param dataType data type of the raw column page before encode
- * @param pageSize page size of the raw column page before encode
- */
- RLECodec(DataType dataType, int pageSize) {
- this.dataType = dataType;
- this.pageSize = pageSize;
- }
-
- @Override
- public String getName() {
- return "RLECodec";
- }
-
- @Override
- public EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
- Encoder encoder = new Encoder();
- return encoder.encode(input);
- }
-
- @Override
- public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
- throw new UnsupportedOperationException("complex column does not support RLE encoding");
- }
-
- @Override
- public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException,
- IOException {
- Decoder decoder = new Decoder(dataType, pageSize);
- return decoder.decode(input, offset, length);
- }
-
- // This codec supports integral type only
- private void validateDataType(DataType dataType) {
- switch (dataType) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- break;
- default:
- throw new UnsupportedOperationException(dataType + " is not supported for RLE");
- }
- }
-
- private class Encoder {
- // While encoding RLE, this class internally work as a state machine
- // INIT state is the initial state before any value comes
- // START state is the start for each run
- // REPEATED_RUN state means it is collecting repeated values (`lastValue`)
- // NONREPEATED_RUN state means it is collecting non-repeated values (`nonRepeatValues`)
- private RUN_STATE runState;
-
- // count for each run, either REPEATED_RUN or NONREPEATED_RUN
- private short valueCount;
-
- // collected value for REPEATED_RUN
- private Object lastValue;
-
- // collected value for NONREPEATED_RUN
- private List<Object> nonRepeatValues;
-
- // data type of input page
- private DataType dataType;
-
- // output stream for encoded data
- private ByteArrayOutputStream bao;
- private DataOutputStream stream;
-
- private Encoder() {
- this.runState = RUN_STATE.INIT;
- this.valueCount = 0;
- this.nonRepeatValues = new ArrayList<>();
- this.bao = new ByteArrayOutputStream();
- this.stream = new DataOutputStream(bao);
- }
-
- private EncodedColumnPage encode(ColumnPage input) throws MemoryException, IOException {
- validateDataType(input.getDataType());
- this.dataType = input.getDataType();
- switch (dataType) {
- case BYTE:
- byte[] bytePage = input.getBytePage();
- for (int i = 0; i < bytePage.length; i++) {
- putValue(bytePage[i]);
- }
- break;
- case SHORT:
- short[] shortPage = input.getShortPage();
- for (int i = 0; i < shortPage.length; i++) {
- putValue(shortPage[i]);
- }
- break;
- case INT:
- int[] intPage = input.getIntPage();
- for (int i = 0; i < intPage.length; i++) {
- putValue(intPage[i]);
- }
- break;
- case LONG:
- long[] longPage = input.getLongPage();
- for (int i = 0; i < longPage.length; i++) {
- putValue(longPage[i]);
- }
- break;
- default:
- throw new UnsupportedOperationException(input.getDataType() +
- " does not support RLE encoding");
- }
- byte[] encoded = collectResult();
- SimpleStatsResult stats = (SimpleStatsResult) input.getStatistics();
- return new EncodedMeasurePage(
- input.getPageSize(),
- encoded,
- CodecMetaFactory.createMeta(stats, input.getDataType()),
- stats.getNullBits());
- }
-
- private void putValue(Object value) throws IOException {
- if (runState == RUN_STATE.INIT) {
- startNewRun(value);
- } else {
- if (lastValue.equals(value)) {
- putRepeatValue(value);
- } else {
- putNonRepeatValue(value);
- }
- }
- }
-
- // when last row is reached, write out all collected data
- private byte[] collectResult() throws IOException {
- switch (runState) {
- case REPEATED_RUN:
- writeRunLength(valueCount);
- writeRunValue(lastValue);
- break;
- case NONREPEATED_RUN:
- writeRunLength(valueCount | 0x8000);
- for (int i = 0; i < valueCount; i++) {
- writeRunValue(nonRepeatValues.get(i));
- }
- break;
- default:
- assert (runState == RUN_STATE.START);
- writeRunLength(1);
- writeRunValue(lastValue);
- }
- return bao.toByteArray();
- }
-
- private void writeRunLength(int length) throws IOException {
- stream.writeShort(length);
- }
-
- private void writeRunValue(Object value) throws IOException {
- switch (dataType) {
- case BYTE:
- stream.writeByte((byte) value);
- break;
- case SHORT:
- stream.writeShort((short) value);
- break;
- case INT:
- stream.writeInt((int) value);
- break;
- case LONG:
- stream.writeLong((long) value);
- break;
- default:
- throw new RuntimeException("internal error");
- }
- }
-
- // for each run, call this to initialize the state and clear the collected data
- private void startNewRun(Object value) {
- runState = RUN_STATE.START;
- valueCount = 1;
- lastValue = value;
- nonRepeatValues.clear();
- nonRepeatValues.add(value);
- }
-
- // non-repeated run ends, put the collected data to result page
- private void encodeNonRepeatedRun() throws IOException {
- // put the value count (highest bit is 1) and all collected values
- writeRunLength(valueCount | 0x8000);
- for (int i = 0; i < valueCount; i++) {
- writeRunValue(nonRepeatValues.get(i));
- }
- }
-
- // repeated run ends, put repeated value to result page
- private void encodeRepeatedRun() throws IOException {
- // put the value count (highest bit is 0) and repeated value
- writeRunLength(valueCount);
- writeRunValue(lastValue);
- }
-
- private void putRepeatValue(Object value) throws IOException {
- switch (runState) {
- case REPEATED_RUN:
- valueCount++;
- break;
- case NONREPEATED_RUN:
- // non-repeated run ends, encode this run
- encodeNonRepeatedRun();
- startNewRun(value);
- break;
- default:
- assert (runState == RUN_STATE.START);
- // enter repeated run
- runState = RUN_STATE.REPEATED_RUN;
- valueCount++;
- break;
- }
- }
-
- private void putNonRepeatValue(Object value) throws IOException {
- switch (runState) {
- case NONREPEATED_RUN:
- // collect the non-repeated value
- nonRepeatValues.add(value);
- lastValue = value;
- valueCount++;
- break;
- case REPEATED_RUN:
- // repeated-run ends, encode this run
- encodeRepeatedRun();
- startNewRun(value);
- break;
- default:
- assert (runState == RUN_STATE.START);
- // enter non-repeated run
- runState = RUN_STATE.NONREPEATED_RUN;
- nonRepeatValues.add(value);
- lastValue = value;
- valueCount++;
- break;
- }
- }
-
- }
-
- // It decodes data in one shot. It is suitable for scan query
- // TODO: add a on-the-fly decoder for filter query with high selectivity
- private class Decoder {
-
- // src data type
- private DataType dataType;
- private int pageSize;
-
- private Decoder(DataType dataType, int pageSize) throws MemoryException {
- validateDataType(dataType);
- this.dataType = dataType;
- this.pageSize = pageSize;
- }
-
- private ColumnPage decode(byte[] input, int offset, int length)
- throws MemoryException, IOException {
- DataInputStream in = new DataInputStream(new ByteArrayInputStream(input, offset, length));
- ColumnPage resultPage = ColumnPage.newPage(dataType, pageSize);
- switch (dataType) {
- case BYTE:
- decodeBytePage(in, resultPage);
- break;
- case SHORT:
- decodeShortPage(in, resultPage);
- break;
- case INT:
- decodeIntPage(in, resultPage);
- break;
- case LONG:
- decodeLongPage(in, resultPage);
- break;
- default:
- throw new RuntimeException("unsupported datatype:" + dataType);
- }
- return resultPage;
- }
-
- private void decodeBytePage(DataInputStream in, ColumnPage decodedPage)
- throws IOException {
- int rowId = 0;
- do {
- int runLength = in.readShort();
- int count = runLength & 0x7FFF;
- if (runLength < 0) {
- // non-repeated run
- for (int i = 0; i < count; i++) {
- decodedPage.putByte(rowId++, in.readByte());
- }
- } else {
- // repeated run
- byte value = in.readByte();
- for (int i = 0; i < count; i++) {
- decodedPage.putByte(rowId++, value);
- }
- }
- } while (in.available() > 0);
- }
-
- private void decodeShortPage(DataInputStream in, ColumnPage decodedPage)
- throws IOException {
- int rowId = 0;
- do {
- int runLength = in.readShort();
- int count = runLength & 0x7FFF;
- if (runLength < 0) {
- // non-repeated run
- for (int i = 0; i < count; i++) {
- decodedPage.putShort(rowId++, in.readShort());
- }
- } else {
- // repeated run
- short value = in.readShort();
- for (int i = 0; i < count; i++) {
- decodedPage.putShort(rowId++, value);
- }
- }
- } while (in.available() > 0);
- }
-
- private void decodeIntPage(DataInputStream in, ColumnPage decodedPage)
- throws IOException {
- int rowId = 0;
- do {
- int runLength = in.readShort();
- int count = runLength & 0x7FFF;
- if (runLength < 0) {
- // non-repeated run
- for (int i = 0; i < count; i++) {
- decodedPage.putInt(rowId++, in.readInt());
- }
- } else {
- // repeated run
- int value = in.readInt();
- for (int i = 0; i < count; i++) {
- decodedPage.putInt(rowId++, value);
- }
- }
- } while (in.available() > 0);
- }
-
- private void decodeLongPage(DataInputStream in, ColumnPage decodedPage)
- throws IOException {
- int rowId = 0;
- do {
- int runLength = in.readShort();
- int count = runLength & 0x7FFF;
- if (runLength < 0) {
- // non-repeated run
- for (int i = 0; i < count; i++) {
- decodedPage.putLong(rowId++, in.readLong());
- }
- } else {
- // repeated run
- long value = in.readLong();
- for (int i = 0; i < count; i++) {
- decodedPage.putLong(rowId++, value);
- }
- }
- } while (in.available() > 0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
new file mode 100644
index 0000000..94ca3e6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Subclass of this codec depends on statistics of the column page (adaptive) to perform apply
+ * and decode, it also employs compressor to compress the encoded data
+ */
+public abstract class AdaptiveCodec implements ColumnPageCodec {
+
+ // TODO: cache and reuse the same encoder since snappy is thread-safe
+
+ // statistics of this page, can be used by subclass
+ protected final SimpleStatsResult stats;
+
+ // the data type used for storage
+ protected final DataType targetDataType;
+
+ // the data type specified in schema
+ protected final DataType srcDataType;
+
+ protected AdaptiveCodec(DataType srcDataType, DataType targetDataType,
+ SimpleStatsResult stats) {
+ this.stats = stats;
+ this.srcDataType = srcDataType;
+ this.targetDataType = targetDataType;
+ }
+
+ public EncodedColumnPage[] encodeComplexColumn(ComplexColumnPage input) {
+ throw new UnsupportedOperationException("internal error");
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s[src type: %s, target type: %s, stats(%s)]",
+ getClass().getName(), srcDataType, targetDataType, stats);
+ }
+
+ protected String debugInfo() {
+ return this.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
new file mode 100644
index 0000000..9107a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec will calculate delta of page max value and page value,
+ * and do type casting of the diff to make storage minimum.
+ */
+public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
+
+ // scratch page holding the (max - value) deltas during encoding; freed after compression
+ private ColumnPage encodedPage;
+ // page max value widened to long; every stored value is (max - original value)
+ private long max;
+
+ /**
+ * @param srcDataType data type declared in the schema; must be BYTE/SHORT/INT/LONG
+ * @param targetDataType narrower data type chosen for storage of the deltas
+ * @param stats page statistics supplying the max value used as the delta base
+ */
+ public AdaptiveDeltaIntegralCodec(DataType srcDataType, DataType targetDataType,
+ SimpleStatsResult stats) {
+ super(srcDataType, targetDataType, stats);
+ switch (srcDataType) {
+ case BYTE:
+ this.max = (byte) stats.getMax();
+ break;
+ case SHORT:
+ this.max = (short) stats.getMax();
+ break;
+ case INT:
+ this.max = (int) stats.getMax();
+ break;
+ case LONG:
+ this.max = (long) stats.getMax();
+ break;
+ default:
+ // this codec is for integer type only
+ throw new UnsupportedOperationException(
+ "unsupported data type for Delta compress: " + srcDataType);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "DeltaIntegralCodec";
+ }
+
+ // Encoder: converts every page value to (max - value), cast down to targetDataType,
+ // then compresses the converted page with the default compressor.
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ return new ColumnPageEncoder() {
+ final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+ @Override
+ protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+ // guard: a codec instance holds one scratch page, so it may encode only one page
+ if (encodedPage != null) {
+ throw new IllegalStateException("already encoded");
+ }
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.convertValue(converter);
+ byte[] result = encodedPage.compress(compressor);
+ encodedPage.freeMemory();
+ return result;
+ }
+
+ @Override
+ protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+ return new AdaptiveDeltaIntegralEncoderMeta(
+ compressor.getName(), targetDataType, inputPage.getStatistics());
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.ADAPTIVE_DELTA_INTEGRAL);
+ return encodings;
+ }
+
+ };
+ }
+
+ // Decoder: decompresses into a page of targetDataType, then lazily converts each
+ // stored delta back via (max - stored) using DeltaIntegralConverter.
+ // NOTE(review): this uses the codec instance's targetDataType and stats rather than
+ // codecMeta.getTargetDataType() — AdaptiveIntegralCodec.createDecoder reads the target
+ // type from the meta instead; confirm both always agree on the read path.
+ @Override
+ public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+ AdaptiveDeltaIntegralEncoderMeta codecMeta = (AdaptiveDeltaIntegralEncoderMeta) meta;
+ final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ codecMeta.getCompressorName());
+ return new ColumnPageDecoder() {
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length)
+ throws MemoryException, IOException {
+ ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ DeltaIntegralConverter converter = new DeltaIntegralConverter(page, targetDataType,
+ srcDataType, stats.getMax());
+ return LazyColumnPage.newPage(page, converter);
+ }
+ };
+ }
+
+ // Write-path converter: stored value = (max - page value), cast to targetDataType.
+ // Unreachable cases throw, since targetDataType was chosen to fit max - min.
+ private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+ @Override
+ public void encode(int rowId, byte value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, max - value);
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ // Read path: original value = max - stored delta (inverse of encode above).
+ @Override
+ public long decodeLong(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error");
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
new file mode 100644
index 0000000..1d20481
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata persisted for pages encoded with AdaptiveDeltaIntegralCodec.
+ * All serialized state lives in AdaptiveEncoderMeta; this subclass only fixes the type.
+ */
+public class AdaptiveDeltaIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+ // no-arg constructor required for reflective instantiation before readFields
+ public AdaptiveDeltaIntegralEncoderMeta() {
+ }
+
+ AdaptiveDeltaIntegralEncoderMeta(String compressorName, DataType targetDataType,
+ SimpleStatsResult stats) {
+ super(targetDataType, stats, compressorName);
+ }
+
+ // delegates fully to the parent; overridden only to keep the Writable contract explicit
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ }
+
+ // delegates fully to the parent; must mirror write() exactly
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
new file mode 100644
index 0000000..3104dd6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveEncoderMeta.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata for AdaptiveIntegralCodec and DeltaIntegralCodec
+ */
+public class AdaptiveEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+
+ // the (usually narrower) data type the page values are stored as
+ private DataType targetDataType;
+ // name of the compressor applied after the adaptive conversion
+ private String compressorName;
+
+ // no-arg constructor required for reflective instantiation before readFields
+ AdaptiveEncoderMeta() {
+
+ }
+
+ AdaptiveEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+ String compressorName) {
+ super(stats.getDataType(), stats);
+ this.targetDataType = targetDataType;
+ this.compressorName = compressorName;
+ }
+
+ // serialized layout: parent fields, then targetDataType ordinal (1 byte), then
+ // compressor name (modified UTF). NOTE(review): ordinal-based serialization breaks
+ // if DataType enum constants are ever reordered — confirm the enum order is frozen.
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeByte(targetDataType.ordinal());
+ out.writeUTF(compressorName);
+ }
+
+ // must read fields in the exact order written by write()
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ this.targetDataType = DataType.valueOf(in.readByte());
+ this.compressorName = in.readUTF();
+ }
+
+ public DataType getTargetDataType() {
+ return targetDataType;
+ }
+
+ public String getCompressorName() {
+ return compressorName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
new file mode 100644
index 0000000..ccd92d1
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This converter will do type casting on page data to make storage minimum.
+ */
+public class AdaptiveIntegralCodec extends AdaptiveCodec {
+
+ // scratch page holding the downcast values during encoding; freed after compression
+ private ColumnPage encodedPage;
+
+ public AdaptiveIntegralCodec(DataType srcDataType, DataType targetDataType,
+ SimpleStatsResult stats) {
+ super(srcDataType, targetDataType, stats);
+ }
+
+ @Override
+ public String getName() {
+ return "AdaptiveIntegralCodec";
+ }
+
+ // Encoder: casts every page value down to targetDataType, then compresses the
+ // converted page with the default compressor.
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ return new ColumnPageEncoder() {
+ @Override
+ protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+ // guard: a codec instance holds one scratch page, so it may encode only one page
+ if (encodedPage != null) {
+ throw new IllegalStateException("already encoded");
+ }
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.convertValue(converter);
+ byte[] result = encodedPage.compress(compressor);
+ encodedPage.freeMemory();
+ return result;
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<Encoding>();
+ encodings.add(Encoding.ADAPTIVE_INTEGRAL);
+ return encodings;
+ }
+
+ @Override
+ protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+ return new AdaptiveIntegralEncoderMeta(targetDataType, stats, compressor.getName());
+ }
+
+ };
+ }
+
+ // Decoder: decompresses into a page of the meta's target data type; values are
+ // lazily widened back to the source type via the converter's decode methods.
+ @Override
+ public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+ AdaptiveIntegralEncoderMeta codecMeta = (AdaptiveIntegralEncoderMeta) meta;
+ final Compressor compressor = CompressorFactory.getInstance().getCompressor(
+ codecMeta.getCompressorName());
+ final DataType targetDataType = codecMeta.getTargetDataType();
+ return new ColumnPageDecoder() {
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length)
+ throws MemoryException, IOException {
+ ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ return LazyColumnPage.newPage(page, converter);
+ }
+ };
+ }
+
+ // encoded value = (type cast page value to target data type)
+ private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+ // BYTE is already the narrowest integral type, so no target type is valid here;
+ // any call means the adaptive type selection went wrong.
+ @Override
+ public void encode(int rowId, byte value) {
+ switch (targetDataType) {
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ // Read path: identity widening — the stored value IS the original value.
+ @Override
+ public long decodeLong(byte value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return value;
+ }
+
+ // floating-point pages are never stored by this integral codec
+ @Override
+ public double decodeDouble(float value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
new file mode 100644
index 0000000..3025303
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralEncoderMeta.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata persisted for pages encoded with AdaptiveIntegralCodec.
+ * All serialized state lives in AdaptiveEncoderMeta; this subclass only fixes the type.
+ */
+public class AdaptiveIntegralEncoderMeta extends AdaptiveEncoderMeta implements Writable {
+
+ // no-arg constructor required for reflective instantiation before readFields
+ public AdaptiveIntegralEncoderMeta() {
+ }
+
+ public AdaptiveIntegralEncoderMeta(DataType targetDataType, SimpleStatsResult stats,
+ String compressorName) {
+ super(targetDataType, stats, compressorName);
+ }
+
+ // delegates fully to the parent; overridden only to keep the Writable contract explicit
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ }
+
+ // delegates fully to the parent; must mirror write() exactly
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
new file mode 100644
index 0000000..8a2bf6d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/DeltaIntegralConverter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Read-path converter for delta-encoded integral pages: values were stored as
+ * (max - original), so decoding computes original = max - stored.
+ * The encode methods mirror AdaptiveDeltaIntegralCodec's write-path converter.
+ */
+public class DeltaIntegralConverter implements ColumnPageValueConverter {
+ // data type the deltas are stored as on disk
+ private DataType targetDataType;
+ // page holding encoded deltas (used only by the encode methods)
+ private ColumnPage encodedPage;
+ // delta base: page max value widened to long
+ private long max;
+
+ public DeltaIntegralConverter(ColumnPage encodedPage, DataType targetDataType,
+ DataType srcDataType, Object max) {
+ this.targetDataType = targetDataType;
+ this.encodedPage = encodedPage;
+ // unbox the stats max according to the source data type
+ // NOTE(review): no default case — an unsupported srcDataType silently leaves max = 0.
+ switch (srcDataType) {
+ case BYTE:
+ this.max = (byte) max;
+ break;
+ case SHORT:
+ this.max = (short) max;
+ break;
+ case INT:
+ this.max = (int) max;
+ break;
+ case LONG:
+ this.max = (long) max;
+ break;
+ case FLOAT:
+ case DOUBLE:
+ // NOTE(review): (long)(max) unboxes the Object as a Long; a boxed Float/Double
+ // here would throw ClassCastException — confirm callers pass a Long for these types.
+ this.max = (long)(max);
+ break;
+ }
+ }
+
+ // Write path: stored value = (max - page value), cast to targetDataType.
+ @Override
+ public void encode(int rowId, byte value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, max - value);
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case SHORT_INT:
+ encodedPage.putShortInt(rowId, (int)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error");
+ }
+ }
+
+ // Read path: original value = max - stored delta (inverse of encode above).
+ @Override
+ public long decodeLong(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error");
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
new file mode 100644
index 0000000..1fc61ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.compress;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec that applies a general-purpose compressor directly on the raw column
+ * page bytes, without any per-value transformation beforehand. The decoder
+ * wraps the decompressed page in a {@link LazyColumnPage} with an identity
+ * converter, so decoded values are returned exactly as stored.
+ */
+public class DirectCompressCodec implements ColumnPageCodec {
+
+ // data type of the page; selects the decimal vs. plain decompress path
+ private DataType dataType;
+
+ public DirectCompressCodec(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public String getName() {
+ return "DirectCompressCodec";
+ }
+
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ // TODO: make compressor configurable in create table
+ return new DirectCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR);
+ }
+
+ @Override
+ public ColumnPageDecoder createDecoder(ColumnPageEncoderMeta meta) {
+ // meta is produced by DirectCompressor.getEncoderMeta below, so this
+ // cast holds for pages written with DIRECT_COMPRESS encoding
+ DirectCompressorEncoderMeta codecMeta = (DirectCompressorEncoderMeta) meta;
+ return new DirectDecompressor(codecMeta.getCompressorName(),
+ codecMeta.getScale(), codecMeta.getPrecision());
+ }
+
+ // Encoder side: compress the whole page in one shot with the named compressor.
+ private class DirectCompressor extends ColumnPageEncoder {
+
+ private Compressor compressor;
+
+ DirectCompressor(String compressorName) {
+ this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+ }
+
+ @Override
+ protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+ return input.compress(compressor);
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.DIRECT_COMPRESS);
+ return encodings;
+ }
+
+ @Override
+ protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+ // records the compressor name so the decoder can pick the matching one
+ return new DirectCompressorEncoderMeta(compressor.getName(), inputPage.getDataType(),
+ inputPage.getStatistics());
+ }
+
+ }
+
+ // Decoder side: decompress and expose the page through an identity converter.
+ private class DirectDecompressor implements ColumnPageDecoder {
+
+ private Compressor compressor;
+ // scale/precision are only meaningful for DECIMAL pages
+ private int scale;
+ private int precision;
+
+ DirectDecompressor(String compressorName, int scale, int precision) {
+ this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+ this.scale = scale;
+ this.precision = precision;
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+ ColumnPage decodedPage;
+ if (dataType == DataType.DECIMAL) {
+ decodedPage = ColumnPage.decompressDecimalPage(compressor, input, offset, length,
+ scale, precision);
+ } else {
+ decodedPage = ColumnPage.decompress(compressor, dataType, input, offset, length);
+ }
+ return LazyColumnPage.newPage(decodedPage, converter);
+ }
+ }
+
+ // Identity converter: values are stored untransformed, so every decode
+ // overload returns its input unchanged. The encode overloads throw because
+ // encoding happens via whole-page compression, never per value.
+ private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+ @Override
+ public void encode(int rowId, byte value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ throw new RuntimeException("internal error");
+ }
+
+ @Override
+ public long decodeLong(byte value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ return value;
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
new file mode 100644
index 0000000..cf19259
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressorEncoderMeta.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * Metadata persisted alongside a DIRECT_COMPRESS encoded page. In addition to
+ * the common fields (data type, statistics) inherited from
+ * {@link ColumnPageEncoderMeta}, it records which compressor produced the page
+ * so the decoder can select the matching decompressor.
+ */
+public class DirectCompressorEncoderMeta extends ColumnPageEncoderMeta implements Writable {
+ private String compressorName;
+
+ // no-arg constructor required so the object can be built then populated
+ // via readFields() during deserialization
+ public DirectCompressorEncoderMeta() {
+ }
+
+ public DirectCompressorEncoderMeta(String compressorName, final DataType dataType,
+ SimpleStatsResult stats) {
+ super(dataType, stats);
+ this.compressorName = compressorName;
+ }
+
+ public String getCompressorName() {
+ return compressorName;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeUTF(compressorName);
+ }
+
+ // must read fields in exactly the order write() emits them
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ compressorName = in.readUTF();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
new file mode 100644
index 0000000..e37b8f6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/ComplexDimensionIndexCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Legacy index-storage codec for complex (nested) dimension columns: the byte
+ * array page is run through the block indexer, flattened, then compressed.
+ */
+public class ComplexDimensionIndexCodec extends IndexStorageCodec {
+
+ public ComplexDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+ Compressor compressor) {
+ super(isSort, isInvertedIndex, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "ComplexDimensionIndexCodec";
+ }
+
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ return new IndexStorageEncoder() {
+ @Override
+ void encodeIndexStorage(ColumnPage inputPage) {
+ // all indexer flags are hard-coded false for complex dimensions
+ IndexStorage indexStorage =
+ new BlockIndexerStorageForShort(inputPage.getByteArrayPage(), false, false, false);
+ byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ byte[] compressed = compressor.compressByte(flattened);
+ super.indexStorage = indexStorage;
+ super.compressedDataPage = compressed;
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ // NOTE(review): INVERTED_INDEX is always advertised here even though
+ // the indexer above is constructed with all-false flags — confirm this
+ // matches what the reader expects for complex dimensions
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.DICTIONARY);
+ encodings.add(Encoding.INVERTED_INDEX);
+ return encodings;
+ }
+
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
new file mode 100644
index 0000000..d157654
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DictDimensionIndexCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Legacy index-storage codec for dictionary-encoded dimension columns.
+ * Optionally builds an inverted index (row-id mapping) before flattening and
+ * compressing the page.
+ */
+public class DictDimensionIndexCodec extends IndexStorageCodec {
+
+ public DictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex, Compressor compressor) {
+ super(isSort, isInvertedIndex, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "DictDimensionIndexCodec";
+ }
+
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ return new IndexStorageEncoder() {
+ @Override
+ void encodeIndexStorage(ColumnPage inputPage) {
+ IndexStorage indexStorage;
+ byte[][] data = inputPage.getByteArrayPage();
+ if (isInvertedIndex) {
+ // second flag is true here, unlike DirectDictDimensionIndexCodec —
+ // presumably marks dictionary-encoded data; confirm against indexer
+ indexStorage = new BlockIndexerStorageForShort(data, true, false, isSort);
+ } else {
+ indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ }
+ byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ super.compressedDataPage = compressor.compressByte(flattened);
+ super.indexStorage = indexStorage;
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.DICTIONARY);
+ encodings.add(Encoding.RLE);
+ // only advertise the inverted index when one was actually built
+ if (isInvertedIndex) {
+ encodings.add(Encoding.INVERTED_INDEX);
+ }
+ return encodings;
+ }
+
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6a4f641/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
new file mode 100644
index 0000000..1e5015b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/DirectDictDimensionIndexCodec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.dimension.legacy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Legacy index-storage codec for direct-dictionary dimension columns (values
+ * such as timestamps mapped to surrogate keys without a dictionary file).
+ * Optionally builds an inverted index before flattening and compressing.
+ */
+public class DirectDictDimensionIndexCodec extends IndexStorageCodec {
+
+ public DirectDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
+ Compressor compressor) {
+ super(isSort, isInvertedIndex, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "DirectDictDimensionIndexCodec";
+ }
+
+ @Override
+ public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+ return new IndexStorageEncoder() {
+ @Override
+ void encodeIndexStorage(ColumnPage inputPage) {
+ IndexStorage indexStorage;
+ byte[][] data = inputPage.getByteArrayPage();
+ if (isInvertedIndex) {
+ // second flag is false here, unlike DictDimensionIndexCodec — confirm
+ // the intended distinction between dict and direct-dict paths
+ indexStorage = new BlockIndexerStorageForShort(data, false, false, isSort);
+ } else {
+ indexStorage = new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ }
+ byte[] flattened = ByteUtil.flatten(indexStorage.getDataPage());
+ super.compressedDataPage = compressor.compressByte(flattened);
+ super.indexStorage = indexStorage;
+ }
+
+ @Override
+ protected List<Encoding> getEncodingList() {
+ List<Encoding> encodings = new ArrayList<>();
+ encodings.add(Encoding.DICTIONARY);
+ encodings.add(Encoding.RLE);
+ // only advertise the inverted index when one was actually built
+ if (isInvertedIndex) {
+ encodings.add(Encoding.INVERTED_INDEX);
+ }
+ return encodings;
+ }
+ };
+ }
+
+}