You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/05 00:44:23 UTC

[14/50] [abbrv] carbondata git commit: use raw compression

use raw compression


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eadfea78
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eadfea78
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eadfea78

Branch: refs/heads/streaming_ingest
Commit: eadfea789b0fd63c4adcd4f7f335530a98dfbb78
Parents: a459dea
Author: jackylk <ja...@huawei.com>
Authored: Tue Jun 27 16:54:54 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Jun 27 23:56:05 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/compression/Compressor.java  |   5 +
 .../datastore/compression/SnappyCompressor.java |  10 +
 .../core/datastore/page/ColumnPage.java         |   3 +-
 .../page/UnsafeFixLengthColumnPage.java         |  20 +-
 .../page/encoding/AdaptiveCompressionCodec.java |   4 +-
 .../page/encoding/AdaptiveIntegerCodec.java     |  18 +-
 .../page/encoding/ColumnPageCodec.java          |   4 +-
 .../page/encoding/CompressionCodec.java         |  57 ------
 .../page/encoding/DefaultEncodingStrategy.java  |  58 +-----
 .../page/encoding/DeltaIntegerCodec.java        |  18 +-
 .../page/encoding/DirectCompressCodec.java      |  58 ++++++
 .../page/encoding/UpscaleFloatingCodec.java     | 202 -------------------
 .../core/memory/UnsafeMemoryManager.java        |   9 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   3 +-
 .../processing/store/TablePageEncoder.java      |  10 +-
 15 files changed, 137 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index 8da7c8b..2bc8678 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import java.io.IOException;
+
 public interface Compressor {
 
   byte[] compressByte(byte[] unCompInput);
@@ -55,4 +57,7 @@ public interface Compressor {
 
   double[] unCompressDouble(byte[] compInput, int offset, int length);
 
+  long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException;
+
+  int maxCompressedLength(int inputSize);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index f255339..f8a2f4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -217,4 +217,14 @@ public class SnappyCompressor implements Compressor {
     }
     return null;
   }
+
+  @Override
+  public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException {
+    return snappyNative.rawCompress(inputAddress, inputSize, outputAddress);
+  }
+
+  @Override
+  public int maxCompressedLength(int inputSize) {
+    return snappyNative.maxCompressedLength(inputSize);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 155b4ee..730243c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.BitSet;
 
@@ -474,7 +475,7 @@ public abstract class ColumnPage {
   /**
    * Compress page data using specified compressor
    */
-  public byte[] compress(Compressor compressor) {
+  public byte[] compress(Compressor compressor) throws MemoryException, IOException {
     switch (dataType) {
       case BYTE:
         return compressor.compressByte(getBytePage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 6bd6d31..9f71768 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.compression.Compressor;
@@ -354,9 +355,22 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] compress(Compressor compressor) {
-    // TODO: use zero-copy raw compression
-    return super.compress(compressor);
+  public byte[] compress(Compressor compressor) throws MemoryException, IOException {
+    if (UnsafeMemoryManager.isOffHeap()) {
+      // use raw compression and copy to byte[]
+      int inputSize = pageSize << dataType.getSizeBits();
+      int compressedMaxSize = compressor.maxCompressedLength(inputSize);
+      MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize);
+      long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
+      assert outSize < Integer.MAX_VALUE;
+      byte[] output = new byte[(int) outSize];
+      CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output,
+          CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize);
+      UnsafeMemoryManager.INSTANCE.freeMemory(compressed);
+      return output;
+    } else {
+      return super.compress(compressor);
+    }
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index 6127583..2e8eff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
@@ -53,7 +55,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
 
   public abstract String getName();
 
-  public abstract byte[] encode(ColumnPage input) throws MemoryException;
+  public abstract byte[] encode(ColumnPage input) throws MemoryException, IOException;
 
   public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
index a12ce00..3d56f0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
@@ -49,16 +51,12 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) throws MemoryException {
-    if (srcDataType.equals(targetDataType)) {
-      return input.compress(compressor);
-    } else {
-      encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
-      input.encode(codec);
-      byte[] result = encodedPage.compress(compressor);
-      encodedPage.freeMemory();
-      return result;
-    }
+  public byte[] encode(ColumnPage input) throws MemoryException, IOException {
+    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    input.encode(codec);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index afba173..36d5989 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
 
@@ -36,7 +38,7 @@ public interface ColumnPageCodec {
    * @param input column page to apply
    * @return encoded data
    */
-  byte[] encode(ColumnPage input) throws MemoryException;
+  byte[] encode(ColumnPage input) throws MemoryException, IOException;
 
   /**
    * decode byte array from offset to a column page

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
deleted file mode 100644
index 722ba21..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for variable length data type (decimal, string).
- * This codec will flatten the variable length data before applying compression.
- */
-public class CompressionCodec implements ColumnPageCodec {
-
-  private Compressor compressor;
-  private DataType dataType;
-
-  private CompressionCodec(DataType dataType, Compressor compressor) {
-    this.compressor = compressor;
-    this.dataType = dataType;
-  }
-
-  public static CompressionCodec newInstance(DataType dataType, Compressor compressor) {
-    return new CompressionCodec(dataType, compressor);
-  }
-
-  @Override
-  public String getName() {
-    return "CompressionCodec";
-  }
-
-  @Override
-  public byte[] encode(ColumnPage input) {
-    return input.compress(compressor);
-  }
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    return ColumnPage.decompress(compressor, dataType, input, offset, length);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
index f8e43fc..3818263 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -61,29 +61,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     }
   }
 
-  // fit the input double value into minimum data type
-  private DataType fitDataType(double value, int decimal) {
-    DataType dataType = DataType.DOUBLE;
-    if (decimal == 0) {
-      if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-        dataType = DataType.BYTE;
-      } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-        dataType = DataType.SHORT;
-      } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
-        return DataType.SHORT_INT;
-      } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-        dataType = DataType.INT;
-      } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
-        dataType = DataType.LONG;
-      }
-    }
-    return dataType;
-  }
-
   // choose between adaptive encoder or delta adaptive encoder, based on whose target data type
   // size is smaller
   @Override
   ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) {
+    DataType srcDataType = stats.getDataType();
     DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin());
     DataType deltaDataType;
 
@@ -94,6 +76,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     } else {
       deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin());
     }
