You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/08/30 22:05:14 UTC

[orc] branch master updated: ORC-363: Enable zStandard codec

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 93b61d9  ORC-363: Enable zStandard codec
93b61d9 is described below

commit 93b61d935376e3297c7dcaedc763be8050c3bf89
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Fri Jul 12 14:12:36 2019 -0700

    ORC-363: Enable zStandard codec
    
    Support the ZSTD codec in the Java ORC writer and reader. Under the hood,
    the implementation uses the airlift compressor library.
    
    This also adds a drop-in replacement for airlift slice until airlift
    compressor releases a version that doesn't depend on slice. (Slice depends
    on jol-core, which is GPL'ed.)
    
    Fixes #412
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 examples/TestVectorOrcFile.testZstd.0.12.orc       | Bin 0 -> 121017 bytes
 .../airlift/slice/Slice.java}                      |  23 +++-
 .../airlift/slice/Slices.java}                     |  11 +-
 .../airlift/slice/UnsafeSliceFactory.java}         |  18 ++-
 .../core/src/java/io/airlift/slice/UnsafeUtil.java |  66 ++++++++++
 java/core/src/java/io/airlift/slice/XxHash64.java  | 139 +++++++++++++++++++++
 .../src/java/org/apache/orc/CompressionKind.java   |   2 +-
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |   1 +
 .../src/java/org/apache/orc/impl/WriterImpl.java   |   6 +
 .../src/test/org/apache/orc/TestVectorOrcFile.java |  50 +++++++-
 .../org/apache/orc/impl/TestZstd.java}             |  32 +++--
 java/pom.xml                                       |   2 +-
 12 files changed, 322 insertions(+), 28 deletions(-)

diff --git a/examples/TestVectorOrcFile.testZstd.0.12.orc b/examples/TestVectorOrcFile.testZstd.0.12.orc
new file mode 100644
index 0000000..d264419
Binary files /dev/null and b/examples/TestVectorOrcFile.testZstd.0.12.orc differ
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/Slice.java
similarity index 69%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/Slice.java
index 3cffe57..e768a2c 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/Slice.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,12 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.orc;
+package io.airlift.slice;
 
 /**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
  */
-public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class Slice {
+  final Object base;
+  final long address;
+  final int length;
+
+  public Slice() {
+    this(null, 0, 0);
+  }
+
+  public Slice(Object base, long address, int length) {
+    this.base = base;
+    this.address = address;
+    this.length = length;
+  }
 }
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/Slices.java
similarity index 81%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/Slices.java
index 3cffe57..320e8c2 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/Slices.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.orc;
+package io.airlift.slice;
 
 /**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
  */
-public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class Slices {
+  public static final Slice EMPTY_SLICE = new Slice();
 }
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
similarity index 71%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
index 3cffe57..654045e 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/io/airlift/slice/UnsafeSliceFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,12 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.orc;
+package io.airlift.slice;
 
 /**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
+ * A stub to avoid the jol-core dependency of slice.
  */
-public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4
+public final class UnsafeSliceFactory {
+
+  public static UnsafeSliceFactory getInstance() {
+    return new UnsafeSliceFactory();
+  }
+
+  public Slice newSlice(Object base, long address, int length) {
+    return new Slice(base, address, length);
+  }
 }
