You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:52 UTC

[09/47] git commit: updated refs/heads/release-1.1 to 4c139ee

GIRAPH-713: Provide an option to do request compression (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4223ccc0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4223ccc0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4223ccc0

Branch: refs/heads/release-1.1
Commit: 4223ccc08bcd3689bddb310dddedab0485f7a6bd
Parents: 666d5fd
Author: Pavan Kumar <pa...@fb.com>
Authored: Mon Jul 7 16:48:08 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Mon Jul 7 16:48:08 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../apache/giraph/comm/netty/NettyClient.java   | 21 +++++++-
 .../apache/giraph/comm/netty/NettyServer.java   | 20 ++++++++
 .../org/apache/giraph/conf/GiraphConstants.java |  5 ++
 .../ImmutableClassesGiraphConfiguration.java    | 54 ++++++++++++++++++++
 5 files changed, 101 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 43aea7a..ea2f911 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-713: Provide an option to do request compression (pavanka)
+
   GIRAPH-923: Upgrade Netty version to a latest stable one (pavanka)
 
   GIRAPH-916: Wrong number of vertices stored reported to command line (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index ae40c3b..5bb5545 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -268,11 +268,20 @@ public class NettyClient {
               PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                   inboundByteCounter, handlerToUseExecutionGroup,
                   executionGroup, ch);
+              if (conf.doCompression()) {
+                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
+                    conf.getNettyCompressionDecoder(),
+                    handlerToUseExecutionGroup, executionGroup, ch);
+              }
               PipelineUtils.addLastWithExecutorCheck(
                   "clientOutboundByteCounter",
                   outboundByteCounter, handlerToUseExecutionGroup,
                   executionGroup, ch);
-
+              if (conf.doCompression()) {
+                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
+                    conf.getNettyCompressionEncoder(),
+                    handlerToUseExecutionGroup, executionGroup, ch);
+              }
               // The following pipeline component is needed to decode the
               // server's SASL tokens. It is replaced with a
               // FixedLengthFrameDecoder (same as used with the
@@ -303,10 +312,20 @@ public class NettyClient {
               PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                   inboundByteCounter, handlerToUseExecutionGroup,
                   executionGroup, ch);
+              if (conf.doCompression()) {
+                PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
+                    conf.getNettyCompressionDecoder(),
+                    handlerToUseExecutionGroup, executionGroup, ch);
+              }
               PipelineUtils.addLastWithExecutorCheck(
                   "clientOutboundByteCounter",
                   outboundByteCounter, handlerToUseExecutionGroup,
                   executionGroup, ch);
+              if (conf.doCompression()) {
+                PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
+                    conf.getNettyCompressionEncoder(),
+                    handlerToUseExecutionGroup, executionGroup, ch);
+              }
               PipelineUtils.addLastWithExecutorCheck(
                   "fixed-length-frame-decoder",
                   new FixedLengthFrameDecoder(

http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 14d4ea8..8162857 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -240,8 +240,18 @@ public class NettyServer {
           // configuration except for the presence of the Authorize component.
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
+          if (conf.doCompression()) {
+            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
+                conf.getNettyCompressionDecoder(),
+                handlerToUseExecutionGroup, executionGroup, ch);
+          }
           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
+          if (conf.doCompression()) {
+            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
+                conf.getNettyCompressionEncoder(),
+                handlerToUseExecutionGroup, executionGroup, ch);
+          }
           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
               handlerToUseExecutionGroup, executionGroup, ch);
@@ -280,8 +290,18 @@ public class NettyServer {
               });
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
+          if (conf.doCompression()) {
+            PipelineUtils.addLastWithExecutorCheck("compressionDecoder",
+                conf.getNettyCompressionDecoder(),
+                handlerToUseExecutionGroup, executionGroup, ch);
+          }
           PipelineUtils.addLastWithExecutorCheck("serverOutboundByteCounter",
               outByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
+          if (conf.doCompression()) {
+            PipelineUtils.addLastWithExecutorCheck("compressionEncoder",
+                conf.getNettyCompressionEncoder(),
+                handlerToUseExecutionGroup, executionGroup, ch);
+          }
           PipelineUtils.addLastWithExecutorCheck("requestFrameDecoder",
               new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
               handlerToUseExecutionGroup, executionGroup, ch);

http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 7d7ceb2..1879a25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -613,6 +613,11 @@ public interface GiraphConstants {
       new BooleanConfOption("giraph.nettySimulateFirstResponseFailed", false,
           "Netty simulate a first response failed");
 
+  /** Netty - which compression to use ("SNAPPY" or "INFLATE", else off) */
+  StrConfOption NETTY_COMPRESSION_ALGORITHM =
+      new StrConfOption("giraph.nettyCompressionAlgorithm", "",
+          "Which compression algorithm to use in netty");
+
   /** Max resolve address attempts */
   IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
       new IntConfOption("giraph.maxResolveAddressAttempts", 5,

http://git-wip-us.apache.org/repos/asf/giraph/blob/4223ccc0/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 3d7b3db..3121fa8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -19,6 +19,12 @@
 package org.apache.giraph.conf;
 
 import com.google.common.base.Preconditions;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.handler.codec.compression.JdkZlibDecoder;
+import io.netty.handler.codec.compression.JdkZlibEncoder;
+import io.netty.handler.codec.compression.SnappyFramedDecoder;
+import io.netty.handler.codec.compression.SnappyFramedEncoder;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
@@ -1220,4 +1226,52 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     }
     classes.setMessageCombiner(superstepClasses.getMessageCombinerClass());
   }
+
+  /**
+   * Is netty compression enabled, i.e. algorithm is "SNAPPY" or "INFLATE"
+   *
+   * @return true if ok to do compression of netty requests
+   */
+  public boolean doCompression() {
+    switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
+    case "SNAPPY":
+      return true;
+    case "INFLATE":
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Get encoder for message compression in netty
+   *
+   * @return new encoder, or null if algorithm is not "SNAPPY" or "INFLATE"
+   */
+  public MessageToByteEncoder getNettyCompressionEncoder() {
+    switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
+    case "SNAPPY":
+      return new SnappyFramedEncoder();
+    case "INFLATE":
+      return new JdkZlibEncoder();
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * Get decoder for message decompression in netty
+   *
+   * @return new decoder, or null if algorithm is not "SNAPPY" or "INFLATE"
+   */
+  public ByteToMessageDecoder getNettyCompressionDecoder() {
+    switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
+    case "SNAPPY":
+      return new SnappyFramedDecoder(true);
+    case "INFLATE":
+      return new JdkZlibDecoder();
+    default:
+      return null;
+    }
+  }
 }