You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/26 21:58:54 UTC

svn commit: r1390709 - in /giraph/trunk/src/main/java/org/apache/giraph/comm/netty: NettyClient.java NettyServer.java WrappedAdaptiveReceiveBufferSizePredictorFactory.java handler/RequestEncoder.java

Author: aching
Date: Wed Sep 26 19:58:54 2012
New Revision: 1390709

URL: http://svn.apache.org/viewvc?rev=1390709&view=rev
Log:
GIRAPH-340: Added client/server ExecutionHandlers to Netty
and added WrappedAdaptiveReceiveBufferSizePredictorFactory to
debug/predict the size of the incoming messages.

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
Modified:
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Sep 26 19:58:54 2012
@@ -50,9 +50,12 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
+import static org.jboss.netty.channel.Channels.pipeline;
 
 /**
  * Netty client for sending requests.
@@ -120,6 +123,12 @@ public class NettyClient {
       new AddressRequestIdGenerator();
   /** Client id */
   private final int clientId;
+  /** Maximum thread pool size */
+  private final int maxPoolSize;
+  /** Execution handler (if used) */
+  private final ExecutionHandler executionHandler;
+  /** Name of the handler before the execution handler (if used) */
+  private final String handlerBeforeExecutionHandler;
 
   /**
    * Only constructor
@@ -171,10 +180,34 @@ public class NettyClient {
         GiraphConfiguration.WAITING_REQUEST_MSECS,
         GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
 
-    int maxThreads = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
-        NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+    maxPoolSize = conf.getInt(
+        GiraphConfiguration.NETTY_CLIENT_THREADS,
+        GiraphConfiguration.NETTY_CLIENT_THREADS_DEFAULT);
+
     clientRequestIdRequestInfoMap =
-        new MapMaker().concurrencyLevel(maxThreads).makeMap();
+        new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
+
+    handlerBeforeExecutionHandler = conf.get(
+        GiraphConfiguration.NETTY_CLIENT_EXECUTION_AFTER_HANDLER,
+        GiraphConfiguration.NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT);
+    boolean useExecutionHandler = conf.getBoolean(
+        GiraphConfiguration.NETTY_CLIENT_USE_EXECUTION_HANDLER,
+        GiraphConfiguration.NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT);
+    if (useExecutionHandler) {
+      int executionThreads = conf.getInt(
+          GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS,
+          GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
+      executionHandler = new ExecutionHandler(
+          new MemoryAwareThreadPoolExecutor(
+              executionThreads, 1048576, 1048576));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyClient: Using execution handler with " +
+            executionThreads + " threads after " +
+            handlerBeforeExecutionHandler + ".");
+      }
+    } else {
+      executionHandler = null;
+    }
 
     bossExecutorService = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setNameFormat(
@@ -190,7 +223,7 @@ public class NettyClient {
         new NioClientSocketChannelFactory(
             bossExecutorService,
             workerExecutorService,
-            maxThreads));
+            maxPoolSize));
     bootstrap.setOption("connectTimeoutMillis",
         MAX_CONNECTION_MILLISECONDS_DEFAULT);
     bootstrap.setOption("tcpNoDelay", true);
@@ -202,13 +235,31 @@ public class NettyClient {
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(
-            byteCounter,
-            new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES),
-            new RequestEncoder(),
+        ChannelPipeline pipeline = pipeline();
+        pipeline.addLast("clientByteCounter", byteCounter);
+        pipeline.addLast("responseFrameDecoder",
+            new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+        pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
+            GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+            GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+        pipeline.addLast("responseClientHandler",
             new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+
+        if (executionHandler != null) {
+          pipeline.addAfter(handlerBeforeExecutionHandler,
+              "executionHandler", executionHandler);
+        }
+        return pipeline;
       }
     });
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("NettyClient: Started client" +
+          " with up to " + maxPoolSize + " threads" +
+          " with sendBufferSize = " + sendBufferSize +
+          " receiveBufferSize = " + receiveBufferSize +
+          " maxRequestMilliseconds = " + maxRequestMilliseconds);
+    }
   }
 
   /**

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Sep 26 19:58:54 2012
@@ -36,14 +36,16 @@ import org.jboss.netty.channel.ChannelEx
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
 
+import static org.jboss.netty.channel.Channels.pipeline;
 /**
  * This server uses Netty and will implement all Giraph communication
  */
