You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:12 UTC
[06/56] [abbrv] carbondata git commit: add EncodingStrategy
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
new file mode 100644
index 0000000..31eb7ac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/PrimitiveCodec.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+// Transformation type that can be applied to ColumnPage
+public interface PrimitiveCodec {
+ void encode(int rowId, byte value);
+ void encode(int rowId, short value);
+ void encode(int rowId, int value);
+ void encode(int rowId, long value);
+ void encode(int rowId, float value);
+ void encode(int rowId, double value);
+
+ long decodeLong(byte value);
+ long decodeLong(short value);
+ long decodeLong(int value);
+ double decodeDouble(byte value);
+ double decodeDouble(short value);
+ double decodeDouble(int value);
+ double decodeDouble(long value);
+ double decodeDouble(float value);
+ double decodeDouble(double value);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
new file mode 100644
index 0000000..c843b55
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Subclasses of this codec use the statistics of the column page (adaptive) to encode and
+ * decode, and also employ a compressor to compress the encoded data
+ */
+public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
+
+ // TODO: cache and reuse the same encoder since snappy is thread-safe
+
+ // compressor that can be used by subclass
+ protected final Compressor compressor;
+
+ // statistics of this page, can be used by subclass
+ protected final ColumnPageStatsVO stats;
+
+ // the data type used for storage
+ protected final DataType targetDataType;
+
+ // the data type specified in schema
+ protected final DataType srcDataType;
+
+ protected AdaptiveCompressionCodec(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ this.stats = stats;
+ this.srcDataType = srcDataType;
+ this.targetDataType = targetDataType;
+ this.compressor = compressor;
+ }
+
+ public abstract String getName();
+
+ public abstract byte[] encode(ColumnPage input);
+
+ public abstract ColumnPage decode(byte[] input, int offset, int length);
+
+ @Override
+ public String toString() {
+ return String.format("%s[src type: %s, target type: %s, stats(%s)]",
+ getClass().getName(), srcDataType, targetDataType, stats);
+ }
+
+ protected String debugInfo() {
+ return this.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
new file mode 100644
index 0000000..f768a14
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec casts page data to the target data type to minimize storage.
+ */
+class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
+
+ private ColumnPage encodedPage;
+
+ public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ return new AdaptiveIntegerCodec(srcDataType, targetDataType, stats, compressor);
+ }
+
+ private AdaptiveIntegerCodec(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ super(srcDataType, targetDataType, stats, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "AdaptiveIntegerCodec";
+ }
+
+ @Override
+ public byte[] encode(ColumnPage input) {
+ if (srcDataType.equals(targetDataType)) {
+ return input.compress(compressor);
+ } else {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ }
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) {
+ if (srcDataType.equals(targetDataType)) {
+ return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ } else {
+ ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ return LazyColumnPage.newPage(page, codec);
+ }
+ }
+
+ // encoded value = (type cast page value to target data type)
+ private PrimitiveCodec codec = new PrimitiveCodec() {
+ @Override
+ public void encode(int rowId, byte value) {
+ switch (targetDataType) {
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) value);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) value);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) value);
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long) value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public long decodeLong(byte value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return value;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
deleted file mode 100644
index e870ad6..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-/**
- * Codec for a column page data, implementation should not keep state across pages,
- * caller will use the same object to encode multiple pages.
- */
-public interface ColumnCodec {
-
- /** Codec name will be stored in BlockletHeader (DataChunk3) */
- String getName();
-
- byte[] encode(ColumnPage columnPage);
-
- ColumnPage decode(byte[] encoded);
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
new file mode 100644
index 0000000..21913be
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Codec for column page data. Implementations should not keep state across pages, because the
+ * caller may use the same object to encode multiple pages.
+ */
+public interface ColumnPageCodec {
+
+ /**
+ * Codec name will be stored in BlockletHeader (DataChunk3)
+ */
+ String getName();
+
+ /**
+ * encode a column page and output the encoded byte array
+ * @param input column page to encode
+ * @return encoded data
+ */
+ byte[] encode(ColumnPage input);
+
+ /**
+ * decode byte array from offset to a column page
+ * @param input encoded byte array
+ * @param offset start offset of the input to decode
+ * @param length length of data to decode
+ * @return decoded data
+ */
+ ColumnPage decode(byte[] input, int offset, int length);
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
new file mode 100644
index 0000000..4568503
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for variable length data type (decimal, string).
+ * This codec will flatten the variable length data before applying compression.
+ */
+public class CompressionCodec implements ColumnPageCodec {
+
+ private Compressor compressor;
+ private DataType dataType;
+
+ protected CompressionCodec(DataType dataType, Compressor compressor) {
+ this.compressor = compressor;
+ this.dataType = dataType;
+ }
+
+ public static CompressionCodec newInstance(DataType dataType, Compressor compressor) {
+ return new CompressionCodec(dataType, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "CompressionCodec";
+ }
+
+ @Override
+ public byte[] encode(ColumnPage input) {
+ return input.compress(compressor);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) {
+ return ColumnPage.decompress(compressor, dataType, input, offset, length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
new file mode 100644
index 0000000..3dfcf94
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Default strategy that selects the encoding based on column page data type and statistics
+ */
+public class DefaultEncodingStrategy extends EncodingStrategy {
+
+ private static final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+ // find the smallest data type that can hold the input long value
+ public static DataType fitDataType(long value) {
+ if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+ return DataType.BYTE;
+ } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+ return DataType.SHORT;
+ } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+ return DataType.INT;
+ } else {
+ return DataType.LONG;
+ }
+ }
+
+ protected DataType fitDataType(long max, long min) {
+ if (max <= Byte.MAX_VALUE && min >= Byte.MIN_VALUE) {
+ return DataType.BYTE;
+ } else if (max <= Short.MAX_VALUE && min >= Short.MIN_VALUE) {
+ return DataType.SHORT;
+ } else if (max <= Integer.MAX_VALUE && min >= Integer.MIN_VALUE) {
+ return DataType.INT;
+ } else {
+ return DataType.LONG;
+ }
+ }
+
+ // find the smallest data type for the input double value; DOUBLE unless decimal is 0
+ protected DataType fitDataType(double value, int decimal) {
+ DataType dataType = DataType.DOUBLE;
+ if (decimal == 0) {
+ if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+ dataType = DataType.BYTE;
+ } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+ dataType = DataType.SHORT;
+ } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+ dataType = DataType.INT;
+ } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
+ dataType = DataType.LONG;
+ }
+ }
+ return dataType;
+ }
+
+ // choose between adaptive encoder or delta adaptive encoder, based on whose target data type
+ // size is smaller
+ @Override
+ ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) {
+ DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin());
+ DataType deltaDataType;
+
+ // TODO: this handling is for data compatibility, change to Override check when implementing
+ // encoding override feature
+ if (adaptiveDataType == DataType.LONG) {
+ deltaDataType = DataType.LONG;
+ } else {
+ deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin());
+ }
+ if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+ // choose adaptive encoding
+ return AdaptiveIntegerCodec.newInstance(
+ stats.getDataType(), adaptiveDataType, stats, compressor);
+ } else {
+ // choose delta adaptive encoding
+ return DeltaIntegerCodec.newInstance(stats.getDataType(), deltaDataType, stats, compressor);
+ }
+ }
+
+ // choose between upscale adaptive encoder or upscale delta adaptive encoder,
+ // based on whose target data type size is smaller
+ @Override
+ ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
+ DataType srcDataType = stats.getDataType();
+ double maxValue = (double) stats.getMax();
+ double minValue = (double) stats.getMin();
+ int decimal = stats.getDecimal();
+
+ // Use the value with the larger absolute value (max or min) to pick the data type, e.g. for
+ // max = -1 and min = -10000000 the type must fit -10000000, not -1.
+ double absMaxValue = Math.abs(maxValue) >= Math.abs(minValue) ? maxValue : minValue;
+
+ if (decimal == 0) {
+ // short, int, long
+ DataType adaptiveDataType = fitDataType(absMaxValue, decimal);
+ DataType deltaDataType = fitDataType(maxValue - minValue, decimal);
+ if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
+ // choose adaptive encoding
+ return AdaptiveIntegerCodec.newInstance(srcDataType, adaptiveDataType, stats, compressor);
+ } else {
+ // choose delta adaptive encoding
+ return DeltaIntegerCodec.newInstance(srcDataType, deltaDataType, stats, compressor);
+ }
+ } else {
+ // double
+ DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, 0);
+ DataType upscaleDiffDataType = fitDataType(Math.pow(10, decimal) * (maxValue - minValue), 0);
+ if (upscaleAdaptiveDataType.getSizeInBytes() <= upscaleDiffDataType.getSizeInBytes()) {
+ return UpscaleFloatingCodec.newInstance(
+ srcDataType, upscaleAdaptiveDataType, stats, compressor);
+ } else {
+ return UpscaleDeltaFloatingCodec.newInstance(
+ srcDataType, upscaleDiffDataType, stats, compressor);
+ }
+ }
+ }
+
+ // for decimal, currently it is a very basic implementation
+ @Override
+ ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
+ return CompressionCodec.newInstance(stats.getDataType(), compressor);
+ }
+
+ @Override
+ ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
+ return CompressionCodec.newInstance(stats.getDataType(), compressor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
new file mode 100644
index 0000000..e8e7779
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for integer (byte, short, int, long) data type page.
+ * This codec computes the delta between the page max value and each page value,
+ * and casts the delta to the target data type to minimize storage.
+ */
+public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
+
+ private ColumnPage encodedPage;
+
+ private long max;
+
+ public static DeltaIntegerCodec newInstance(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ return new DeltaIntegerCodec(srcDataType, targetDataType, stats, compressor);
+ }
+
+ private DeltaIntegerCodec(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ super(srcDataType, targetDataType, stats, compressor);
+ switch (srcDataType) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ max = (long) stats.getMax();
+ break;
+ case FLOAT:
+ case DOUBLE:
+ max = (long)((double) stats.getMax());
+ break;
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "DeltaIntegerCodec";
+ }
+
+ @Override
+ public byte[] encode(ColumnPage input) {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) {
+ ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ return LazyColumnPage.newPage(page, codec);
+ }
+
+ // encoded value = (max value of page) - (page value)
+ private PrimitiveCodec codec = new PrimitiveCodec() {
+ @Override
+ public void encode(int rowId, byte value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, max - value);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(max - value));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(max - value));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(max - value));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(max - value));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public long decodeLong(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ return max - value;
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return max - value;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ // this codec is for integer type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
deleted file mode 100644
index 0dd23c7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-
-public class DummyCodec implements ColumnCodec {
- @Override
- public String getName() {
- return "DummyCodec";
- }
-
- @Override
- public byte[] encode(ColumnPage columnPage) {
- return null;
- }
-
- @Override
- public ColumnPage decode(byte[] encoded) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
new file mode 100644
index 0000000..0d1b2e4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodedData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+
+// holder for the encoded result of all columns
+public class EncodedData {
+ // dimension data that include rowid (index)
+ public IndexStorage[] indexStorages;
+
+ // encoded and compressed dimension data
+ public byte[][] dimensions;
+
+ // encoded and compressed measure data
+ public byte[][] measures;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
new file mode 100644
index 0000000..49fb625
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+
+/**
+ * Base class for encoding strategy implementation.
+ */
+public abstract class EncodingStrategy {
+
+ /**
+ * Dispatches on the data type recorded in the page statistics and returns the
+ * codec produced by the matching newCodecFor...Type method.
+ *
+ * @param stats statistics of the page a codec is needed for
+ * @return the codec created by the subclass for that data type
+ * @throws RuntimeException if the data type is none of the handled types
+ */
+ public ColumnPageCodec createCodec(ColumnPageStatsVO stats) {
+ switch (stats.getDataType()) {
+ case BYTE_ARRAY:
+ // no dictionary dimension
+ return newCodecForByteArrayType(stats);
+ case DECIMAL:
+ return newCodecForDecimalType(stats);
+ case FLOAT:
+ case DOUBLE:
+ return newCodecForFloatingType(stats);
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return newCodecForIntegerType(stats);
+ default:
+ throw new RuntimeException("unsupported data type: " + stats.getDataType());
+ }
+ }
+
+ /**
+ * Same as {@link #createCodec(ColumnPageStatsVO)}, with the statistics first
+ * copied out of the given ValueEncoderMeta.
+ *
+ * @param meta encoder metadata to build the statistics from
+ * @return the codec created for the data type carried by the metadata
+ */
+ public ColumnPageCodec createCodec(ValueEncoderMeta meta) {
+ return createCodec(ColumnPageStatsVO.copyFrom(meta));
+ }
+
+ // factory for byte, short, int, long pages
+ abstract ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats);
+
+ // factory for float, double pages
+ abstract ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats);
+
+ // factory for decimal pages
+ abstract ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats);
+
+ // factory for byte array pages
+ abstract ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats);
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
new file mode 100644
index 0000000..c58a96f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleDeltaFloatingCodec.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for floating point (float, double) data type page.
+ * This codec will upscale the diff from page max value (multiply it by 10 power of decimal) to integer value,
+ * and do type casting to make storage minimum.
+ */
+public class UpscaleDeltaFloatingCodec extends AdaptiveCompressionCodec {
+
+ // page that receives the encoded values; replaced on every encode(ColumnPage) call
+ private ColumnPage encodedPage;
+
+ // max value reported by the page statistics
+ private BigDecimal max;
+ // 10 to the power of the decimal count reported by the page statistics
+ private double factor;
+
+ // static factory; the constructor itself is private
+ public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ return new UpscaleDeltaFloatingCodec(srcDataType, targetDataType, stats, compressor);
+ }
+
+ private UpscaleDeltaFloatingCodec(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ super(srcDataType, targetDataType, stats, compressor);
+ // the cast fails unless the statistics hold the max as a double
+ this.max = BigDecimal.valueOf((double) stats.getMax());
+ this.factor = Math.pow(10, stats.getDecimal());
+ }
+
+ @Override
+ public String getName() {
+ return "UpscaleDeltaFloatingCodec";
+ }
+
+ // Creates a fresh page of targetDataType, hands the codec below to input.encode()
+ // so that it can fill that page, and returns the page in compressed form.
+ @Override
+ public byte[] encode(ColumnPage input) {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ }
+
+ // Decompresses the given byte range into a page of targetDataType and wraps it,
+ // together with the codec below, in a LazyColumnPage.
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) {
+ return LazyColumnPage.newPage(
+ ColumnPage.decompress(compressor, targetDataType, input, offset, length), codec);
+ }
+
+ // encoded value = (10 power of decimal) * ((max value of page) - (page value))
+ private PrimitiveCodec codec = new PrimitiveCodec() {
+ @Override
+ public void encode(int rowId, byte value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ // a float is widened to double before any arithmetic, so the double path applies as is
+ encode(rowId, (double) value);
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ double diff = max.subtract(BigDecimal.valueOf(value)).doubleValue();
+ store(rowId, Math.round(factor * diff));
+ }
+
+ // writes the upscaled diff into the encoded page, narrowed to targetDataType
+ private void store(int rowId, long scaled) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte) scaled);
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short) scaled);
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int) scaled);
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, scaled);
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public long decodeLong(byte value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ // inverse of the encoding: max - (stored value / factor)
+ private double restore(double stored) {
+ return max.subtract(BigDecimal.valueOf(stored / factor)).doubleValue();
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return restore(value);
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return restore(value);
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return restore(value);
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return restore(value);
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ };
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
new file mode 100644
index 0000000..4f5ee13
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * Codec for floating point (float, double) data type page.
+ * This codec will upscale the page value (multiply it by 10 power of decimal) to integer value,
+ * and do type casting to make storage minimum.
+ */
+public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
+
+ private ColumnPage encodedPage;
+ private double factor;
+
+ public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ return new UpscaleFloatingCodec(srcDataType, targetDataType, stats, compressor);
+ }
+
+ private UpscaleFloatingCodec(DataType srcDataType, DataType targetDataType,
+ ColumnPageStatsVO stats, Compressor compressor) {
+ super(srcDataType, targetDataType, stats, compressor);
+ this.factor = Math.pow(10, stats.getDecimal());
+ }
+
+ @Override public String getName() {
+ return "UpscaleFloatingCodec";
+ }
+
+ @Override
+ public byte[] encode(ColumnPage input) {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ }
+
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) {
+ ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
+ return LazyColumnPage.newPage(page, codec);
+ }
+
+ // encoded value = (10 power of decimal) * (page value)
+ private PrimitiveCodec codec = new PrimitiveCodec() {
+ @Override
+ public void encode(int rowId, byte value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, short value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, int value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, long value) {
+ // this codec is for floating point type only
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public void encode(int rowId, float value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(Math.round(factor * value)));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(Math.round(factor * value)));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(Math.round(factor * value)));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(Math.round(factor * value)));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public void encode(int rowId, double value) {
+ switch (targetDataType) {
+ case BYTE:
+ encodedPage.putByte(rowId, (byte)(Math.round(factor * value)));
+ break;
+ case SHORT:
+ encodedPage.putShort(rowId, (short)(Math.round(factor * value)));
+ break;
+ case INT:
+ encodedPage.putInt(rowId, (int)(Math.round(factor * value)));
+ break;
+ case LONG:
+ encodedPage.putLong(rowId, (long)(Math.round(factor * value)));
+ break;
+ case DOUBLE:
+ encodedPage.putDouble(rowId, (Math.round(factor * value)));
+ break;
+ default:
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ }
+
+ @Override
+ public long decodeLong(byte value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public long decodeLong(short value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public long decodeLong(int value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(byte value) {
+ return value / factor;
+ }
+
+ @Override
+ public double decodeDouble(short value) {
+ return value / factor;
+ }
+
+ @Override
+ public double decodeDouble(int value) {
+ return value / factor;
+ }
+
+ @Override
+ public double decodeDouble(long value) {
+ return value / factor;
+ }
+
+ @Override
+ public double decodeDouble(float value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+
+ @Override
+ public double decodeDouble(double value) {
+ throw new RuntimeException("internal error: " + debugInfo());
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
deleted file mode 100644
index 960a530..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/** statics for one column page */
-public class ColumnPageStatistics {
- private DataType dataType;
-
- /** min and max value of the measures */
- private Object min, max;
-
- /**
- * the unique value is the non-exist value in the row,
- * and will be used as storage key for null values of measures
- */
- private Object nonExistValue;
-
- /** decimal count of the measures */
- private int decimal;
-
- public ColumnPageStatistics(DataType dataType) {
- this.dataType = dataType;
- switch (dataType) {
- case SHORT:
- case INT:
- case LONG:
- max = Long.MIN_VALUE;
- min = Long.MAX_VALUE;
- nonExistValue = Long.MIN_VALUE;
- break;
- case DOUBLE:
- max = Double.MIN_VALUE;
- min = Double.MAX_VALUE;
- nonExistValue = Double.MIN_VALUE;
- break;
- case DECIMAL:
- max = new BigDecimal(Double.MIN_VALUE);
- min = new BigDecimal(Double.MAX_VALUE);
- nonExistValue = new BigDecimal(Double.MIN_VALUE);
- break;
- }
- decimal = 0;
- }
-
- /**
- * update the statistics for the input row
- */
- public void update(Object value) {
- switch (dataType) {
- case SHORT:
- max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
- min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
- nonExistValue = (long) min - 1;
- break;
- case INT:
- max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
- min = ((long) min < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
- nonExistValue = (long) min - 1;
- break;
- case LONG:
- max = ((long) max > (long) value) ? max : value;
- min = ((long) min < (long) value) ? min : value;
- nonExistValue = (long) min - 1;
- break;
- case DOUBLE:
- max = ((double) max > (double) value) ? max : value;
- min = ((double) min < (double) value) ? min : value;
- int num = getDecimalCount((double) value);
- decimal = decimal > num ? decimal : num;
- nonExistValue = (double) min - 1;
- break;
- case DECIMAL:
- BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
- decimal = decimalValue.scale();
- BigDecimal val = (BigDecimal) min;
- nonExistValue = (val.subtract(new BigDecimal(1.0)));
- break;
- case ARRAY:
- case STRUCT:
- // for complex type column, writer is not going to use stats, so, do nothing
- }
- }
-
- /**
- * return no of digit after decimal
- */
- private int getDecimalCount(double value) {
- String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
- int integerPlaces = strValue.indexOf('.');
- int decimalPlaces = 0;
- if (-1 != integerPlaces) {
- decimalPlaces = strValue.length() - integerPlaces - 1;
- }
- return decimalPlaces;
- }
-
- /**
- * return min value as byte array
- */
- public byte[] minBytes() {
- return getValueAsBytes(getMin());
- }
-
- /**
- * return max value as byte array
- */
- public byte[] maxBytes() {
- return getValueAsBytes(getMax());
- }
-
- /**
- * convert value to byte array
- */
- private byte[] getValueAsBytes(Object value) {
- ByteBuffer b;
- switch (dataType) {
- case DOUBLE:
- b = ByteBuffer.allocate(8);
- b.putDouble((Double) value);
- b.flip();
- return b.array();
- case LONG:
- case INT:
- case SHORT:
- b = ByteBuffer.allocate(8);
- b.putLong((Long) value);
- b.flip();
- return b.array();
- case DECIMAL:
- return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
- default:
- throw new IllegalArgumentException("Invalid data type");
- }
- }
-
- public Object getMin() {
- return min;
- }
-
- public Object getMax() {
- return max;
- }
-
- public Object nonExistValue() {
- return nonExistValue;
- }
-
- public int getDecimal() {
- return decimal;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
new file mode 100644
index 0000000..a5b3148
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatsVO.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/** statistics for one column page */
+public class ColumnPageStatsVO {
+ private DataType dataType;
+
+ /** min and max value of the measures */
+ private Object min, max;
+
+ /**
+ * the unique value is the non-exist value in the row,
+ * and will be used as storage key for null values of measures
+ */
+ private Object nonExistValue;
+
+ /** decimal count of the measures */
+ private int decimal;
+
+ /**
+ * Creates empty statistics for the given data type. min/max start at the opposite
+ * extremes so that the first update() replaces them. Data types other than
+ * SHORT, INT, LONG, DOUBLE and DECIMAL leave min, max and nonExistValue as null.
+ */
+ public ColumnPageStatsVO(DataType dataType) {
+ this.dataType = dataType;
+ switch (dataType) {
+ case SHORT:
+ case INT:
+ case LONG:
+ max = Long.MIN_VALUE;
+ min = Long.MAX_VALUE;
+ nonExistValue = Long.MIN_VALUE;
+ break;
+ case DOUBLE:
+ // Double.MIN_VALUE is the smallest POSITIVE double, not the most negative one;
+ // starting max there would leave it wrong for a page holding only negative values
+ max = -Double.MAX_VALUE;
+ min = Double.MAX_VALUE;
+ nonExistValue = Double.MIN_VALUE;
+ break;
+ case DECIMAL:
+ // NOTE(review): Double.MIN_VALUE is a tiny positive number here as well; left as is
+ // because update() never touches min/max for DECIMAL - confirm the intended start values
+ max = new BigDecimal(Double.MIN_VALUE);
+ min = new BigDecimal(Double.MAX_VALUE);
+ nonExistValue = new BigDecimal(Double.MIN_VALUE);
+ break;
+ }
+ decimal = 0;
+ }
+
+ /**
+ * Builds statistics from the type, min, max, decimal count and unique value
+ * stored in the given ValueEncoderMeta.
+ */
+ public static ColumnPageStatsVO copyFrom(ValueEncoderMeta meta) {
+ ColumnPageStatsVO instance = new ColumnPageStatsVO(meta.getType());
+ instance.min = meta.getMinValue();
+ instance.max = meta.getMaxValue();
+ instance.decimal = meta.getDecimal();
+ instance.nonExistValue = meta.getUniqueValue();
+ return instance;
+ }
+
+ /**
+ * update the statistics for the input row
+ *
+ * @param value Short, Integer, Long or Double matching the data type,
+ * or a byte[] for DECIMAL (converted with DataTypeUtil.byteToBigDecimal)
+ */
+ public void update(Object value) {
+ switch (dataType) {
+ case SHORT:
+ max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
+ min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
+ nonExistValue = (long) min - 1;
+ break;
+ case INT:
+ max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
+ min = ((long) min < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
+ nonExistValue = (long) min - 1;
+ break;
+ case LONG:
+ max = ((long) max > (long) value) ? max : value;
+ min = ((long) min < (long) value) ? min : value;
+ nonExistValue = (long) min - 1;
+ break;
+ case DOUBLE:
+ max = ((double) max > (double) value) ? max : value;
+ min = ((double) min < (double) value) ? min : value;
+ int num = getDecimalCount((double) value);
+ decimal = decimal > num ? decimal : num;
+ nonExistValue = (double) min - 1;
+ break;
+ case DECIMAL:
+ // NOTE(review): min/max are not updated for DECIMAL and decimal is overwritten
+ // by the scale of the latest value rather than the largest - confirm intended
+ BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
+ decimal = decimalValue.scale();
+ BigDecimal val = (BigDecimal) min;
+ nonExistValue = (val.subtract(BigDecimal.ONE));
+ break;
+ case ARRAY:
+ case STRUCT:
+ // for complex type column, writer is not going to use stats, so, do nothing
+ }
+ }
+
+ /**
+ * update the statistics for a null row; a null is accounted for as the value zero
+ */
+ public void updateNull() {
+ switch (dataType) {
+ case SHORT:
+ case INT:
+ case LONG:
+ // all integral types keep min/max as long, so they share one branch
+ max = ((long) max > 0) ? max : 0L;
+ min = ((long) min < 0) ? min : 0L;
+ nonExistValue = (long) min - 1;
+ break;
+ case DOUBLE:
+ max = ((double) max > 0d) ? max : 0d;
+ min = ((double) min < 0d) ? min : 0d;
+ int num = getDecimalCount(0d);
+ decimal = decimal > num ? decimal : num;
+ nonExistValue = (double) min - 1;
+ break;
+ case DECIMAL:
+ BigDecimal decimalValue = BigDecimal.ZERO;
+ decimal = decimalValue.scale();
+ BigDecimal val = (BigDecimal) min;
+ nonExistValue = (val.subtract(BigDecimal.ONE));
+ break;
+ case ARRAY:
+ case STRUCT:
+ // for complex type column, writer is not going to use stats, so, do nothing
+ }
+ }
+
+ /**
+ * return no of digit after decimal, taken as the scale of BigDecimal.valueOf(value)
+ */
+ private int getDecimalCount(double value) {
+ return BigDecimal.valueOf(value).scale();
+ }
+
+ /**
+ * return min value as byte array
+ */
+ public byte[] minBytes() {
+ return getValueAsBytes(getMin());
+ }
+
+ /**
+ * return max value as byte array
+ */
+ public byte[] maxBytes() {
+ return getValueAsBytes(getMax());
+ }
+
+ /**
+ * convert value to byte array: 8 bytes for DOUBLE and for LONG/INT/SHORT (written as long),
+ * DataTypeUtil.bigDecimalToByte for DECIMAL
+ *
+ * @throws IllegalArgumentException for any other data type
+ */
+ private byte[] getValueAsBytes(Object value) {
+ ByteBuffer b;
+ switch (dataType) {
+ case DOUBLE:
+ b = ByteBuffer.allocate(8);
+ b.putDouble((Double) value);
+ b.flip();
+ return b.array();
+ case LONG:
+ case INT:
+ case SHORT:
+ b = ByteBuffer.allocate(8);
+ b.putLong((Long) value);
+ b.flip();
+ return b.array();
+ case DECIMAL:
+ return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
+ default:
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+
+ public Object getMin() {
+ return min;
+ }
+
+ public Object getMax() {
+ return max;
+ }
+
+ public Object nonExistValue() {
+ return nonExistValue;
+ }
+
+ public int getDecimal() {
+ return decimal;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("min: %s, max: %s, decimal: %s ", min, max, decimal);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
index 33440f5..865ae04 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
@@ -40,7 +40,7 @@ public class MeasurePageStatsVO {
dataType = new DataType[measurePages.length];
selectedDataType = new byte[measurePages.length];
for (int i = 0; i < measurePages.length; i++) {
- ColumnPageStatistics stats = measurePages[i].getStatistics();
+ ColumnPageStatsVO stats = measurePages[i].getStatistics();
min[i] = stats.getMin();
max[i] = stats.getMax();
nonExistValue[i] = stats.nonExistValue();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
index 107c52f..08dd800 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
@@ -67,7 +67,7 @@ public class WriteStepRowUtil {
return new CarbonRow(converted);
}
- private static int[] getDictDimension(CarbonRow row) {
+ public static int[] getDictDimension(CarbonRow row) {
return (int[]) row.getData()[DICTIONARY_DIMENSION];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
new file mode 100644
index 0000000..5519f2d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/IntPointerBuffer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+/**
+ * Holds the int pointers for rows. Values live in an on-heap array that grows on demand
+ * until {@link #loadToUnsafe()} copies them into a block obtained from
+ * {@link UnsafeMemoryManager}; from then on they are read through {@code CarbonUnsafe}.
+ */
+public class IntPointerBuffer {
+
+  // number of bytes used to store one pointer in the unsafe copy
+  private static final int POINTER_SIZE_IN_BYTES = 4;
+
+  // capacity of pointerBlock, in number of values
+  private int length;
+
+  // number of values appended through set(int)
+  private int actualSize;
+
+  // on-heap storage, null once loadToUnsafe() or freeMemory() has run
+  private int[] pointerBlock;
+
+  // block supplied by the creator of this buffer, released by freeMemory()
+  private MemoryBlock baseBlock;
+
+  // copy of the pointers created by loadToUnsafe(), null before that call
+  private MemoryBlock pointerMemoryBlock;
+
+  /**
+   * Creates a buffer with the default initial capacity that keeps a reference to
+   * {@code baseBlock}.
+   */
+  public IntPointerBuffer(MemoryBlock baseBlock) {
+    // TODO can be configurable, it is initial size and it can grow automatically.
+    this.length = 100000;
+    pointerBlock = new int[length];
+    this.baseBlock = baseBlock;
+  }
+
+  /**
+   * Creates a buffer with an initial capacity of {@code length} values and no base block.
+   */
+  public IntPointerBuffer(int length) {
+    this.length = length;
+    pointerBlock = new int[length];
+  }
+
+  /**
+   * Overwrites the value at {@code index}. The buffer is not grown and
+   * {@link #getActualSize()} is not changed by this call.
+   */
+  public void set(int index, int value) {
+    pointerBlock[index] = value;
+  }
+
+  /**
+   * Appends {@code value} after the last appended value, growing the on-heap array first
+   * when it is full.
+   */
+  public void set(int value) {
+    ensureMemory();
+    pointerBlock[actualSize] = value;
+    actualSize++;
+  }
+
+  /**
+   * Returns the value at position {@code index}, read from the unsafe copy once
+   * {@link #loadToUnsafe()} has been called and from the on-heap array otherwise.
+   */
+  public int get(int index) {
+    assert index >= 0 : "index (" + index + ") should >= 0";
+    assert index < length : "index (" + index + ") should < length (" + length + ")";
+    if (pointerBlock == null) {
+      // widen before multiplying so that large indexes cannot overflow int
+      long offset = pointerMemoryBlock.getBaseOffset() + (long) index * POINTER_SIZE_IN_BYTES;
+      return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(), offset);
+    }
+    return pointerBlock[index];
+  }
+
+  /**
+   * Copies the whole on-heap array into a block allocated from {@link UnsafeMemoryManager}
+   * and drops the on-heap array.
+   *
+   * @throws MemoryException if {@link UnsafeMemoryManager#allocateMemoryWithRetry(long)} fails
+   */
+  public void loadToUnsafe() throws MemoryException {
+    // long arithmetic: the byte size of a large array does not fit into an int
+    long sizeInBytes = (long) pointerBlock.length * POINTER_SIZE_IN_BYTES;
+    pointerMemoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(sizeInBytes);
+    Object baseObject = pointerMemoryBlock.getBaseObject();
+    long baseOffset = pointerMemoryBlock.getBaseOffset();
+    for (int i = 0; i < pointerBlock.length; i++) {
+      CarbonUnsafe.unsafe
+          .putInt(baseObject, baseOffset + (long) i * POINTER_SIZE_IN_BYTES, pointerBlock[i]);
+    }
+    pointerBlock = null;
+  }
+
+  /**
+   * Returns the number of values appended through {@link #set(int)}.
+   */
+  public int getActualSize() {
+    return actualSize;
+  }
+
+  /**
+   * Returns the base block given at construction, or null when there is none or it has been
+   * released by {@link #freeMemory()}.
+   */
+  public MemoryBlock getBaseBlock() {
+    return baseBlock;
+  }
+
+  /**
+   * Returns the on-heap array itself (not a copy), or null after {@link #loadToUnsafe()} or
+   * {@link #freeMemory()}.
+   */
+  public int[] getPointerBlock() {
+    return pointerBlock;
+  }
+
+  /**
+   * Grows the on-heap array by a quarter of its capacity when it is full.
+   */
+  private void ensureMemory() {
+    if (actualSize >= length) {
+      // a quarter of a capacity below 4 rounds down to 0, so always grow by at least one slot
+      int localLength = length + Math.max(1, length / 4);
+      int[] memoryAddress = new int[localLength];
+      System.arraycopy(pointerBlock, 0, memoryAddress, 0, length);
+      pointerBlock = memoryAddress;
+      length = localLength;
+    }
+  }
+
+  /**
+   * Releases the unsafe copy and the base block through {@link UnsafeMemoryManager} and drops
+   * the on-heap array. References are cleared so that a repeated call does not free a block
+   * twice.
+   */
+  public void freeMemory() {
+    pointerBlock = null;
+    if (pointerMemoryBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
+      pointerMemoryBlock = null;
+    }
+    if (baseBlock != null) {
+      UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
+      baseBlock = null;
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
new file mode 100644
index 0000000..af8e6e0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+/**
+ * Checked exception thrown by the memory package, for example when
+ * {@code UnsafeMemoryManager.allocateMemoryWithRetry} cannot obtain a block.
+ */
+public class MemoryException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates an exception that carries only a description of the failure.
+   */
+  public MemoryException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception that wraps {@code e} as its cause.
+   */
+  public MemoryException(Exception e) {
+    super(e);
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
new file mode 100644
index 0000000..132a3fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.memory;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Hands out memory blocks through the configured {@link MemoryAllocator} while keeping the
+ * total number of allocated bytes within the limit computed when the class is loaded.
+ */
+public class UnsafeMemoryManager {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
+
+  // number of allocation attempts made by allocateMemoryWithRetry before giving up
+  private static final int MAX_ALLOCATION_TRIES = 100;
+
+  // pause between two allocation attempts, in milliseconds
+  private static final long ALLOCATION_RETRY_INTERVAL_MS = 50;
+
+  static {
+    long size;
+    try {
+      size = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
+              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
+    } catch (Exception e) {
+      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
+      LOGGER.info("Wrong memory size given, "
+          + "so setting default value to " + size);
+    }
+    if (size < 1024) {
+      size = 1024;
+      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
+          + "so setting default value to " + size);
+    }
+
+    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
+    long takenSize = size * 1024 * 1024;
+    MemoryAllocator allocator;
+    if (offHeap) {
+      allocator = MemoryAllocator.UNSAFE;
+    } else {
+      // on heap: never take more than 60% of the maximum heap; divide before multiplying
+      // because maxMemory() may be Long.MAX_VALUE and would overflow otherwise
+      long maxMemory = Runtime.getRuntime().maxMemory() / 100 * 60;
+      if (takenSize > maxMemory) {
+        takenSize = maxMemory;
+      }
+      allocator = MemoryAllocator.HEAP;
+    }
+    INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
+  }
+
+  public static final UnsafeMemoryManager INSTANCE;
+
+  // upper bound for memoryUsed, in bytes
+  private final long totalMemory;
+
+  // bytes currently handed out; only touched inside synchronized methods
+  private long memoryUsed;
+
+  private final MemoryAllocator allocator;
+
+  // sort memory chunk size multiplied by the number of cores, in bytes
+  private final long minimumMemory;
+
+  private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
+    this.totalMemory = totalMemory;
+    this.allocator = allocator;
+    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
+    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
+    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
+    if (totalWorkingMemoryForAllThreads >= totalMemory) {
+      throw new RuntimeException("Working memory should be less than total memory configured, "
+          + "so either reduce the loading threads or increase the memory size. "
+          + "(Number of threads * sort memory chunk size) should be less than "
+          + "total unsafe memory");
+    }
+    minimumMemory = totalWorkingMemoryForAllThreads;
+    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
+        + " and minimum reserve memory " + minimumMemory);
+  }
+
+  /**
+   * Allocates {@code memoryRequested} bytes when that keeps the used memory within the total.
+   *
+   * @return the allocated block, or null when the request does not fit
+   */
+  public synchronized MemoryBlock allocateMemory(long memoryRequested) {
+    if (memoryUsed + memoryRequested <= totalMemory) {
+      MemoryBlock allocate = allocator.allocate(memoryRequested);
+      memoryUsed += allocate.size();
+      LOGGER.info("Memory block is created with size " + allocate.size() +
+          " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
+      return allocate;
+    }
+    return null;
+  }
+
+  /**
+   * Hands {@code memoryBlock} back to the allocator and subtracts its size from the used memory.
+   */
+  public synchronized void freeMemory(MemoryBlock memoryBlock) {
+    allocator.free(memoryBlock);
+    memoryUsed -= memoryBlock.size();
+    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
+    LOGGER.info(
+        "Memory released, memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
+  }
+
+  /**
+   * Returns the number of bytes that can still be allocated.
+   */
+  public synchronized long getAvailableMemory() {
+    return totalMemory - memoryUsed;
+  }
+
+  /**
+   * Returns true while more than the minimum reserve is still available.
+   */
+  public boolean isMemoryAvailable() {
+    return getAvailableMemory() > minimumMemory;
+  }
+
+  /**
+   * Returns the total memory minus the minimum reserve, in bytes.
+   */
+  public long getUsableMemory() {
+    return totalMemory - minimumMemory;
+  }
+
+  /**
+   * Tries to allocate a block of {@code size} bytes from {@link #INSTANCE}, sleeping between
+   * attempts and giving up after a fixed number of attempts.
+   *
+   * @throws MemoryException if no block could be allocated, or if the thread was interrupted
+   *     while waiting; in the latter case the interrupt status is set again
+   */
+  public static MemoryBlock allocateMemoryWithRetry(long size) throws MemoryException {
+    for (int tries = 0; tries < MAX_ALLOCATION_TRIES; tries++) {
+      MemoryBlock baseBlock = INSTANCE.allocateMemory(size);
+      if (baseBlock != null) {
+        return baseBlock;
+      }
+      try {
+        Thread.sleep(ALLOCATION_RETRY_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        // restore the interrupt status so that callers further up can still observe it
+        Thread.currentThread().interrupt();
+        throw new MemoryException(e);
+      }
+    }
+    throw new MemoryException("Not enough memory to create page");
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index 204ac1c..b5d175d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -19,28 +19,35 @@ package org.apache.carbondata.core.metadata.datatype;
public enum DataType {
- STRING(0, "STRING"),
- DATE(1, "DATE"),
- TIMESTAMP(2, "TIMESTAMP"),
- BOOLEAN(1, "BOOLEAN"),
- SHORT(2, "SMALLINT"),
- INT(3, "INT"),
- FLOAT(4, "FLOAT"),
- LONG(5, "BIGINT"),
- DOUBLE(6, "DOUBLE"),
- NULL(7, "NULL"),
- DECIMAL(8, "DECIMAL"),
- ARRAY(9, "ARRAY"),
- STRUCT(10, "STRUCT"),
- MAP(11, "MAP"),
- BYTE(12, "BYTE");
+ STRING(0, "STRING", -1),
+ DATE(1, "DATE", -1),
+ TIMESTAMP(2, "TIMESTAMP", -1),
+ BOOLEAN(1, "BOOLEAN", 1),
+ SHORT(2, "SMALLINT", 2),
+ INT(3, "INT", 4),
+ FLOAT(4, "FLOAT", 4),
+ LONG(5, "BIGINT", 8),
+ DOUBLE(6, "DOUBLE", 8),
+ NULL(7, "NULL", 1),
+ DECIMAL(8, "DECIMAL", -1),
+ ARRAY(9, "ARRAY", -1),
+ STRUCT(10, "STRUCT", -1),
+ MAP(11, "MAP", -1),
+ BYTE(12, "BYTE", 1),
+
+ // internal use only
+ BYTE_ARRAY(13, "BYTE_ARRAY", -1);
private int precedenceOrder;
- private String name ;
+ private String name;
+
+ // size of the value of this data type, negative value means variable length
+ private int sizeInBytes;
- DataType(int value ,String name) {
+ DataType(int value ,String name, int sizeInBytes) {
this.precedenceOrder = value;
this.name = name;
+ this.sizeInBytes = sizeInBytes;
}
public int getPrecedenceOrder() {
@@ -54,4 +61,9 @@ public enum DataType {
public boolean isComplexType() {
return precedenceOrder >= 9 && precedenceOrder <= 11;
}
+
+ public int getSizeInBytes() {
+ return sizeInBytes;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
index 1726fb7..0fb3860 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
@@ -35,7 +35,7 @@ public class CarbonImplicitDimension extends CarbonDimension {
private static final long serialVersionUID = 3648269871656322681L;
/**
- * List of encoding that are chained to encode the data for this column
+ * List of encodings that are chained and applied to the data for this column
*/
private List<Encoding> encodingList;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index eb4bf03..f5b8116 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -63,7 +63,7 @@ public class ColumnSchema implements Serializable {
private boolean isColumnar = true;
/**
- * List of encoding that are chained to encode the data for this column
+ * List of encodings that are chained and applied to the data for this column
*/
private List<Encoding> encodingList;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 8fa0348..ad17240 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -96,22 +96,21 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
switch (carbonMeasure.getDataType()) {
case SHORT:
- return (short)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ return (short)dataChunk.getColumnPage().getLong(index);
case INT:
- return (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ return (int)dataChunk.getColumnPage().getLong(index);
case LONG:
- return dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
+ return dataChunk.getColumnPage().getLong(index);
case DECIMAL:
BigDecimal bigDecimalMsrValue =
- dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
+ dataChunk.getColumnPage().getDecimal(index);
if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
bigDecimalMsrValue =
bigDecimalMsrValue.setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
}
- return org.apache.spark.sql.types.Decimal
- .apply(bigDecimalMsrValue);
+ return org.apache.spark.sql.types.Decimal.apply(bigDecimalMsrValue);
default:
- return dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
+ return dataChunk.getColumnPage().getDouble(index);
}
}
return null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index c5b5ade..f139802 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -56,7 +56,6 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
}
//default implementation if the data types are different for the resultsets
if (elRes.getDataType() != erRes.getDataType()) {
- // result = elRes.getString().equals(erRes.getString());
if (elRes.getDataType().getPrecedenceOrder() < erRes.getDataType().getPrecedenceOrder()) {
val1 = erRes;
val2 = elRes;