diff --git a/java/core/src/java/io/airlift/slice/UnsafeUtil.java b/java/core/src/java/io/airlift/slice/UnsafeUtil.java
new file mode 100644
index 0000000..61b4608
--- /dev/null
+++ b/java/core/src/java/io/airlift/slice/UnsafeUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.slice;
+
+import io.airlift.compress.IncompatibleJvmException;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+import java.nio.Buffer;
+import java.nio.ByteOrder;
+
+import static java.lang.String.format;
+
+final class UnsafeUtil
+{
+    public static final Unsafe UNSAFE;
+    private static final Field ADDRESS_ACCESSOR;
+
+    private UnsafeUtil() {}
+
+    static {
+        ByteOrder order = ByteOrder.nativeOrder();
+        if (!order.equals(ByteOrder.LITTLE_ENDIAN)) {
+            throw new IncompatibleJvmException(format("Zstandard requires a little endian platform (found %s)", order));
+        }
+
+        try {
+            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
+            theUnsafe.setAccessible(true);
+            UNSAFE = (Unsafe) theUnsafe.get(null);
+        }
+        catch (Exception e) {
+            throw new IncompatibleJvmException("Zstandard requires access to sun.misc.Unsafe");
+        }
+
+        try {
+            Field field = Buffer.class.getDeclaredField("address");
+            field.setAccessible(true);
+            ADDRESS_ACCESSOR = field;
+        }
+        catch (Exception e) {
+            throw new IncompatibleJvmException("Zstandard requires access to java.nio.Buffer raw address field");
+        }
+    }
+
+    public static long getAddress(Buffer buffer)
+    {
+        try {
+            return (long) ADDRESS_ACCESSOR.get(buffer);
+        }
+        catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/java/core/src/java/io/airlift/slice/XxHash64.java b/java/core/src/java/io/airlift/slice/XxHash64.java
new file mode 100644
index 0000000..6cb5bfc
--- /dev/null
+++ b/java/core/src/java/io/airlift/slice/XxHash64.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.slice;
+
+import static io.airlift.slice.UnsafeUtil.UNSAFE;
+import static java.lang.Long.rotateLeft;
+
+// forked from https://github.com/airlift/slice
+public final class XxHash64
+{
+    private static final long PRIME64_1 = 0x9E3779B185EBCA87L;
+    private static final long PRIME64_2 = 0xC2B2AE3D27D4EB4FL;
+    private static final long PRIME64_3 = 0x165667B19E3779F9L;
+    private static final long PRIME64_4 = 0x85EBCA77C2b2AE63L;
+    private static final long PRIME64_5 = 0x27D4EB2F165667C5L;
+
+    private XxHash64() {}
+
+    public static long hash(long seed, Slice slice)
+    {
+        long hash;
+        if (slice.length >= 32) {
+            hash = updateBody(seed, slice.base, slice.address, slice.length);
+        }
+        else {
+            hash = seed + PRIME64_5;
+        }
+
+        hash += slice.length;
+
+        // round to the closest 32 byte boundary
+        // this is the point up to which updateBody() processed
+        int index = slice.length & 0xFFFFFFE0;
+
+        return updateTail(hash, slice.base, slice.address, index, slice.length);
+    }
+
+    private static long updateTail(long hash, Object base, long address, int index, int length)
+    {
+        while (index <= length - 8) {
+            hash = updateTail(hash, UNSAFE.getLong(base, address + index));
+            index += 8;
+        }
+
+        if (index <= length - 4) {
+            hash = updateTail(hash, UNSAFE.getInt(base, address + index));
+            index += 4;
+        }
+
+        while (index < length) {
+            hash = updateTail(hash, UNSAFE.getByte(base, address + index));
+            index++;
+        }
+
+        hash = finalShuffle(hash);
+
+        return hash;
+    }
+
+    private static long updateBody(long seed, Object base, long address, int length)
+    {
+        long v1 = seed + PRIME64_1 + PRIME64_2;
+        long v2 = seed + PRIME64_2;
+        long v3 = seed;
+        long v4 = seed - PRIME64_1;
+
+        int remaining = length;
+        while (remaining >= 32) {
+            v1 = mix(v1, UNSAFE.getLong(base, address));
+            v2 = mix(v2, UNSAFE.getLong(base, address + 8));
+            v3 = mix(v3, UNSAFE.getLong(base, address + 16));
+            v4 = mix(v4, UNSAFE.getLong(base, address + 24));
+
+            address += 32;
+            remaining -= 32;
+        }
+
+        long hash = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18);
+
+        hash = update(hash, v1);
+        hash = update(hash, v2);
+        hash = update(hash, v3);
+        hash = update(hash, v4);
+
+        return hash;
+    }
+
+    private static long mix(long current, long value)
+    {
+        return rotateLeft(current + value * PRIME64_2, 31) * PRIME64_1;
+    }
+
+    private static long update(long hash, long value)
+    {
+        long temp = hash ^ mix(0, value);
+        return temp * PRIME64_1 + PRIME64_4;
+    }
+
+    private static long updateTail(long hash, long value)
+    {
+        long temp = hash ^ mix(0, value);
+        return rotateLeft(temp, 27) * PRIME64_1 + PRIME64_4;
+    }
+
+    private static long updateTail(long hash, int value)
+    {
+        long unsigned = value & 0xFFFF_FFFFL;
+        long temp = hash ^ (unsigned * PRIME64_1);
+        return rotateLeft(temp, 23) * PRIME64_2 + PRIME64_3;
+    }
+
+    private static long updateTail(long hash, byte value)
+    {
+        int unsigned = value & 0xFF;
+        long temp = hash ^ (unsigned * PRIME64_5);
+        return rotateLeft(temp, 11) * PRIME64_1;
+    }
+
+    private static long finalShuffle(long hash)
+    {
+        hash ^= hash >>> 33;
+        hash *= PRIME64_2;
+        hash ^= hash >>> 29;
+        hash *= PRIME64_3;
+        hash ^= hash >>> 32;
+        return hash;
+    }
+}
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index 3cffe57..4a1cd5c 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@ package org.apache.orc;
  * can be applied to ORC files.
  */
 public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4
+  NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
 }
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 10989a3..645f23f 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -577,6 +577,7 @@ public class ReaderImpl implements Reader {
       case SNAPPY:
       case LZO:
       case LZ4:
+      case ZSTD:
         break;
       default:
         throw new IllegalArgumentException("Unknown compression");
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index a4b5b36..9e65f2c 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -32,6 +32,8 @@ import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.lz4.Lz4Decompressor;
 import io.airlift.compress.lzo.LzoCompressor;
 import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
@@ -273,6 +275,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       case LZ4:
         return new AircompressorCodec(kind, new Lz4Compressor(),
             new Lz4Decompressor());
+      case ZSTD:
+        return new AircompressorCodec(kind, new ZstdCompressor(),
+            new ZstdDecompressor());
       default:
         throw new IllegalArgumentException("Unknown compression codec: " +
             kind);
@@ -534,6 +539,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
       case LZO: return OrcProto.CompressionKind.LZO;
       case LZ4: return OrcProto.CompressionKind.LZ4;
+      case ZSTD: return OrcProto.CompressionKind.ZSTD;
       default:
         throw new IllegalArgumentException("Unknown compression " + kind);
     }
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index e858917..d1bb0d9 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -2051,7 +2051,7 @@ public class TestVectorOrcFile {
   }
 
   /**
-   * Read and write a randomly generated lzo file.
+   * Read and write a randomly generated lz4 file.
    * @throws Exception
    */
   @Test
