You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/08/30 22:05:14 UTC
[orc] branch master updated: ORC-363: Enable zStandard codec
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 93b61d9 ORC-363: Enable zStandard codec
93b61d9 is described below
commit 93b61d935376e3297c7dcaedc763be8050c3bf89
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Fri Jul 12 14:12:36 2019 -0700
ORC-363: Enable zStandard codec
Support the ZSTD codec in the Java ORC writer and reader. Under the hood,
the implementation uses the airlift aircompressor library.
This also adds a drop-in replacement for airlift slice until aircompressor
releases a version that doesn't depend on slice. (Slice depends
on jol-core, which is GPL-licensed.)
Fixes #412
Signed-off-by: Owen O'Malley <om...@apache.org>
---
examples/TestVectorOrcFile.testZstd.0.12.orc | Bin 0 -> 121017 bytes
.../airlift/slice/Slice.java} | 23 +++-
.../airlift/slice/Slices.java} | 11 +-
.../airlift/slice/UnsafeSliceFactory.java} | 18 ++-
.../core/src/java/io/airlift/slice/UnsafeUtil.java | 66 ++++++++++
java/core/src/java/io/airlift/slice/XxHash64.java | 139 +++++++++++++++++++++
.../src/java/org/apache/orc/CompressionKind.java | 2 +-
.../src/java/org/apache/orc/impl/ReaderImpl.java | 1 +
.../src/java/org/apache/orc/impl/WriterImpl.java | 6 +
.../src/test/org/apache/orc/TestVectorOrcFile.java | 50 +++++++-
.../org/apache/orc/impl/TestZstd.java} | 32 +++--
java/pom.xml | 2 +-
12 files changed, 322 insertions(+), 28 deletions(-)
diff --git a/examples/TestVectorOrcFile.testZstd.0.12.orc b/examples/TestVectorOrcFile.testZstd.0.12.orc
new file mode 100644
index 0000000..d264419
Binary files /dev/null and b/examples/TestVectorOrcFile.testZstd.0.12.orc differ
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/Slice.java
similarity index 69%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/Slice.java
index 3cffe57..e768a2c 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/Slice.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,12 +16,23 @@
* limitations under the License.
*/
-package org.apache.orc;
+package io.airlift.slice;
/**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
*/
-public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class Slice {
+ final Object base;
+ final long address;
+ final int length;
+
+ public Slice() {
+ this(null, 0, 0);
+ }
+
+ public Slice(Object base, long address, int length) {
+ this.base = base;
+ this.address = address;
+ this.length = length;
+ }
}
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/Slices.java
similarity index 81%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/Slices.java
index 3cffe57..320e8c2 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/Slices.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,12 +16,11 @@
* limitations under the License.
*/
-package org.apache.orc;
+package io.airlift.slice;
/**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
*/
-public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class Slices {
+ public static final Slice EMPTY_SLICE = new Slice();
}
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
similarity index 71%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
index 3cffe57..654045e 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,12 +16,18 @@
* limitations under the License.
*/
-package org.apache.orc;
+package io.airlift.slice;
/**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
*/
-public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class UnsafeSliceFactory {
+
+ public static UnsafeSliceFactory getInstance() {
+ return new UnsafeSliceFactory();
+ }
+
+ public Slice newSlice(Object base, long address, int length) {
+ return new Slice(base, address, length);
+ }
}
diff --git a/java/core/src/java/io/airlift/slice/UnsafeUtil.java b/java/core/src/java/io/airlift/slice/UnsafeUtil.java
new file mode 100644
index 0000000..61b4608
--- /dev/null
+++ b/java/core/src/java/io/airlift/slice/UnsafeUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.slice;
+
+import io.airlift.compress.IncompatibleJvmException;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.Buffer;
+import java.nio.ByteOrder;
+
+import static java.lang.String.format;
+
+final class UnsafeUtil
+{
+ public static final Unsafe UNSAFE;
+ private static final Field ADDRESS_ACCESSOR;
+
+ private UnsafeUtil() {}
+
+ static {
+ ByteOrder order = ByteOrder.nativeOrder();
+ if (!order.equals(ByteOrder.LITTLE_ENDIAN)) {
+ throw new IncompatibleJvmException(format("Zstandard requires a little endian platform (found %s)", order));
+ }
+
+ try {
+ Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
+ theUnsafe.setAccessible(true);
+ UNSAFE = (Unsafe) theUnsafe.get(null);
+ }
+ catch (Exception e) {
+ throw new IncompatibleJvmException("Zstandard requires access to sun.misc.Unsafe");
+ }
+
+ try {
+ Field field = Buffer.class.getDeclaredField("address");
+ field.setAccessible(true);
+ ADDRESS_ACCESSOR = field;
+ }
+ catch (Exception e) {
+ throw new IncompatibleJvmException("Zstandard requires access to java.nio.Buffer raw address field");
+ }
+ }
+
+ public static long getAddress(Buffer buffer)
+ {
+ try {
+ return (long) ADDRESS_ACCESSOR.get(buffer);
+ }
+ catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/java/core/src/java/io/airlift/slice/XxHash64.java b/java/core/src/java/io/airlift/slice/XxHash64.java
new file mode 100644
index 0000000..6cb5bfc
--- /dev/null
+++ b/java/core/src/java/io/airlift/slice/XxHash64.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.slice;
+
+import static io.airlift.slice.UnsafeUtil.UNSAFE;
+import static java.lang.Long.rotateLeft;
+
+// forked from https://github.com/airlift/slice
+public final class XxHash64
+{
+ private static final long PRIME64_1 = 0x9E3779B185EBCA87L;
+ private static final long PRIME64_2 = 0xC2B2AE3D27D4EB4FL;
+ private static final long PRIME64_3 = 0x165667B19E3779F9L;
+ private static final long PRIME64_4 = 0x85EBCA77C2b2AE63L;
+ private static final long PRIME64_5 = 0x27D4EB2F165667C5L;
+
+ private XxHash64() {}
+
+ public static long hash(long seed, Slice slice)
+ {
+ long hash;
+ if (slice.length >= 32) {
+ hash = updateBody(seed, slice.base, slice.address, slice.length);
+ }
+ else {
+ hash = seed + PRIME64_5;
+ }
+
+ hash += slice.length;
+
+ // round to the closest 32 byte boundary
+ // this is the point up to which updateBody() processed
+ int index = slice.length & 0xFFFFFFE0;
+
+ return updateTail(hash, slice.base, slice.address, index, slice.length);
+ }
+
+ private static long updateTail(long hash, Object base, long address, int index, int length)
+ {
+ while (index <= length - 8) {
+ hash = updateTail(hash, UNSAFE.getLong(base, address + index));
+ index += 8;
+ }
+
+ if (index <= length - 4) {
+ hash = updateTail(hash, UNSAFE.getInt(base, address + index));
+ index += 4;
+ }
+
+ while (index < length) {
+ hash = updateTail(hash, UNSAFE.getByte(base, address + index));
+ index++;
+ }
+
+ hash = finalShuffle(hash);
+
+ return hash;
+ }
+
+ private static long updateBody(long seed, Object base, long address, int length)
+ {
+ long v1 = seed + PRIME64_1 + PRIME64_2;
+ long v2 = seed + PRIME64_2;
+ long v3 = seed;
+ long v4 = seed - PRIME64_1;
+
+ int remaining = length;
+ while (remaining >= 32) {
+ v1 = mix(v1, UNSAFE.getLong(base, address));
+ v2 = mix(v2, UNSAFE.getLong(base, address + 8));
+ v3 = mix(v3, UNSAFE.getLong(base, address + 16));
+ v4 = mix(v4, UNSAFE.getLong(base, address + 24));
+
+ address += 32;
+ remaining -= 32;
+ }
+
+ long hash = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18);
+
+ hash = update(hash, v1);
+ hash = update(hash, v2);
+ hash = update(hash, v3);
+ hash = update(hash, v4);
+
+ return hash;
+ }
+
+ private static long mix(long current, long value)
+ {
+ return rotateLeft(current + value * PRIME64_2, 31) * PRIME64_1;
+ }
+
+ private static long update(long hash, long value)
+ {
+ long temp = hash ^ mix(0, value);
+ return temp * PRIME64_1 + PRIME64_4;
+ }
+
+ private static long updateTail(long hash, long value)
+ {
+ long temp = hash ^ mix(0, value);
+ return rotateLeft(temp, 27) * PRIME64_1 + PRIME64_4;
+ }
+
+ private static long updateTail(long hash, int value)
+ {
+ long unsigned = value & 0xFFFF_FFFFL;
+ long temp = hash ^ (unsigned * PRIME64_1);
+ return rotateLeft(temp, 23) * PRIME64_2 + PRIME64_3;
+ }
+
+ private static long updateTail(long hash, byte value)
+ {
+ int unsigned = value & 0xFF;
+ long temp = hash ^ (unsigned * PRIME64_5);
+ return rotateLeft(temp, 11) * PRIME64_1;
+ }
+
+ private static long finalShuffle(long hash)
+ {
+ hash ^= hash >>> 33;
+ hash *= PRIME64_2;
+ hash ^= hash >>> 29;
+ hash *= PRIME64_3;
+ hash ^= hash >>> 32;
+ return hash;
+ }
+}
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index 3cffe57..4a1cd5c 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@ package org.apache.orc;
* can be applied to ORC files.
*/
public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4
+ NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
}
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 10989a3..645f23f 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -577,6 +577,7 @@ public class ReaderImpl implements Reader {
case SNAPPY:
case LZO:
case LZ4:
+ case ZSTD:
break;
default:
throw new IllegalArgumentException("Unknown compression");
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index a4b5b36..9e65f2c 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -32,6 +32,8 @@ import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
@@ -273,6 +275,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
case LZ4:
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
+ case ZSTD:
+ return new AircompressorCodec(kind, new ZstdCompressor(),
+ new ZstdDecompressor());
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
@@ -534,6 +539,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
case LZO: return OrcProto.CompressionKind.LZO;
case LZ4: return OrcProto.CompressionKind.LZ4;
+ case ZSTD: return OrcProto.CompressionKind.ZSTD;
default:
throw new IllegalArgumentException("Unknown compression " + kind);
}
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index e858917..d1bb0d9 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2051,7 +2051,7 @@ public class TestVectorOrcFile {
}
/**
- * Read and write a randomly generated lzo file.
+ * Read and write a randomly generated lz4 file.
* @throws Exception
*/
@Test
@@ -2101,6 +2101,54 @@ public class TestVectorOrcFile {
}
/**
+ * Read and write a randomly generated zstd file.
+ */
+ @Test
+ public void testZstd() throws Exception {
+ TypeDescription schema =
+ TypeDescription.fromString("struct<x:bigint,y:int,z:bigint>");
+ try (Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf)
+ .setSchema(schema)
+ .compress(CompressionKind.ZSTD)
+ .bufferSize(1000)
+ .version(fileFormat))) {
+ VectorizedRowBatch batch = schema.createRowBatch();
+ Random rand = new Random(3);
+ batch.size = 1000;
+ for (int b = 0; b < 10; ++b) {
+ for (int r = 0; r < 1000; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+ ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
+ ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+ }
+ writer.addRowBatch(batch);
+ }
+ }
+ try (Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
+ RecordReader rows = reader.rows()) {
+ assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
+ VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
+ Random rand = new Random(3);
+ for (int b = 0; b < 10; ++b) {
+ rows.nextBatch(batch);
+ assertEquals(1000, batch.size);
+ for (int r = 0; r < batch.size; ++r) {
+ assertEquals(rand.nextInt(),
+ ((LongColumnVector) batch.cols[0]).vector[r]);
+ assertEquals(b * 1000 + r,
+ ((LongColumnVector) batch.cols[1]).vector[r]);
+ assertEquals(rand.nextLong(),
+ ((LongColumnVector) batch.cols[2]).vector[r]);
+ }
+ }
+ rows.nextBatch(batch);
+ assertEquals(0, batch.size);
+ }
+ }
+
+ /**
* Read and write a file; verify codec usage.
* @throws Exception
*/
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
similarity index 51%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/test/org/apache/orc/impl/TestZstd.java
index 3cffe57..8a6bbea 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -16,12 +16,30 @@
* limitations under the License.
*/
-package org.apache.orc;
+package org.apache.orc.impl;
+
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestZstd {
+
+ @Test
+ public void testNoOverflow() throws Exception {
+ ByteBuffer in = ByteBuffer.allocate(10);
+ ByteBuffer out = ByteBuffer.allocate(10);
+ in.put(new byte[]{1,2,3,4,5,6,7,10});
+ in.flip();
+ CompressionCodec codec = new AircompressorCodec(
+ CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
+ assertEquals(false, codec.compress(in, out, null,
+ codec.getDefaultOptions()));
+ }
-/**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
- */
-public enum CompressionKind {
- NONE, ZLIB, SNAPPY, LZO, LZ4
}
diff --git a/java/pom.xml b/java/pom.xml
index 79ee5be..05d6d29 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -431,7 +431,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
- <version>0.10</version>
+ <version>0.15</version>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>