You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/11/19 02:57:03 UTC

[giraph] branch trunk updated: GIRAPH-1228

This is an automated email from the ASF dual-hosted git repository.

dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acc1f71  GIRAPH-1228
acc1f71 is described below

commit acc1f71af03dec6f1a1c83fb0fcce08451e140ea
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Mon Nov 18 18:56:36 2019 -0800

    GIRAPH-1228
    
    closes #114
---
 .../org/apache/giraph/comm/netty/NettyClient.java  | 32 ++++++++++++++--------
 .../org/apache/giraph/comm/netty/NettyServer.java  |  9 ++++++
 .../comm/netty/handler/RequestServerHandler.java   | 26 ------------------
 .../apache/giraph/conf/FacebookConfiguration.java  |  2 --
 .../org/apache/giraph/conf/GiraphConstants.java    |  9 ------
 .../conf/ImmutableClassesGiraphConfiguration.java  |  2 +-
 pom.xml                                            |  2 +-
 7 files changed, 32 insertions(+), 50 deletions(-)

diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 61b7aa5..3c0363c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import io.netty.handler.flush.FlushConsolidationHandler;
 import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
 import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.flow_control.NoOpFlowControl;
@@ -252,9 +253,9 @@ public class NettyClient {
    *                         terminate job.
    */
   public NettyClient(Mapper<?, ?, ?, ?>.Context context,
-                     final ImmutableClassesGiraphConfiguration conf,
-                     TaskInfo myTaskInfo,
-                     final Thread.UncaughtExceptionHandler exceptionHandler) {
+    final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
+    final Thread.UncaughtExceptionHandler exceptionHandler) {
+
     this.context = context;
     this.myTaskInfo = myTaskInfo;
     this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -280,16 +281,13 @@ public class NettyClient {
 
     initialiseCounters();
     networkRequestsResentForTimeout =
-        new GiraphHadoopCounter(context.getCounter(
-            NETTY_COUNTERS_GROUP,
+        new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
             NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
     networkRequestsResentForChannelFailure =
-        new GiraphHadoopCounter(context.getCounter(
-            NETTY_COUNTERS_GROUP,
+        new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
             NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
     networkRequestsResentForConnectionFailure =
-      new GiraphHadoopCounter(context.getCounter(
-        NETTY_COUNTERS_GROUP,
+      new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
         NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
@@ -343,6 +341,10 @@ public class NettyClient {
             if (conf.authenticate()) {
               LOG.info("Using Netty with authentication.");
 
+              PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+                new FlushConsolidationHandler(FlushConsolidationHandler
+                  .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+                handlerToUseExecutionGroup, executionGroup, ch);
               // Our pipeline starts with just byteCounter, and then we use
               // addLast() to incrementally add pipeline elements, so that we
               // can name them for identification for removal or replacement
@@ -394,6 +396,10 @@ public class NettyClient {
             } else {
               LOG.info("Using Netty without authentication.");
 /*end[HADOOP_NON_SECURE]*/
+              PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+                new FlushConsolidationHandler(FlushConsolidationHandler
+                    .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+                handlerToUseExecutionGroup, executionGroup, ch);
               PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
                   inboundByteCounter, handlerToUseExecutionGroup,
                   executionGroup, ch);
@@ -864,13 +870,17 @@ public class NettyClient {
   }
 
   /**
-   * Write request to a channel for its destination
+   * Write request to a channel for its destination.
+   *
+   * Whenever we write to the channel, we also call flush, but we have added a
+   * {@link FlushConsolidationHandler} in the pipeline, which batches the
+   * flushes.
    *
    * @param requestInfo Request info
    */
   private void writeRequestToChannel(RequestInfo requestInfo) {
     Channel channel = getNextChannel(requestInfo.getDestinationAddress());
-    ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+    ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
     requestInfo.setWriteFuture(writeFuture);
     writeFuture.addListener(logErrorListener);
   }
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index dabd175..f44bf33 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.comm.netty;
 
+import io.netty.handler.flush.FlushConsolidationHandler;
 import org.apache.giraph.comm.flow_control.FlowControl;
 /*if_not[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
@@ -257,6 +258,10 @@ public class NettyServer {
           // pipeline components SaslServerHandler and ResponseEncoder are
           // removed, leaving the pipeline the same as in the non-authenticated
           // configuration except for the presence of the Authorize component.
+          PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+            new FlushConsolidationHandler(FlushConsolidationHandler
+              .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+            handlerToUseExecutionGroup, executionGroup, ch);
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
           if (conf.doCompression()) {
@@ -307,6 +312,10 @@ public class NettyServer {
                   ctx.fireChannelActive();
                 }
               });
+          PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+            new FlushConsolidationHandler(FlushConsolidationHandler
+              .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+            handlerToUseExecutionGroup, executionGroup, ch);
           PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
               inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
           if (conf.doCompression()) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 7bb4464..49b8e6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -20,7 +20,6 @@ package org.apache.giraph.comm.netty.handler;
 
 import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.time.SystemTime;
@@ -32,8 +31,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
 
 /**
@@ -64,10 +61,6 @@ public abstract class RequestServerHandler<R> extends
   private long startProcessingNanoseconds = -1;
   /** Handler for uncaught exceptions */
   private final Thread.UncaughtExceptionHandler exceptionHandler;
-  /** Whether it is the first time reading/handling a request*/
-  private final AtomicBoolean firstRead = new AtomicBoolean(true);
-  /** Cached value for NETTY_AUTO_READ configuration option */
-  private final boolean nettyAutoRead;
 
   /**
    * Constructor
@@ -86,7 +79,6 @@ public abstract class RequestServerHandler<R> extends
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
     this.myTaskInfo = myTaskInfo;
     this.exceptionHandler = exceptionHandler;
-    this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
   }
 
   @Override
@@ -141,24 +133,6 @@ public abstract class RequestServerHandler<R> extends
         flowControl.calculateResponse(alreadyDone, request.getClientId());
     buffer.writeInt(signal);
     ctx.write(buffer);
-    // NettyServer is bootstrapped with auto-read set to true by default. After
-    // the first request is processed, we set auto-read to false. This prevents
-    // netty from reading requests continuously and putting them in off-heap
-    // memory. Instead, we will call `read` on requests one by one, so that the
-    // lower level transport layer handles the congestion if the rate of
-    // incoming requests is more than the available processing capability.
-    if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
-      ctx.channel().config().setAutoRead(false);
-    }
-  }
-
-  @Override
-  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-    if (!nettyAutoRead) {
-      ctx.read();
-    } else {
-      super.channelReadComplete(ctx);
-    }
   }
 
   /**
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
index e11f3d0..04064b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
@@ -132,8 +132,6 @@ public class FacebookConfiguration implements BulkConfigurator {
     StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
     // Pooled allocator in netty is faster
     GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
-    // Turning off auto read is faster
-    GiraphConstants.NETTY_AUTO_READ.setIfUnset(conf, false);
 
     // Synchronize full gc calls across workers
     MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c0af192..2236092 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -653,15 +653,6 @@ public interface GiraphConstants {
       new StrConfOption("giraph.nettyCompressionAlgorithm", "",
           "Which compression algorithm to use in netty");
 
-  /**
-   * Whether netty should pro-actively read requests and feed them to its
-   * processing pipeline
-   */
-  BooleanConfOption NETTY_AUTO_READ =
-      new BooleanConfOption("giraph.nettyAutoRead", true,
-          "Whether netty should pro-actively read requests and feed them to " +
-              "its processing pipeline");
-
   /** Max resolve address attempts */
   IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
       new IntConfOption("giraph.maxResolveAddressAttempts", 5,
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 7f04e54..d244d20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1355,7 +1355,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   public ByteToMessageDecoder getNettyCompressionDecoder() {
     switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
     case "SNAPPY":
-      return new SnappyFramedDecoder(true);
+      return new SnappyFramedDecoder();
     case "INFLATE":
       return new JdkZlibDecoder();
     default:
diff --git a/pom.xml b/pom.xml
index 8e728e1..c70dd75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -350,7 +350,7 @@ under the License.
     <dep.log4j.version>1.2.17</dep.log4j.version>
     <dep.mockito.version>1.9.5</dep.mockito.version>
     <!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
-    <dep.netty.version>4.0.14.Final</dep.netty.version>
+    <dep.netty.version>4.1.36.Final</dep.netty.version>
     <dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
     <dep.objenesis.version>2.2</dep.objenesis.version>
     <dep.openhft-compiler.version>2.2.1</dep.openhft-compiler.version>