You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/05 00:44:23 UTC
[14/50] [abbrv] carbondata git commit: use raw compression
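use raw compression: compress off-heap fixed-length column pages with Snappy raw (memory-address based) compression, replace CompressionCodec with DirectCompressCodec, and remove UpscaleFloatingCodec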
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eadfea78
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eadfea78
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eadfea78
Branch: refs/heads/streaming_ingest
Commit: eadfea789b0fd63c4adcd4f7f335530a98dfbb78
Parents: a459dea
Author: jackylk <ja...@huawei.com>
Authored: Tue Jun 27 16:54:54 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Jun 27 23:56:05 2017 +0800
----------------------------------------------------------------------
.../core/datastore/compression/Compressor.java | 5 +
.../datastore/compression/SnappyCompressor.java | 10 +
.../core/datastore/page/ColumnPage.java | 3 +-
.../page/UnsafeFixLengthColumnPage.java | 20 +-
.../page/encoding/AdaptiveCompressionCodec.java | 4 +-
.../page/encoding/AdaptiveIntegerCodec.java | 18 +-
.../page/encoding/ColumnPageCodec.java | 4 +-
.../page/encoding/CompressionCodec.java | 57 ------
.../page/encoding/DefaultEncodingStrategy.java | 58 +-----
.../page/encoding/DeltaIntegerCodec.java | 18 +-
.../page/encoding/DirectCompressCodec.java | 58 ++++++
.../page/encoding/UpscaleFloatingCodec.java | 202 -------------------
.../core/memory/UnsafeMemoryManager.java | 9 +-
.../store/CarbonFactDataHandlerColumnar.java | 3 +-
.../processing/store/TablePageEncoder.java | 10 +-
15 files changed, 137 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
index 8da7c8b..2bc8678 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/Compressor.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.compression;
+import java.io.IOException;
+
public interface Compressor {
byte[] compressByte(byte[] unCompInput);
@@ -55,4 +57,19 @@ public interface Compressor {
double[] unCompressDouble(byte[] compInput, int offset, int length);
+ /**
+ * Compress {@code inputSize} bytes read from memory address {@code inputAddress} and
+ * write the compressed bytes to memory address {@code outputAddress}.
+ * The output region should hold at least {@link #maxCompressedLength(int)} bytes.
+ *
+ * @return number of compressed bytes written at {@code outputAddress}
+ * @throws IOException if the underlying compressor fails
+ */
+ long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException;
+
+ /**
+ * @return upper bound of the compressed size for {@code inputSize} input bytes; callers use
+ * it to size the output region passed to {@link #rawCompress(long, int, long)}
+ */
+ int maxCompressedLength(int inputSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index f255339..f8a2f4f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -217,4 +217,14 @@ public class SnappyCompressor implements Compressor {
}
return null;
}
+ // Address-based (raw memory) compression, delegated to the snappyNative binding.
+ @Override
+ public long rawCompress(long inputAddress, int inputSize, long outputAddress) throws IOException {
+ return snappyNative.rawCompress(inputAddress, inputSize, outputAddress);
+ }
+ // Upper bound on the compressed size of inputSize bytes, as reported by snappyNative.
+ @Override
+ public int maxCompressedLength(int inputSize) {
+ return snappyNative.maxCompressedLength(inputSize);
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 155b4ee..730243c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.page;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.BitSet;
@@ -474,7 +475,7 @@ public abstract class ColumnPage {
/**
* Compress page data using specified compressor
*/
- public byte[] compress(Compressor compressor) {
+ public byte[] compress(Compressor compressor) throws MemoryException, IOException { // checked exceptions let overrides (e.g. UnsafeFixLengthColumnPage) use raw compression
switch (dataType) {
case BYTE:
return compressor.compressByte(getBytePage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 6bd6d31..9f71768 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.core.datastore.page;
+import java.io.IOException;
import java.math.BigDecimal;
import org.apache.carbondata.core.datastore.compression.Compressor;
@@ -354,9 +355,28 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
}
@Override
- public byte[] compress(Compressor compressor) {
- // TODO: use zero-copy raw compression
- return super.compress(compressor);
+ public byte[] compress(Compressor compressor) throws MemoryException, IOException {
+ if (UnsafeMemoryManager.isOffHeap()) {
+ // raw compression reads and writes memory addresses directly (baseOffset is passed as the
+ // input address), so it is only attempted when UnsafeMemoryManager is in off-heap mode
+ int inputSize = pageSize << dataType.getSizeBits();
+ int compressedMaxSize = compressor.maxCompressedLength(inputSize);
+ MemoryBlock compressed = UnsafeMemoryManager.allocateMemoryWithRetry(compressedMaxSize);
+ try {
+ long outSize = compressor.rawCompress(baseOffset, inputSize, compressed.getBaseOffset());
+ assert outSize < Integer.MAX_VALUE;
+ // copy the compressed bytes out of the temporary memory block into a byte[]
+ byte[] output = new byte[(int) outSize];
+ CarbonUnsafe.unsafe.copyMemory(compressed.getBaseObject(), compressed.getBaseOffset(), output,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, outSize);
+ return output;
+ } finally {
+ // release the temporary block even if compression or the copy fails
+ UnsafeMemoryManager.INSTANCE.freeMemory(compressed);
+ }
+ } else {
+ return super.compress(compressor);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
index 6127583..2e8eff2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveCompressionCodec.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page.encoding;
+import java.io.IOException;
+
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
@@ -53,7 +55,7 @@ public abstract class AdaptiveCompressionCodec implements ColumnPageCodec {
public abstract String getName();
- public abstract byte[] encode(ColumnPage input) throws MemoryException;
+ public abstract byte[] encode(ColumnPage input) throws MemoryException, IOException; // IOException propagates from ColumnPage.compress
public abstract ColumnPage decode(byte[] input, int offset, int length) throws MemoryException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
index a12ce00..3d56f0c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/AdaptiveIntegerCodec.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page.encoding;
+import java.io.IOException;
+
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
@@ -49,16 +51,15 @@ class AdaptiveIntegerCodec extends AdaptiveCompressionCodec {
}
@Override
- public byte[] encode(ColumnPage input) throws MemoryException {
- if (srcDataType.equals(targetDataType)) {
- return input.compress(compressor);
- } else {
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- byte[] result = encodedPage.compress(compressor);
- encodedPage.freeMemory();
- return result;
- }
+ public byte[] encode(ColumnPage input) throws MemoryException, IOException {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ // release the intermediate page even if encoding or compression fails
+ encodedPage.freeMemory();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
index afba173..36d5989 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageCodec.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page.encoding;
+import java.io.IOException;
+
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
@@ -36,7 +38,7 @@ public interface ColumnPageCodec {
* @param input column page to apply
* @return encoded data
*/
- byte[] encode(ColumnPage input) throws MemoryException;
+ byte[] encode(ColumnPage input) throws MemoryException, IOException; // IOException: compressing the page may fail
/**
* decode byte array from offset to a column page
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
deleted file mode 100644
index 722ba21..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/CompressionCodec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for variable length data type (decimal, string).
- * This codec will flatten the variable length data before applying compression.
- */
-public class CompressionCodec implements ColumnPageCodec {
-
- private Compressor compressor;
- private DataType dataType;
-
- private CompressionCodec(DataType dataType, Compressor compressor) {
- this.compressor = compressor;
- this.dataType = dataType;
- }
-
- public static CompressionCodec newInstance(DataType dataType, Compressor compressor) {
- return new CompressionCodec(dataType, compressor);
- }
-
- @Override
- public String getName() {
- return "CompressionCodec";
- }
-
- @Override
- public byte[] encode(ColumnPage input) {
- return input.compress(compressor);
- }
-
- @Override
- public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
- return ColumnPage.decompress(compressor, dataType, input, offset, length);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
index f8e43fc..3818263 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingStrategy.java
@@ -61,29 +61,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
}
}
- // fit the input double value into minimum data type
- private DataType fitDataType(double value, int decimal) {
- DataType dataType = DataType.DOUBLE;
- if (decimal == 0) {
- if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
- dataType = DataType.BYTE;
- } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
- dataType = DataType.SHORT;
- } else if (value <= THREE_BYTES_MAX && value >= THREE_BYTES_MIN) {
- return DataType.SHORT_INT;
- } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
- dataType = DataType.INT;
- } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
- dataType = DataType.LONG;
- }
- }
- return dataType;
- }
-
// choose between adaptive encoder or delta adaptive encoder, based on whose target data type
// size is smaller
@Override
ColumnPageCodec newCodecForIntegerType(ColumnPageStatsVO stats) {
+ DataType srcDataType = stats.getDataType(); // compared against the fitted types for the compression-only fallback
DataType adaptiveDataType = fitDataType((long)stats.getMax(), (long)stats.getMin());
DataType deltaDataType;
@@ -94,6 +76,11 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
} else {
deltaDataType = fitDataType((long) stats.getMax() - (long) stats.getMin());
}
+ if (Math.min(adaptiveDataType.getSizeInBytes(), deltaDataType.getSizeInBytes()) ==
+ srcDataType.getSizeInBytes()) {
+ // neither adaptive nor delta encoding narrows the data type, so apply compression only
+ return DirectCompressCodec.newInstance(srcDataType, compressor);
+ }
if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
// choose adaptive encoding
return AdaptiveIntegerCodec.newInstance(
@@ -104,46 +91,19 @@ public class DefaultEncodingStrategy extends EncodingStrategy {
}
}
- // choose between upscale adaptive encoder or upscale delta adaptive encoder,
- // based on whose target data type size is smaller
@Override
ColumnPageCodec newCodecForFloatingType(ColumnPageStatsVO stats) {
- DataType srcDataType = stats.getDataType();
- double maxValue = (double) stats.getMax();
- double minValue = (double) stats.getMin();
- int decimal = stats.getDecimal();
-
- //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
- //but we can't use -1 to getDatatype, we should use -10000000.
- double absMaxValue = Math.abs(maxValue) >= Math.abs(minValue) ? maxValue : minValue;
-
- if (decimal == 0) {
- // short, int, long
- DataType adaptiveDataType = fitDataType(absMaxValue, decimal);
- DataType deltaDataType = fitDataType(maxValue - minValue, decimal);
- if (adaptiveDataType.getSizeInBytes() <= deltaDataType.getSizeInBytes()) {
- // choose adaptive encoding
- return AdaptiveIntegerCodec.newInstance(srcDataType, adaptiveDataType, stats, compressor);
- } else {
- // choose delta adaptive encoding
- return DeltaIntegerCodec.newInstance(srcDataType, deltaDataType, stats, compressor);
- }
- } else {
- // double
- DataType upscaleAdaptiveDataType = fitDataType(Math.pow(10, decimal) * absMaxValue, decimal);
- return UpscaleFloatingCodec.newInstance(
- srcDataType, upscaleAdaptiveDataType, stats, compressor);
- }
+ return DirectCompressCodec.newInstance(stats.getDataType(), compressor); // floating point pages are compressed without any encoding
}
// for decimal, currently it is a very basic implementation
@Override
ColumnPageCodec newCodecForDecimalType(ColumnPageStatsVO stats) {
- return CompressionCodec.newInstance(stats.getDataType(), compressor);
+ return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
}
@Override
ColumnPageCodec newCodecForByteArrayType(ColumnPageStatsVO stats) {
- return CompressionCodec.newInstance(stats.getDataType(), compressor);
+ return DirectCompressCodec.newInstance(stats.getDataType(), compressor);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
index 2036df5..b77f7a2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DeltaIntegerCodec.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page.encoding;
+import java.io.IOException;
+
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.LazyColumnPage;
@@ -64,16 +66,15 @@ public class DeltaIntegerCodec extends AdaptiveCompressionCodec {
}
@Override
- public byte[] encode(ColumnPage input) throws MemoryException {
- if (srcDataType.equals(targetDataType)) {
- return input.compress(compressor);
- } else {
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- byte[] result = encodedPage.compress(compressor);
- encodedPage.freeMemory();
- return result;
- }
+ public byte[] encode(ColumnPage input) throws MemoryException, IOException {
+ encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
+ try {
+ input.encode(codec);
+ return encodedPage.compress(compressor);
+ } finally {
+ // release the intermediate page even if encoding or compression fails
+ encodedPage.freeMemory();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
new file mode 100644
index 0000000..dcb9b7c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DirectCompressCodec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * This codec applies compression directly to the input page data, with no encoding step
+ */
+public class DirectCompressCodec implements ColumnPageCodec {
+
+ private final Compressor compressor;
+ private final DataType dataType;
+
+ private DirectCompressCodec(DataType dataType, Compressor compressor) {
+ this.compressor = compressor;
+ this.dataType = dataType;
+ }
+
+ public static DirectCompressCodec newInstance(DataType dataType, Compressor compressor) {
+ return new DirectCompressCodec(dataType, compressor);
+ }
+
+ @Override
+ public String getName() {
+ return "DirectCompressCodec";
+ }
+
+ @Override
+ public byte[] encode(ColumnPage input) throws IOException, MemoryException {
+ return input.compress(compressor);
+ }
+
+ @Override
+ public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
+ return ColumnPage.decompress(compressor, dataType, input, offset, length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
deleted file mode 100644
index 73898af..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/UpscaleFloatingCodec.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.encoding;
-
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.LazyColumnPage;
-import org.apache.carbondata.core.datastore.page.PrimitiveCodec;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-/**
- * Codec for floating point (float, double) data type page.
- * This codec will upscale the diff from page max value to integer value,
- * and do type casting to make storage minimum.
- */
-public class UpscaleFloatingCodec extends AdaptiveCompressionCodec {
-
- private ColumnPage encodedPage;
- private double factor;
-
- public static ColumnPageCodec newInstance(DataType srcDataType, DataType targetDataType,
- ColumnPageStatsVO stats, Compressor compressor) {
- return new UpscaleFloatingCodec(srcDataType, targetDataType, stats, compressor);
- }
-
- private UpscaleFloatingCodec(DataType srcDataType, DataType targetDataType,
- ColumnPageStatsVO stats, Compressor compressor) {
- super(srcDataType, targetDataType, stats, compressor);
- this.factor = Math.pow(10, stats.getDecimal());
- }
-
- @Override
- public String getName() {
- return "UpscaleFloatingCodec";
- }
-
- @Override
- public byte[] encode(ColumnPage input) throws MemoryException {
- if (targetDataType.equals(srcDataType)) {
- return input.compress(compressor);
- } else {
- encodedPage = ColumnPage.newPage(targetDataType, input.getPageSize());
- input.encode(codec);
- byte[] result = encodedPage.compress(compressor);
- encodedPage.freeMemory();
- return result;
- }
- }
-
-
- @Override
- public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
- if (srcDataType.equals(targetDataType)) {
- return ColumnPage.decompress(compressor, targetDataType, input, offset, length);
- } else {
- ColumnPage page = ColumnPage.decompress(compressor, targetDataType, input, offset, length);
- return LazyColumnPage.newPage(page, codec);
- }
- }
-
- // encoded value = (10 power of decimal) * (page value)
- private PrimitiveCodec codec = new PrimitiveCodec() {
- @Override
- public void encode(int rowId, byte value) {
- // this codec is for floating point type only
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public void encode(int rowId, short value) {
- // this codec is for floating point type only
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public void encode(int rowId, int value) {
- // this codec is for floating point type only
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public void encode(int rowId, long value) {
- // this codec is for floating point type only
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public void encode(int rowId, float value) {
- switch (targetDataType) {
- case BYTE:
- encodedPage.putByte(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue());
- break;
- case SHORT:
- encodedPage.putShort(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue());
- break;
- case INT:
- encodedPage.putInt(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue());
- break;
- case LONG:
- encodedPage.putLong(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue());
- break;
- default:
- throw new RuntimeException("internal error: " + debugInfo());
- }
- }
-
- @Override
- public void encode(int rowId, double value) {
- switch (targetDataType) {
- case BYTE:
- encodedPage.putByte(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).byteValue());
- break;
- case SHORT:
- encodedPage.putShort(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).shortValue());
- break;
- case INT:
- encodedPage.putInt(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).intValue());
- break;
- case LONG:
- encodedPage.putLong(rowId,
- BigDecimal.valueOf(value).multiply(BigDecimal.valueOf(factor)).longValue());
- break;
- case DOUBLE:
- encodedPage.putDouble(rowId, value);
- break;
- default:
- throw new RuntimeException("internal error: " + debugInfo());
- }
- }
-
- @Override
- public long decodeLong(byte value) {
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public long decodeLong(short value) {
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public long decodeLong(int value) {
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public double decodeDouble(byte value) {
- return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
- }
-
- @Override
- public double decodeDouble(short value) {
- return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
- }
-
- @Override
- public double decodeDouble(int value) {
- return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
- }
-
- @Override
- public double decodeDouble(long value) {
- return BigDecimal.valueOf(value).divide(BigDecimal.valueOf(factor)).doubleValue();
- }
-
- @Override
- public double decodeDouble(float value) {
- throw new RuntimeException("internal error: " + debugInfo());
- }
-
- @Override
- public double decodeDouble(double value) {
- throw new RuntimeException("internal error: " + debugInfo());
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 90cbe75..28e63a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -33,6 +33,9 @@ public class UnsafeMemoryManager {
private static final LogService LOGGER =
LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
+ private static final boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
static {
long size;
try {
@@ -50,9 +53,6 @@ public class UnsafeMemoryManager {
+ "so setting default value to " + size);
}
- boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
- CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
long takenSize = size * 1024 * 1024;
MemoryAllocator allocator;
if (offHeap) {
@@ -159,4 +159,11 @@ public class UnsafeMemoryManager {
return baseBlock;
}
+ /**
+ * @return whether the ENABLE_OFFHEAP_SORT property was enabled when this class was initialized;
+ * the static initializer uses the same flag to choose the MemoryAllocator
+ */
+ public static boolean isOffHeap() {
+ return offHeap;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 300ff0c..01e3ab6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.processing.store;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -336,7 +337,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* generate the NodeHolder from the input rows (one page in case of V3 format)
*/
private NodeHolder processDataRows(List<CarbonRow> dataRows)
- throws CarbonDataWriterException, KeyGenException, MemoryException {
+ throws CarbonDataWriterException, KeyGenException, MemoryException, IOException { // IOException presumably propagated from page encoding/compression - confirm
if (dataRows.size() == 0) {
return new NodeHolder();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eadfea78/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
index 608f578..8547845 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.processing.store;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.carbondata.core.datastore.TableSpec;
@@ -39,7 +40,7 @@ import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-public class TablePageEncoder {
+class TablePageEncoder { // package-private; presumably only used within org.apache.carbondata.processing.store - confirm
private ColumnarFormatVersion version;
@@ -49,14 +50,15 @@ public class TablePageEncoder {
private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
- public TablePageEncoder(CarbonFactDataHandlerModel model) {
+ TablePageEncoder(CarbonFactDataHandlerModel model) {
this.version = CarbonProperties.getInstance().getFormatVersion();
this.model = model;
this.isUseInvertedIndex = model.getIsUseInvertedIndex();
}
// function to apply all columns in one table page
- public EncodedData encode(TablePage tablePage) throws KeyGenException, MemoryException {
+ EncodedData encode(TablePage tablePage)
+ throws KeyGenException, MemoryException, IOException { // IOException comes from encodeAndCompressMeasures
EncodedData encodedData = new EncodedData();
encodeAndCompressDimensions(tablePage, encodedData);
encodeAndCompressMeasures(tablePage, encodedData);
@@ -65,7 +67,7 @@ public class TablePageEncoder {
// apply measure and set encodedData in `encodedData`
private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData)
- throws MemoryException {
+ throws MemoryException, IOException {
ColumnPage[] measurePage = tablePage.getMeasurePage();
byte[][] encodedMeasures = new byte[measurePage.length][];
for (int i = 0; i < measurePage.length; i++) {