+    if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
+        srcDataType.getSizeInBytes()) {
+      // no effect to use adaptive or delta, use compression only
+      return DirectCompressCodec.newInstance(srcDataType, compressor);
+    }
     if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
       // choose adaptive encoding
       return AdaptiveIntegerCodec.newInstance(
@@ -104,46 +91,19 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
     }
   }
 
-  // choose between upscale adaptive encoder or upscale delta adaptive encoder,
-  // based on whose target data type size is smaller
   @Override
   ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
-    DataType srcDataType = stats.getDataType();
-    double maxValue = (double) stats.getMax();
-    double minValue = (double) stats.getMin();
-    int decimal = stats.getDecimal();
-
-    //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
-    //but we can't use -1 to getDatatype, we should use -10000000.
-    double absMaxValue = Math.abs(maxValue) >= Math.abs(minValue) ? maxValue : minValue;
-
-    if (decimal == 0) {
-      // short, int, long
-      DataType adaptiveDataType = fitDataType(absMaxValue, decimal);
-      DataType deltaDataType = fitDataType(maxValue - minValue, decimal);
-      if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
-        // choose adaptive encoding
-        return AdaptiveIntegerCodec.newInstance(srcDataType, adaptiveDataType, stats, compressor);
-      } else {
-        // choose delta adaptive encoding
-        return DeltaIntegerCodec.newInstance(srcDataType, deltaDataType, stats, compressor);
-      }
-    } else {
-      // double
-      DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, decimal);
-      return UpscaleFloatingCodec.newInstance(
-          srcDataType, upscaleAdaptiveDataType, stats, compressor);
-    }
+    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
   }
 
   // for decimal, currently it is a very basic implementation
   @Override
   ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
-    return CompressionCodec.newInstance(stats.getDataType(), compressor);
+    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
   }
 
   @Override
   ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
-    return CompressionCodec.newInstance(stats.getDataType(), compressor);
+    return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
index 2036df5..b77f7a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page.encoding;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
@@ -64,16 +66,12 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
   }
 
   @Override