@@ -2101,6 +2101,54 @@ public class TestVectorOrcFile {
   }
 
   /**
+   * Read and write a randomly generated zstd file.
+   */
+  @Test
+  public void testZstd() throws Exception {
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:bigint,y:int,z:bigint>");
+    try (Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .compress(CompressionKind.ZSTD)
+            .bufferSize(1000)
+            .version(fileFormat))) {
+      VectorizedRowBatch batch = schema.createRowBatch();
+      Random rand = new Random(3);
+      batch.size = 1000;
+      for (int b = 0; b < 10; ++b) {
+        for (int r = 0; r < 1000; ++r) {
+          ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+          ((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
+          ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+        }
+        writer.addRowBatch(batch);
+      }
+    }
+    try (Reader reader = OrcFile.createReader(testFilePath,
+           OrcFile.readerOptions(conf).filesystem(fs));
+         RecordReader rows = reader.rows()) {
+      assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
+      VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
+      Random rand = new Random(3);
+      for (int b = 0; b < 10; ++b) {
+        rows.nextBatch(batch);
+        assertEquals(1000, batch.size);
+        for (int r = 0; r < batch.size; ++r) {
+          assertEquals(rand.nextInt(),
+              ((LongColumnVector) batch.cols[0]).vector[r]);
+          assertEquals(b * 1000 + r,
+              ((LongColumnVector) batch.cols[1]).vector[r]);
+          assertEquals(rand.nextLong(),
+              ((LongColumnVector) batch.cols[2]).vector[r]);
+        }
+      }
+      rows.nextBatch(batch);
+      assertEquals(0, batch.size);
+    }
+  }
+
+  /**
    * Read and write a file; verify codec usage.
    * @throws Exception
    */
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/test/org/apache/orc/impl/TestZstd.java
similarity index 51%
copy from java/core/src/java/org/apache/orc/CompressionKind.java
copy to java/core/src/test/org/apache/orc/impl/TestZstd.java
index 3cffe57..8a6bbea 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZstd.java
@@ -16,12 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.orc;
+package org.apache.orc.impl;
+
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestZstd {
+
+  @Test
+  public void testNoOverflow() throws Exception {
+    ByteBuffer in = ByteBuffer.allocate(10);
+    ByteBuffer out = ByteBuffer.allocate(10);
+    in.put(new byte[]{1,2,3,4,5,6,7,10});
+    in.flip();
+    CompressionCodec codec = new AircompressorCodec(
+        CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
+    assertEquals(false, codec.compress(in, out, null,
+        codec.getDefaultOptions()));
+  }
 
-/**
- * An enumeration that lists the generic compression algorithms that
- * can be applied to ORC files.
- */
-public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO, LZ4
 }
diff --git a/java/pom.xml b/java/pom.xml
index 79ee5be..05d6d29 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -431,7 +431,7 @@
       <dependency>
         <groupId>io.airlift</groupId>
         <artifactId>aircompressor</artifactId>
-        <version>0.10</version>
+        <version>0.15</version>
         <exclusions>
           <exclusion>
             <groupId>io.airlift</groupId>