You are viewing a plain-text version of this content. The link to the canonical version (originally "here") was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/15 08:54:57 UTC

git commit: TEZ-561. Change MRHelper byte helpers to use compression. (sseth)

Updated Branches:
  refs/heads/master b719d2d44 -> 59539dde2


TEZ-561. Change MRHelper byte helpers to use compression. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/59539dde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/59539dde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/59539dde

Branch: refs/heads/master
Commit: 59539dde2e9dcbf280d7865fca6533fd55a584cd
Parents: b719d2d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 14 23:54:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 14 23:54:42 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 tez-common/pom.xml                              |   6 ++
 .../java/org/apache/tez/common/TezUtils.java    | 100 +++++++++++++++++--
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |   1 -
 4 files changed, 103 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad22ead..3addf16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -246,6 +246,11 @@
         <artifactId>guice</artifactId>
         <version>3.0</version>
       </dependency>
+      <dependency> <!-- NOTE(review): no live code in this change uses snappy-java; TezUtils compresses with java.util.zip. -->
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>1.0.4.1</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index 457d6e1..91c9d91 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -41,6 +41,12 @@
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
     </dependency>
+    <!-- NOTE(review): no live code in this change uses snappy-java (TezUtils
+         compresses with java.util.zip); consider dropping this dependency. -->
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 65ac1a4..4e433aa 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -17,24 +17,35 @@
 
 package org.apache.tez.common;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 
 public class TezUtils {
   
+  private static final Log LOG = LogFactory.getLog(TezUtils.class);
+  
   public static void addUserSpecifiedTezConfiguration(Configuration conf) 
       throws IOException {
     FileInputStream confPBBinaryStream = null;
@@ -63,26 +74,44 @@ public class TezUtils {
       throws IOException {
     Preconditions.checkNotNull(conf, "Configuration must be specified");
     ByteString.Output os = ByteString.newOutput();
-    DataOutputStream dos = new DataOutputStream(os);
-    conf.write(dos);
+    // DeflaterOutputStream.close() does not end() a Deflater supplied by the
+    // caller, so end it explicitly to release its native zlib memory.
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    try {
+      DataOutputStream dos =
+          new DataOutputStream(new DeflaterOutputStream(os, deflater));
+      conf.write(dos);
+      // close() finishes the deflate stream, writing its trailer into os.
+      dos.close();
+    } finally {
+      deflater.end();
+    }
     return os.toByteString();
   }
-  
+
   public static byte[] createUserPayloadFromConf(Configuration conf)
       throws IOException {
     Preconditions.checkNotNull(conf, "Configuration must be specified");
     DataOutputBuffer dob = new DataOutputBuffer();
     conf.write(dob);
-    return dob.getData();
+    // getData() returns the whole backing array; only the first getLength()
+    // bytes were written, so compress just that range.
+    return compressBytes(dob.getData(), 0, dob.getLength());
   }
 
   public static Configuration createConfFromByteString(ByteString byteString)
       throws IOException {
     Preconditions.checkNotNull(byteString, "ByteString must be specified");
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    dibb.reset(byteString.asReadOnlyByteBuffer());
+    // Expects the zlib stream written by createByteStringFromConf.
+    DataInputStream dataInputStream =
+        new DataInputStream(new InflaterInputStream(byteString.newInput()));
     Configuration conf = new Configuration(false);
-    conf.readFields(dibb);
+    try {
+      conf.readFields(dataInputStream);
+    } finally {
+      // Closing releases the native Inflater owned by the InflaterInputStream.
+      dataInputStream.close();
+    }
     return conf;
   }
   
@@ -90,11 +119,134 @@ public class TezUtils {
       throws IOException {
     // TODO Avoid copy ?
     Preconditions.checkNotNull(bb, "Bytes must be specified");
+    byte[] uncompressed = uncompressBytes(bb);
     DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(bb, 0, bb.length);
+    dib.reset(uncompressed, 0, uncompressed.length);
     Configuration conf = new Configuration(false);
     conf.readFields(dib);
     return conf;
   }
 
+  /**
+   * Compresses a byte array using zlib ({@link Deflater} at BEST_SPEED).
+   *
+   * @param inBytes the bytes to compress; must not be null
+   * @return the compressed bytes, readable by {@link #uncompressBytes(byte[])}
+   * @throws IOException declared for callers; the zlib path does not throw it
+   */
+  public static byte[] compressBytes(byte[] inBytes) throws IOException {
+    Preconditions.checkNotNull(inBytes, "Bytes must be specified");
+    return compressBytes(inBytes, 0, inBytes.length);
+  }
+
+  /**
+   * Compresses {@code length} bytes of {@code inBytes}, starting at
+   * {@code offset}, using zlib ({@link Deflater} at BEST_SPEED). Use this for
+   * buffers such as {@link DataOutputBuffer} whose backing array is larger
+   * than the data written to it.
+   *
+   * @param inBytes the source array; must not be null
+   * @param offset index of the first byte to compress
+   * @param length number of bytes to compress
+   * @return the compressed bytes, readable by {@link #uncompressBytes(byte[])}
+   * @throws IOException declared for callers; the zlib path does not throw it
+   */
+  public static byte[] compressBytes(byte[] inBytes, int offset, int length)
+      throws IOException {
+    Preconditions.checkNotNull(inBytes, "Bytes must be specified");
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] compressed = compressBytesInflateDeflate(inBytes, offset, length);
+    // Test sw rather than re-querying the log level, which may change meanwhile.
+    if (sw != null) {
+      sw.stop();
+      LOG.debug("UncompressedSize: " + length + ", CompressedSize: "
+          + compressed.length + ", CompressTime: " + sw.elapsedMillis());
+    }
+    return compressed;
+  }
+
+  /**
+   * Decompresses zlib data produced by {@link #compressBytes(byte[])}.
+   *
+   * @param inBytes the compressed bytes; must not be null
+   * @return the decompressed bytes
+   * @throws IOException if the data is corrupt, truncated, or requires a
+   *     preset dictionary
+   */
+  public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
+    Preconditions.checkNotNull(inBytes, "Bytes must be specified");
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    if (sw != null) {
+      sw.stop();
+      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: "
+          + uncompressed.length + ", UncompressTimeTaken: "
+          + sw.elapsedMillis());
+    }
+    return uncompressed;
+  }
+
+  /**
+   * Deflates the given byte range in one pass. The Deflater is always ended so
+   * its native zlib memory is released without waiting for finalization.
+   */
+  private static byte[] compressBytesInflateDeflate(byte[] inBytes, int offset,
+      int length) {
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    try {
+      deflater.setInput(inBytes, offset, length);
+      deflater.finish();
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(length);
+      byte[] buffer = new byte[1024 * 8];
+      while (!deflater.finished()) {
+        int count = deflater.deflate(buffer);
+        bos.write(buffer, 0, count);
+      }
+      return bos.toByteArray();
+    } finally {
+      deflater.end();
+    }
+  }
+
+  /**
+   * Inflates a complete zlib stream. The Inflater is always ended so its native
+   * zlib memory is released without waiting for finalization.
+   *
+   * @throws IOException if the data is corrupt, requires a preset dictionary,
+   *     or ends before the zlib stream is complete
+   */
+  private static byte[] uncompressBytesInflateDeflate(byte[] inBytes)
+      throws IOException {
+    Inflater inflater = new Inflater();
+    try {
+      inflater.setInput(inBytes);
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+      byte[] buffer = new byte[1024 * 8];
+      while (!inflater.finished()) {
+        int count;
+        try {
+          count = inflater.inflate(buffer);
+        } catch (DataFormatException e) {
+          throw new IOException(e);
+        }
+        if (count == 0 && !inflater.finished()
+            && (inflater.needsInput() || inflater.needsDictionary())) {
+          // No progress is possible: truncated input or a stream that needs a
+          // preset dictionary would otherwise make this loop spin forever.
+          throw new IOException("Incomplete or unsupported compressed data, "
+              + "compressed length: " + inBytes.length);
+        }
+        bos.write(buffer, 0, count);
+      }
+      return bos.toByteArray();
+    } finally {
+      inflater.end();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 6736467..98645f2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -19,7 +19,6 @@
 package org.apache.tez.mapreduce.hadoop;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;