-  public byte[] encode(ColumnPage input) throws MemoryException {
-    if (srcDataType.equals(targetDataType)) {
-      return input.compress(compressor);
-    } else {
-      encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
-      input.encode(codec);
-      byte[] result = encodedPage.compress(compressor);
-      encodedPage.freeMemory();
-      return result;
-    }
+  public byte[] encode(ColumnPage input) throws MemoryException, IOException {
+    encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+    input.encode(codec);
+    byte[] result = encodedPage.compress(compressor);
+    encodedPage.freeMemory();
+    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
new file mode 100644
index 0000000..dcb9b7c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * This codec directly apply compression on the input data
+ */
+public class DirectCompressCodec implements ColumnPageCodec {
+
+  private Compressor compressor;
+  private DataType dataType;
+
+  private DirectCompressCodec(DataType dataType, Compressor compressor) {
+    this.compressor = compressor;
+    this.dataType = dataType;
+  }
+
+  public static DirectCompressCodec newInstance(DataType dataType, Compressor compressor) {
+    return new DirectCompressCodec(dataType, compressor);
+  }
+
+  @Override
+  public String getName() {
+    return "DirectCompressCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage input) throws IOException, MemoryException {
+    return input.compress(compressor);
+  }
+
+  @Override
+  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+    return ColumnPage.decompress(compressor, dataType, input, offset, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
deleted file mode 100644
index 73898af..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.LazyColumnPage;
-import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for floating point (float, double) data type page.
- * This codec will upscale the diff from page max value to integer value,
- * and do type casting to make storage minimum.
- */
-public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
-
-  private ColumnPage encodedPage;
-  private double factor;
-
-  public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    return new UpscaleFloatingCodec(srcDataType, targetDataType, stats, compressor);
-  }
-
-  private UpscaleFloatingCodec(DataType srcDataType, DataType targetDataType,
-      ColumnPageStatsVO stats, Compressor compressor) {
-    super(srcDataType, targetDataType, stats, compressor);
-    this.factor = Math.pow(10, stats.getDecimal());
-  }
-
-  @Override
-  public String getName() {
-    return "UpscaleFloatingCodec";
-  }
-
-  @Override
-  public byte[] encode(ColumnPage input) throws MemoryException {
-    if (targetDataType.equals(srcDataType)) {
-      return input.compress(compressor);
-    } else {
-      encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
-      input.encode(codec);
-      byte[] result = encodedPage.compress(compressor);
-      encodedPage.freeMemory();
-      return result;
-    }
-  }
-
-
-  @Override
-  public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
-    if (srcDataType.equals(targetDataType)) {
-      return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
-    } else {
-      ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
-      return LazyColumnPage.newPage(page, codec);
-    }
-  }
-
-  // encoded value = (10 power of decimal) * (page value)
-  private PrimitiveCodec codec = new PrimitiveCodec() {
-    @Override
-    public void encode(int rowId, byte value) {
-      // this codec is for floating point type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public void encode(int rowId, short value) {
-      // this codec is for floating point type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public void encode(int rowId, int value) {
-      // this codec is for floating point type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public void encode(int rowId, long value) {
-      // this codec is for floating point type only
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public void encode(int rowId, float value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue());
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue());
-          break;
-        case INT:
-          encodedPage.putInt(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue());
-          break;
-        case LONG:
-          encodedPage.putLong(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue());
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public void encode(int rowId, double value) {
-      switch (targetDataType) {
-        case BYTE:
-          encodedPage.putByte(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue());
-          break;
-        case SHORT:
-          encodedPage.putShort(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue());
-          break;
-        case INT:
-          encodedPage.putInt(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue());
-          break;
-        case LONG:
-          encodedPage.putLong(rowId,
-              BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue());
-          break;
-        case DOUBLE:
-          encodedPage.putDouble(rowId, value);
-          break;
-        default:
-          throw new RuntimeException("internal error: " + debugInfo());
-      }
-    }
-
-    @Override
-    public long decodeLong(byte value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public long decodeLong(short value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public long decodeLong(int value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public double decodeDouble(byte value) {
-      return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
-    }
-
-    @Override
-    public double decodeDouble(short value) {
-      return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
-    }
-
-    @Override
-    public double decodeDouble(int value) {
-      return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
-    }
-
-    @Override
-    public double decodeDouble(long value) {
-      return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
-    }
-
-    @Override
-    public double decodeDouble(float value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-
-    @Override
-    public double decodeDouble(double value) {
-      throw new RuntimeException("internal error: " + debugInfo());
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 90cbe75..28e63a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -33,6 +33,9 @@ public class UnsafeMemoryManager {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
 
+  private static boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+          CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
   static {
     long size;
     try {
@@ -50,9 +53,6 @@ public class UnsafeMemoryManager {
           + "so setting default value to " + size);
     }
 
-    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
-            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
     long takenSize = size * 1024 * 1024;
     MemoryAllocator allocator;
     if (offHeap) {
@@ -159,4 +159,7 @@ public class UnsafeMemoryManager {
     return baseBlock;
   }
 
+  public static boolean isOffHeap() {
+    return offHeap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 300ff0c..01e3ab6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.store;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -336,7 +337,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * generate the NodeHolder from the input rows (one page in case of V3 format)
    */
   private NodeHolder processDataRows(List<CarbonRow> dataRows)
-      throws CarbonDataWriterException, KeyGenException, MemoryException {
+      throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
     if (dataRows.size() == 0) {
       return new NodeHolder();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
index 608f578..8547845 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.store;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.carbondata.core.datastore.TableSpec;
@@ -39,7 +40,7 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-public class TablePageEncoder {
+class TablePageEncoder {
 
   private ColumnarFormatVersion version;
 
@@ -49,14 +50,15 @@ public class TablePageEncoder {
 
   private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
 
-  public TablePageEncoder(CarbonFactDataHandlerModel model) {
+  TablePageEncoder(CarbonFactDataHandlerModel model) {
     this.version = CarbonProperties.getInstance().getFormatVersion();
     this.model = model;
     this.isUseInvertedIndex = model.getIsUseInvertedIndex();
   }
 
   // function to apply all columns in one table page
-  public EncodedData encode(TablePage tablePage) throws KeyGenException, MemoryException {
+  EncodedData encode(TablePage tablePage)
+      throws KeyGenException, MemoryException, IOException {
     EncodedData encodedData = new EncodedData();
     encodeAndCompressDimensions(tablePage, encodedData);
     encodeAndCompressMeasures(tablePage, encodedData);
@@ -65,7 +67,7 @@ public class TablePageEncoder {
 
   // apply measure and set encodedData in `encodedData`
   private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData)
-      throws MemoryException {
+      throws MemoryException, IOException {
     ColumnPage[] measurePage = tablePage.getMeasurePage();
     byte[][] encodedMeasures = new byte[measurePage.length][];
     for (int i = 0; i < measurePage.length; i++) {