You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/15 08:54:57 UTC
git commit: TEZ-561. Change MRHelper byte helpers to use compression. (sseth)
Updated Branches:
refs/heads/master b719d2d44 -> 59539dde2
TEZ-561. Change MRHelper byte helpers to use compression. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/59539dde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/59539dde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/59539dde
Branch: refs/heads/master
Commit: 59539dde2e9dcbf280d7865fca6533fd55a584cd
Parents: b719d2d
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Oct 14 23:54:42 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Oct 14 23:54:42 2013 -0700
----------------------------------------------------------------------
pom.xml | 5 +
tez-common/pom.xml | 6 ++
.../java/org/apache/tez/common/TezUtils.java | 100 +++++++++++++++++--
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 1 -
4 files changed, 103 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad22ead..3addf16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -246,6 +246,11 @@
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.4.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index 457d6e1..91c9d91 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -41,6 +41,12 @@
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index 65ac1a4..4e433aa 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -17,24 +17,35 @@
package org.apache.tez.common;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
public class TezUtils {
+ private static final Log LOG = LogFactory.getLog(TezUtils.class);
+
public static void addUserSpecifiedTezConfiguration(Configuration conf)
throws IOException {
FileInputStream confPBBinaryStream = null;
@@ -63,26 +74,30 @@ public class TezUtils {
throws IOException {
Preconditions.checkNotNull(conf, "Configuration must be specified");
ByteString.Output os = ByteString.newOutput();
- DataOutputStream dos = new DataOutputStream(os);
+ //SnappyOutputStream compressOs = new SnappyOutputStream(os);
+ DeflaterOutputStream compressOs = new DeflaterOutputStream(os, new Deflater(Deflater.BEST_SPEED));
+ DataOutputStream dos = new DataOutputStream(compressOs);
conf.write(dos);
+ dos.close();
return os.toByteString();
}
-
+
public static byte[] createUserPayloadFromConf(Configuration conf)
throws IOException {
Preconditions.checkNotNull(conf, "Configuration must be specified");
DataOutputBuffer dob = new DataOutputBuffer();
conf.write(dob);
- return dob.getData();
+ return compressBytes(dob.getData());
}
public static Configuration createConfFromByteString(ByteString byteString)
throws IOException {
Preconditions.checkNotNull(byteString, "ByteString must be specified");
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(byteString.asReadOnlyByteBuffer());
+// SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput());
+ InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
+ DataInputStream dataInputStream = new DataInputStream(uncompressIs);
Configuration conf = new Configuration(false);
- conf.readFields(dibb);
+ conf.readFields(dataInputStream);
return conf;
}
@@ -90,11 +105,80 @@ public class TezUtils {
throws IOException {
// TODO Avoid copy ?
Preconditions.checkNotNull(bb, "Bytes must be specified");
+ byte[] uncompressed = uncompressBytes(bb);
DataInputBuffer dib = new DataInputBuffer();
- dib.reset(bb, 0, bb.length);
+ dib.reset(uncompressed, 0, uncompressed.length);
Configuration conf = new Configuration(false);
conf.readFields(dib);
return conf;
}
+ public static byte[] compressBytes(byte[] inBytes) throws IOException {
+ Stopwatch sw = null;
+ if (LOG.isDebugEnabled()) {
+ sw = new Stopwatch().start();
+ }
+ byte[] compressed = compressBytesInflateDeflate(inBytes);
+ if (LOG.isDebugEnabled()) {
+ sw.stop();
+ LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: "
+ + compressed.length + ", CompressTime: " + sw.elapsedMillis());
+ }
+ return compressed;
+ }
+
+ public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
+ Stopwatch sw = null;
+ if (LOG.isDebugEnabled()) {
+ sw = new Stopwatch().start();
+ }
+ byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+ if (LOG.isDebugEnabled()) {
+ sw.stop();
+ LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: "
+ + uncompressed.length + ", UncompressTimeTaken: "
+ + sw.elapsedMillis());
+ }
+ return uncompressed;
+ }
+
+// private static byte[] compressBytesSnappy(byte[] inBytes) throws IOException {
+// return Snappy.compress(inBytes);
+// }
+//
+// private static byte[] uncompressBytesSnappy(byte[] inBytes) throws IOException {
+// return Snappy.uncompress(inBytes);
+// }
+
+ private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
+ Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+ deflater.setInput(inBytes);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+ deflater.finish();
+ byte[] buffer = new byte[1024 * 8];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(buffer);
+ bos.write(buffer, 0, count);
+ }
+ byte[] output = bos.toByteArray();
+ return output;
+ }
+
+ private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
+ Inflater inflater = new Inflater();
+ inflater.setInput(inBytes);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+ byte[] buffer = new byte[1024 * 8];
+ while (!inflater.finished()) {
+ int count;
+ try {
+ count = inflater.inflate(buffer);
+ } catch (DataFormatException e) {
+ throw new IOException(e);
+ }
+ bos.write(buffer, 0, count);
+ }
+ byte[] output = bos.toByteArray();
+ return output;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/59539dde/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 6736467..98645f2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -19,7 +19,6 @@
package org.apache.tez.mapreduce.hadoop;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;