Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/07 07:18:40 UTC

carbondata git commit: [CARBONDATA-1537] Added back Adaptive delta encoding for floating type for backward compatibility

Repository: carbondata
Updated Branches:
  refs/heads/master d6967bffc -> 11661eb69


[CARBONDATA-1537] Added back Adaptive delta encoding for floating type for backward compatibility

Currently, backward compatibility is broken because the adaptive delta floating encoding, which was present in older versions, was removed in the latest version.
This commit adds the encoding back to preserve compatibility.
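
For intuition, here is a minimal standalone sketch of the scheme this commit restores (illustrative Java, not the CarbonData API): each value is scaled by 10^decimalCount, and the delta from the scaled page maximum is stored in the smallest integral type that fits, here one byte per value instead of an 8-byte double.

  // Standalone illustration of adaptive delta floating encoding; all names
  // are local to this sketch, not CarbonData classes.
  public class AdaptiveDeltaFloatingSketch {
    public static void main(String[] args) {
      double[] page = {15.25, 15.50, 15.75};        // decimalCount = 2
      double factor = Math.pow(10, 2);              // scale factor = 100
      long max = (long) (factor * 15.75);           // scaled page max = 1575
      for (double v : page) {
        byte stored = (byte) (max - (v * factor));  // deltas: 50, 25, 0
        double restored = (max - stored) / factor;  // 15.25, 15.50, 15.75
        System.out.println(stored + " -> " + restored);
      }
    }
  }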

This closes #1400


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11661eb6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11661eb6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11661eb6

Branch: refs/heads/master
Commit: 11661eb69d8a484747cba30abac88eb4c0cb2125
Parents: d6967bf
Author: Ravindra Pesala <ra...@gmail.com>
Authored: Sun Oct 1 22:53:12 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Nov 7 15:17:50 2017 +0800

----------------------------------------------------------------------
 .../page/encoding/DefaultEncodingFactory.java   |   8 +-
 .../page/encoding/EncodingFactory.java          |  19 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    | 216 +++++++++++++++++++
 format/src/main/thrift/schema.thrift            |   1 +
 .../CarbonV1toV3CompatabilityTestCase.scala     |   6 +
 5 files changed, 242 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11661eb6/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 518573d..54467b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
@@ -281,7 +282,6 @@ public class DefaultEncodingFactory extends EncodingFactory {
     //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
     //but we can't use -1 to getDatatype, we should use -10000000.
     double absMaxValue = Math.max(Math.abs(maxValue), Math.abs(minValue));
-
     if (decimalCount == 0) {
       // short, int, long
       return selectCodecByAlgorithmForIntegral(stats);
@@ -291,7 +291,11 @@ public class DefaultEncodingFactory extends EncodingFactory {
       // double
       long max = (long) (Math.pow(10, decimalCount) * absMaxValue);
       DataType adaptiveDataType = fitLongMinMax(max, 0);
-      if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes()) {
+      DataType deltaDataType = compareMinMaxAndSelectDataType(
+          (long) (Math.pow(10, decimalCount) * (maxValue - minValue)));
+      if (adaptiveDataType.getSizeInBytes() > deltaDataType.getSizeInBytes()) {
+        return new AdaptiveDeltaFloatingCodec(srcDataType, deltaDataType, stats);
+      } else if (adaptiveDataType.getSizeInBytes() < DataTypes.DOUBLE.getSizeInBytes()) {
         return new AdaptiveFloatingCodec(srcDataType, adaptiveDataType, stats);
       } else {
         return new DirectCompressCodec(DataTypes.DOUBLE);
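
The new branch fires when the scaled value range needs fewer bytes than the scaled maximum. A hedged numeric walk-through of the selection above, with values chosen so the scaling is exact (this assumes fitLongMinMax and compareMinMaxAndSelectDataType both pick the smallest type that holds the given value):

  // For a page of doubles in [100.0, 100.5] with decimalCount = 1:
  long scaledMax   = (long) (Math.pow(10, 1) * 100.5);            // 1005 -> SHORT (2 bytes)
  long scaledRange = (long) (Math.pow(10, 1) * (100.5 - 100.0));  // 5    -> BYTE  (1 byte)
  // adaptiveDataType (2 bytes) > deltaDataType (1 byte), so the new
  // AdaptiveDeltaFloatingCodec is chosen instead of AdaptiveFloatingCodec.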

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11661eb6/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 0f45abb..4a674e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaFloatingCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveDeltaIntegralCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveFloatingCodec;
 import org.apache.carbondata.core.datastore.page.encoding.adaptive.AdaptiveIntegralCodec;
@@ -40,12 +41,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.Encoding;
 
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_DELTA_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
-import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
-import static org.apache.carbondata.format.Encoding.BOOL_BYTE;
-import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
-import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
+import static org.apache.carbondata.format.Encoding.*;
 
 /**
  * Base class for encoding factory implementation.
@@ -91,6 +87,12 @@ public abstract class EncodingFactory {
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
           stats).createDecoder(metadata);
+    } else if (encoding == ADAPTIVE_DELTA_FLOATING) {
+      ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.readFields(in);
+      SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
+      return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
+          metadata.getStoreDataType(), stats).createDecoder(metadata);
     } else if (encoding == RLE_INTEGRAL) {
       RLEEncoderMeta metadata = new RLEEncoderMeta();
       metadata.readFields(in);
@@ -152,6 +154,11 @@ public abstract class EncodingFactory {
             new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
                 compressor);
         return codec.createDecoder(meta);
+      } else if (codec instanceof AdaptiveDeltaFloatingCodec) {
+        AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec;
+        ColumnPageEncoderMeta meta =
+            new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        return codec.createDecoder(meta);
       } else {
         throw new RuntimeException("internal error");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11661eb6/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
new file mode 100644
index 0000000..1f00feb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding.adaptive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.LazyColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Codec for floating point (float, double) data type page.
+ * This codec computes the delta between the scaled page max value and each scaled page value
+ * and casts the delta to the smallest integral type that fits, minimizing storage.
+ */
+public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
+
+  private ColumnPage encodedPage;
+  private Double factor;
+  private long max;
+
+  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    return new AdaptiveDeltaFloatingCodec(srcDataType, targetDataType, stats);
+  }
+
+  public AdaptiveDeltaFloatingCodec(DataType srcDataType, DataType targetDataType,
+      SimpleStatsResult stats) {
+    super(srcDataType, targetDataType, stats);
+    this.factor = Math.pow(10, stats.getDecimalCount());
+    this.max = (long) (Math.pow(10, stats.getDecimalCount()) * (double) stats.getMax());
+  }
+
+  @Override
+  public String getName() {
+    return "AdaptiveDeltaFloatingCodec";
+  }
+
+  @Override
+  public ColumnPageEncoder createEncoder(Map<String, String> parameter) {
+    final Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    return new ColumnPageEncoder() {
+      @Override
+      protected byte[] encodeData(ColumnPage input) throws MemoryException, IOException {
+        if (encodedPage != null) {
+          throw new IllegalStateException("already encoded");
+        }
+        encodedPage = ColumnPage.newPage(input.getColumnSpec(), targetDataType,
+            input.getPageSize());
+        input.convertValue(converter);
+        byte[] result = encodedPage.compress(compressor);
+        encodedPage.freeMemory();
+        return result;
+      }
+
+      @Override
+      protected List<Encoding> getEncodingList() {
+        List<Encoding> encodings = new ArrayList<Encoding>();
+        encodings.add(Encoding.ADAPTIVE_DELTA_FLOATING);
+        return encodings;
+      }
+
+      @Override
+      protected ColumnPageEncoderMeta getEncoderMeta(ColumnPage inputPage) {
+        return new ColumnPageEncoderMeta(inputPage.getColumnSpec(), targetDataType, stats,
+            compressor.getName());
+      }
+
+    };
+  }
+
+  @Override
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+    return new ColumnPageDecoder() {
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length);
+        return LazyColumnPage.newPage(page, converter);
+      }
+    };
+  }
+
+  // encoded value = (scaled page max) - (10 power of decimal count) * (page value)
+  private ColumnPageValueConverter converter = new ColumnPageValueConverter() {
+    @Override
+    public void encode(int rowId, byte value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, short value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, int value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, long value) {
+      // this codec is for floating point type only
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public void encode(int rowId, float value) {
+      if (targetDataType.equals(DataTypes.BYTE)) {
+        encodedPage.putByte(rowId, (byte) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.SHORT)) {
+        encodedPage.putShort(rowId, (short) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.SHORT_INT)) {
+        encodedPage.putShortInt(rowId, (int) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.INT)) {
+        encodedPage.putInt(rowId, (int) (max - (value * factor)));
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public void encode(int rowId, double value) {
+      if (targetDataType.equals(DataTypes.BYTE)) {
+        encodedPage.putByte(rowId, (byte) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.SHORT)) {
+        encodedPage.putShort(rowId, (short) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.SHORT_INT)) {
+        encodedPage.putShortInt(rowId, (int) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.INT)) {
+        encodedPage.putInt(rowId, (int) (max - (value * factor)));
+      } else if (targetDataType.equals(DataTypes.DOUBLE)) {
+        encodedPage.putDouble(rowId, value);
+      } else {
+        throw new RuntimeException("internal error: " + debugInfo());
+      }
+    }
+
+    @Override
+    public long decodeLong(byte value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(short value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public long decodeLong(int value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(byte value) {
+      return (max - value) / factor;
+    }
+
+    @Override
+    public double decodeDouble(short value) {
+      return (max - value) / factor;
+    }
+
+    @Override
+    public double decodeDouble(int value) {
+      return (max - value) / factor;
+    }
+
+    @Override
+    public double decodeDouble(long value) {
+      return (max - value) / factor;
+    }
+
+    @Override
+    public double decodeDouble(float value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+
+    @Override
+    public double decodeDouble(double value) {
+      throw new RuntimeException("internal error: " + debugInfo());
+    }
+  };
+}
\ No newline at end of file
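
As a usage sketch of the constructor above (the stats object is a stand-in for a real SimpleStatsResult): for a page whose stats report max = 15.75 and decimalCount = 2, construction fixes factor = 100.0 and max = 1575, the two constants the converter uses on the encode path (max - value * factor) and the decode path ((max - stored) / factor).

  // Hedged example; assumes stats.getMax() returns 15.75 and
  // stats.getDecimalCount() returns 2.
  ColumnPageCodec codec = AdaptiveDeltaFloatingCodec.newInstance(
      DataTypes.DOUBLE,  // srcDataType: the page holds doubles
      DataTypes.BYTE,    // targetDataType: scaled deltas fit in one byte
      stats);
  // Internally: factor = Math.pow(10, 2) = 100.0; max = (long) (100.0 * 15.75) = 1575.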

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11661eb6/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 7869378..216d91f 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -55,6 +55,7 @@ enum Encoding{
 	DIRECT_STRING = 10;   // Stores string value and string length separately in page data
 	ADAPTIVE_FLOATING = 11; // Identifies that a column is encoded using AdaptiveFloatingCodec
 	BOOL_BYTE = 12;   // Identifies that a column is encoded using BooleanPageCodec
+	ADAPTIVE_DELTA_FLOATING = 13; // Identifies that a column is encoded using AdaptiveDeltaFloatingCodec
 }
 
 enum PartitionType{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11661eb6/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
index d737092..8115e27 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala
@@ -81,6 +81,12 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll
     checkAnswer(dataFrame, Seq(Row(9281064)))
   }
 
+  test("test v1 to v3 compatabilty filter on measure with double dimension") {
+    val dataFrame = localspark
+      .sql(s"SELECT sum(salary1) FROM t3 where salary1 > 15408")
+    checkAnswer(dataFrame, Seq(Row(9281064)))
+  }
+
   override def afterAll {
     localspark.stop()
   }