You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@phoenix.apache.org by ap...@apache.org on 2014/11/18 03:18:01 UTC

[3/3] phoenix git commit: PHOENIX-1455 Replace org.xerial.snappy with org.iq80.snappy pure Java snappy implementation

PHOENIX-1455 Replace org.xerial.snappy with org.iq80.snappy pure Java snappy implementation

Conflicts:
	phoenix-core/pom.xml
	phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
	pom.xml


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43abdea7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43abdea7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43abdea7

Branch: refs/heads/3.2
Commit: 43abdea7d67f0de11a0d23f7c1d5fddadf039607
Parents: b805aa6
Author: Andrew Purtell <ap...@apache.org>
Authored: Mon Nov 17 18:05:33 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon Nov 17 18:10:49 2014 -0800

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  5 ++++
 .../DistinctValueWithCountClientAggregator.java | 17 ++++++++++----
 .../DistinctValueWithCountServerAggregator.java | 24 ++++++--------------
 .../apache/phoenix/join/HashCacheClient.java    |  3 ++-
 .../apache/phoenix/join/HashCacheFactory.java   | 15 ++++++++----
 pom.xml                                         | 14 +++++++++---
 6 files changed, 47 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index e331db3..9c6d183 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -272,6 +272,11 @@
       <artifactId>jackson-xc</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.iq80.snappy</groupId>
+      <artifactId>snappy</artifactId>
+      <version>${snappy.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
index f29f46a..56ca000 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.iq80.snappy.Snappy;
 
 /**
  * Client side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
@@ -59,14 +60,20 @@ public abstract class DistinctValueWithCountClientAggregator extends BaseAggrega
             PDataType resultDataType = getResultDataType();
             cachedResult = resultDataType.toObject(ptr, resultDataType, sortOrder);
         } else {
-            InputStream is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
+            InputStream is;
             try {
                 if (Bytes.equals(ptr.get(), ptr.getOffset(), 1, DistinctValueWithCountServerAggregator.COMPRESS_MARKER,
                         0, 1)) {
-                    InputStream decompressionStream = DistinctValueWithCountServerAggregator.COMPRESS_ALGO
-                            .createDecompressionStream(is,
-                                    DistinctValueWithCountServerAggregator.COMPRESS_ALGO.getDecompressor(), 0);
-                    is = decompressionStream;
+                    // This reads the uncompressed length from the front of the compressed input
+                    int uncompressedLength = Snappy.getUncompressedLength(ptr.get(), ptr.getOffset() + 1);
+                    byte[] uncompressed = new byte[uncompressedLength];
+                    // This will throw CorruptionException, a RuntimeException if the snappy data is invalid.
+                    // We're making a RuntimeException out of a checked IOException below so assume it's ok
+                    // to let any CorruptionException escape.
+                    Snappy.uncompress(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1, uncompressed, 0);
+                    is = new ByteArrayInputStream(uncompressed, 0, uncompressedLength);
+                } else {
+                    is = new ByteArrayInputStream(ptr.get(), ptr.getOffset() + 1, ptr.getLength() - 1);
                 }
                 DataInputStream in = new DataInputStream(is);
                 int mapSize = WritableUtils.readVInt(in);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 8d4e727..a3141b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -17,16 +17,12 @@
  */
 package org.apache.phoenix.expression.aggregator;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +36,8 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
 
+import org.iq80.snappy.Snappy;
+
 /**
  * Server side Aggregator which will aggregate data and find distinct values with number of occurrences for each.
  * 
@@ -50,7 +48,6 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
     private static final Logger LOG = LoggerFactory.getLogger(DistinctValueWithCountServerAggregator.class);
     public static final int DEFAULT_ESTIMATED_DISTINCT_VALUES = 10000;
     public static final byte[] COMPRESS_MARKER = new byte[] { (byte)1 };
-    public static final Algorithm COMPRESS_ALGO = Compression.Algorithm.SNAPPY;
 
     private int compressThreshold;
     private byte[] buffer = null;
@@ -101,18 +98,11 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
         }
         if (serializationSize > compressThreshold) {
             // The size for the map serialization is above the threshold. We will do the Snappy compression here.
-            ByteArrayOutputStream compressedByteStream = new ByteArrayOutputStream();
-            try {
-                compressedByteStream.write(COMPRESS_MARKER);
-                OutputStream compressionStream = COMPRESS_ALGO.createCompressionStream(compressedByteStream,
-                        COMPRESS_ALGO.getCompressor(), 0);
-                compressionStream.write(buffer, 1, buffer.length - 1);
-                compressionStream.flush();
-                ptr.set(compressedByteStream.toByteArray(), 0, compressedByteStream.size());
-                return true;
-            } catch (Exception e) {
-                LOG.error("Exception while Snappy compression of data.", e);
-            }
+            byte[] compressed = new byte[COMPRESS_MARKER.length + Snappy.maxCompressedLength(buffer.length)];
+            System.arraycopy(COMPRESS_MARKER, 0, compressed, 0, COMPRESS_MARKER.length);
+            int compressedLen = Snappy.compress(buffer, 1, buffer.length - 1, compressed, COMPRESS_MARKER.length);
+            ptr.set(compressed, 0, compressedLen + 1);
+            return true;
         }
         ptr.set(buffer, 0, offset);
         return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index e2f57df..6494603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -39,7 +39,8 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.phoenix.util.TupleUtil;
-import org.xerial.snappy.Snappy;
+
+import org.iq80.snappy.Snappy;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 6c2d7ce..5647155 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
+
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -50,7 +51,9 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TupleUtil;
-import org.xerial.snappy.Snappy;
+
+import org.iq80.snappy.CorruptionException;
+import org.iq80.snappy.Snappy;
 
 public class HashCacheFactory implements ServerCacheFactory {
 
@@ -68,11 +71,13 @@ public class HashCacheFactory implements ServerCacheFactory {
     @Override
     public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
         try {
-            int size = Snappy.uncompressedLength(cachePtr.get());
-            byte[] uncompressed = new byte[size];
-            Snappy.uncompress(cachePtr.get(), 0, cachePtr.getLength(), uncompressed, 0);
+            // This reads the uncompressed length from the front of the compressed input
+            int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());
+            byte[] uncompressed = new byte[uncompressedLen];
+            Snappy.uncompress(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength(),
+                uncompressed, 0);
             return new HashCacheImpl(uncompressed, chunk);
-        } catch (IOException e) {
+        } catch (CorruptionException e) {
             throw ServerUtil.parseServerException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/43abdea7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a98d77..4420e52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,14 @@
       <id>conjars.org</id>
       <url>http://conjars.org/repo</url>
     </repository>
+    <repository>
+      <id>sonatype-nexus-snapshots</id>
+      <name>Sonatype Nexus Snapshots</name>
+      <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
   </repositories>
 
   <parent>
@@ -96,7 +104,7 @@
     <flume.version>1.4.0</flume.version>
     <findbugs.version>1.3.2</findbugs.version>
     <jline.version>2.11</jline.version>
-    <snappy.version>1.1.0.1</snappy.version>
+    <snappy.version>0.3</snappy.version>
     <jodatime.version>2.3</jodatime.version>
 
     <!-- Test Dependencies -->
@@ -476,8 +484,8 @@
         <version>${findbugs.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.xerial.snappy</groupId>
-        <artifactId>snappy-java</artifactId>
+        <groupId>org.iq80.snappy</groupId>
+        <artifactId>snappy</artifactId>
         <version>${snappy.version}</version>
       </dependency>
       <dependency>