@@ -63,7 +65,7 @@ public class NettyServer {
   /** Address of the server */
   private InetSocketAddress myAddress;
   /** Maximum number of threads */
-  private final int maximumPoolSize;
+  private final int maxPoolSize;
   /** TCP backlog */
   private final int tcpBacklog;
   /** Factory for {@link RequestServerHandler} */
@@ -82,6 +84,10 @@ public class NettyServer {
   private final ExecutorService workerExecutorService;
   /** Request completed map per worker */
   private final WorkerRequestReservedMap workerRequestReservedMap;
+  /** Execution handler (if used) */
+  private final ExecutionHandler executionHandler;
+  /** Name of the handler before the execution handler (if used) */
+  private final String handlerBeforeExecutionHandler;
 
   /**
    * Constructor for creating the server
@@ -115,8 +121,10 @@ public class NettyServer {
     } catch (UnknownHostException e) {
       throw new IllegalStateException("NettyServer: unable to get hostname");
     }
-    maximumPoolSize = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
-                                  MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+
+    maxPoolSize = conf.getInt(
+        GiraphConfiguration.NETTY_SERVER_THREADS,
+        GiraphConfiguration.NETTY_SERVER_THREADS_DEFAULT);
 
     tcpBacklog = conf.getInt(GiraphConfiguration.TCP_BACKLOG,
         conf.getInt(GiraphConfiguration.MAX_WORKERS,
@@ -125,7 +133,27 @@ public class NettyServer {
     channelFactory = new NioServerSocketChannelFactory(
         bossExecutorService,
         workerExecutorService,
-        maximumPoolSize);
+        maxPoolSize);
+
+    handlerBeforeExecutionHandler = conf.get(
+        GiraphConfiguration.NETTY_SERVER_EXECUTION_AFTER_HANDLER,
+        GiraphConfiguration.NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT);
+    boolean useExecutionHandler = conf.getBoolean(
+        GiraphConfiguration.NETTY_SERVER_USE_EXECUTION_HANDLER,
+        GiraphConfiguration.NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+    if (useExecutionHandler) {
+      int executionThreads = conf.getNettyServerExecutionThreads();
+      executionHandler = new ExecutionHandler(
+          new MemoryAwareThreadPoolExecutor(
+              executionThreads, 1048576, 1048576));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("NettyServer: Using execution handler with " +
+            executionThreads + " threads after " +
+            handlerBeforeExecutionHandler + ".");
+      }
+    } else {
+      executionHandler = null;
+    }
   }
 
   /**
@@ -139,15 +167,31 @@ public class NettyServer {
     bootstrap.setOption("child.sendBufferSize", sendBufferSize);
     bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
     bootstrap.setOption("backlog", tcpBacklog);
+    bootstrap.setOption("child.receiveBufferSizePredictorFactory",
+        new WrappedAdaptiveReceiveBufferSizePredictorFactory(
+            receiveBufferSize / 4,
+            receiveBufferSize,
+            receiveBufferSize));
+
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
       @Override
       public ChannelPipeline getPipeline() throws Exception {
-        return Channels.pipeline(
-            byteCounter,
-            new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
-            new RequestDecoder(conf, byteCounter),
-            requestServerHandlerFactory.newHandler(workerRequestReservedMap,
-                conf));
+        ChannelPipeline pipeline = pipeline();
+
+        pipeline.addLast("serverByteCounter", byteCounter);
+        pipeline.addLast("requestFrameDecoder",
+            new LengthFieldBasedFrameDecoder(
+                1024 * 1024 * 1024, 0, 4, 0, 4));
+        pipeline.addLast("requestDecoder",
+            new RequestDecoder(conf, byteCounter));
+        pipeline.addLast("requestProcessor",
+            requestServerHandlerFactory.newHandler(
+                workerRequestReservedMap, conf));
+        if (executionHandler != null) {
+          pipeline.addAfter(handlerBeforeExecutionHandler,
+              "executionHandler", executionHandler);
+        }
+        return pipeline;
       }
     });
 
@@ -206,7 +250,7 @@ public class NettyServer {
     if (LOG.isInfoEnabled()) {
       LOG.info("start: Started server " +
           "communication server: " + myAddress + " with up to " +
-          maximumPoolSize + " threads on bind attempt " + bindAttempts +
+          maxPoolSize + " threads on bind attempt " + bindAttempts +
           " with sendBufferSize = " + sendBufferSize +
           " receiveBufferSize = " + receiveBufferSize + " backlog = " +
           bootstrap.getOption("backlog"));

Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java?rev=1390709&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java Wed Sep 26 19:58:54 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
+
+/**
+ * Factory for a predictor that wraps AdaptiveReceiveBufferSizePredictor and
+ * debug-logs its sizes. NOTE(review): one predictor instance is returned
+ * for every getPredictor() call -- confirm sharing it is intended.
+ */
+public class WrappedAdaptiveReceiveBufferSizePredictorFactory implements
+    ReceiveBufferSizePredictorFactory {
+  /** Single predictor instance handed out by every getPredictor() call */
+  private final ReceiveBufferSizePredictor receiveBufferSizePredictor;
+
+  /**
+   * Uses a predictor built by the no-arg AdaptiveReceiveBufferSizePredictor.
+   */
+  public WrappedAdaptiveReceiveBufferSizePredictorFactory() {
+    receiveBufferSizePredictor =
+        new WrappedAdaptiveReceiveBufferSizePredictor();
+  }
+
+  /**
+   * Uses a predictor built with the specified size bounds.
+   *
+   * @param minimum The inclusive lower bound of the expected buffer size
+   * @param initial The initial buffer size when no feedback was received
+   * @param maximum The inclusive upper bound of the expected buffer size
+   */
+  public WrappedAdaptiveReceiveBufferSizePredictorFactory(int minimum,
+                                                          int initial,
+                                                          int maximum) {
+    receiveBufferSizePredictor = new WrappedAdaptiveReceiveBufferSizePredictor(
+        minimum, initial, maximum);
+  }
+
+  @Override
+  public ReceiveBufferSizePredictor getPredictor() throws Exception {
+    return receiveBufferSizePredictor;
+  }
+
+  /**
+   * Delegates every call to an AdaptiveReceiveBufferSizePredictor and logs
+   * a sample of the sizes at debug level.
+   */
+  private static class WrappedAdaptiveReceiveBufferSizePredictor implements
+      ReceiveBufferSizePredictor {
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(
+        WrappedAdaptiveReceiveBufferSizePredictor.class);
+    /** Predictor that all calls are delegated to */
+    private final AdaptiveReceiveBufferSizePredictor
+    adaptiveReceiveBufferSizePredictor;
+    /** Calls to nextReceiveBufferSize() seen with debug logging on */
+    private long nextReceiveBufferSizeCount = 0;
+    /** Calls to previousReceiveBufferSize() seen with debug logging on */
+    private long previousReceiveBufferSizeCount = 0;
+
+    /**
+     * Creates a new predictor with the default parameters.  With the default
+     * parameters, the expected buffer size starts from {@code 1024}, does not
+     * go down below {@code 64}, and does not go up above {@code 65536}.
+     */
+    public WrappedAdaptiveReceiveBufferSizePredictor() {
+      adaptiveReceiveBufferSizePredictor =
+          new AdaptiveReceiveBufferSizePredictor();
+    }
+
+    /**
+     * Creates a new predictor with the specified parameters.
+     *
+     * @param minimum  the inclusive lower bound of the expected buffer size
+     * @param initial  the initial buffer size when no feedback was received
+     * @param maximum  the inclusive upper bound of the expected buffer size
+     */
+    public WrappedAdaptiveReceiveBufferSizePredictor(int minimum,
+                                                     int initial,
+                                                     int maximum) {
+      adaptiveReceiveBufferSizePredictor =
+          new AdaptiveReceiveBufferSizePredictor(minimum, initial, maximum);
+    }
+
+    @Override
+    public int nextReceiveBufferSize() {
+      int nextReceiveBufferSize =
+          adaptiveReceiveBufferSizePredictor.nextReceiveBufferSize();
+      if (LOG.isDebugEnabled()) {
+        if (nextReceiveBufferSizeCount % 1000 == 0) { // log 1 call in 1000
+          LOG.debug("nextReceiveBufferSize: size " +
+              nextReceiveBufferSize +            " " +
+              "count " + nextReceiveBufferSizeCount);
+        }
+        ++nextReceiveBufferSizeCount; // only advances while debug is on
+      }
+      return nextReceiveBufferSize;
+    }
+
+    @Override
+    public void previousReceiveBufferSize(int previousReceiveBufferSize) {
+      if (LOG.isDebugEnabled()) {
+        if (previousReceiveBufferSizeCount % 1000 == 0) { // log 1 in 1000
+          LOG.debug("previousReceiveBufferSize: size " +
+              previousReceiveBufferSize +
+              ", count " + previousReceiveBufferSizeCount);
+        }
+        ++previousReceiveBufferSizeCount; // only advances while debug is on
+      }
+      adaptiveReceiveBufferSizePredictor.previousReceiveBufferSize(
+          previousReceiveBufferSize);
+    }
+  }
+}

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Sep 26 19:58:54 2012
@@ -35,20 +35,31 @@ public class RequestEncoder extends OneT
   private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
   /** Holds the place of the message length until known */
   private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+  /** Buffer starting size */
+  private final int bufferStartingSize;
+
+  /**
+   * Creates an encoder with a configurable initial output buffer size.
+   *
+   * @param bufferStartingSize Size passed to dynamicBuffer() in encode()
+   */
+  public RequestEncoder(int bufferStartingSize) {
+    this.bufferStartingSize = bufferStartingSize;
+  }
 
   @Override
   protected Object encode(ChannelHandlerContext ctx,
-      Channel channel, Object msg) throws Exception {
+                          Channel channel, Object msg) throws Exception {
     if (!(msg instanceof WritableRequest)) {
       throw new IllegalArgumentException(
           "encode: Got a message of type " + msg.getClass());
     }
-    @SuppressWarnings("unchecked")
+
     WritableRequest writableRequest = (WritableRequest) msg;
     ChannelBufferOutputStream outputStream =
         new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
-            10, ctx.getChannel().getConfig().getBufferFactory()));
-
+            bufferStartingSize,
+            ctx.getChannel().getConfig().getBufferFactory()));
     outputStream.write(LENGTH_PLACEHOLDER);
     outputStream.writeByte(writableRequest.getType().ordinal());
     writableRequest.write(outputStream);