You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/09/26 21:58:54 UTC
svn commit: r1390709 - in
/giraph/trunk/src/main/java/org/apache/giraph/comm/netty: NettyClient.java
NettyServer.java WrappedAdaptiveReceiveBufferSizePredictorFactory.java
handler/RequestEncoder.java
Author: aching
Date: Wed Sep 26 19:58:54 2012
New Revision: 1390709
URL: http://svn.apache.org/viewvc?rev=1390709&view=rev
Log:
GIRAPH-340: Added client/server ExecutionHandlers to Netty to avoid
and added WrappedAdaptiveReceiveBufferSizePredictorFactory to
debug/predict the size of the incoming messages.
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyClient.java Wed Sep 26 19:58:54 2012
@@ -50,9 +50,12 @@ import org.jboss.netty.channel.ChannelFu
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+
+import static org.jboss.netty.channel.Channels.pipeline;
/**
* Netty client for sending requests.
@@ -120,6 +123,12 @@ public class NettyClient {
new AddressRequestIdGenerator();
/** Client id */
private final int clientId;
+ /** Maximum thread pool size */
+ private final int maxPoolSize;
+ /** Execution handler (if used) */
+ private final ExecutionHandler executionHandler;
+ /** Name of the handler before the execution handler (if used) */
+ private final String handlerBeforeExecutionHandler;
/**
* Only constructor
@@ -171,10 +180,34 @@ public class NettyClient {
GiraphConfiguration.WAITING_REQUEST_MSECS,
GiraphConfiguration.WAITING_REQUEST_MSECS_DEFAULT);
- int maxThreads = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
- NettyServer.MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+ maxPoolSize = conf.getInt(
+ GiraphConfiguration.NETTY_CLIENT_THREADS,
+ GiraphConfiguration.NETTY_CLIENT_THREADS_DEFAULT);
+
clientRequestIdRequestInfoMap =
- new MapMaker().concurrencyLevel(maxThreads).makeMap();
+ new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
+
+ handlerBeforeExecutionHandler = conf.get(
+ GiraphConfiguration.NETTY_CLIENT_EXECUTION_AFTER_HANDLER,
+ GiraphConfiguration.NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT);
+ boolean useExecutionHandler = conf.getBoolean(
+ GiraphConfiguration.NETTY_CLIENT_USE_EXECUTION_HANDLER,
+ GiraphConfiguration.NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT);
+ if (useExecutionHandler) {
+ int executionThreads = conf.getInt(
+ GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS,
+ GiraphConfiguration.NETTY_CLIENT_EXECUTION_THREADS_DEFAULT);
+ executionHandler = new ExecutionHandler(
+ new MemoryAwareThreadPoolExecutor(
+ executionThreads, 1048576, 1048576));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("NettyClient: Using execution handler with " +
+ executionThreads + " threads after " +
+ handlerBeforeExecutionHandler + ".");
+ }
+ } else {
+ executionHandler = null;
+ }
bossExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(
@@ -190,7 +223,7 @@ public class NettyClient {
new NioClientSocketChannelFactory(
bossExecutorService,
workerExecutorService,
- maxThreads));
+ maxPoolSize));
bootstrap.setOption("connectTimeoutMillis",
MAX_CONNECTION_MILLISECONDS_DEFAULT);
bootstrap.setOption("tcpNoDelay", true);
@@ -202,13 +235,31 @@ public class NettyClient {
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(
- byteCounter,
- new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES),
- new RequestEncoder(),
+ ChannelPipeline pipeline = pipeline();
+ pipeline.addLast("clientByteCounter", byteCounter);
+ pipeline.addLast("responseFrameDecoder",
+ new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
+ pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+ GiraphConfiguration.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+ pipeline.addLast("responseClientHandler",
new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
+
+ if (executionHandler != null) {
+ pipeline.addAfter(handlerBeforeExecutionHandler,
+ "executionHandler", executionHandler);
+ }
+ return pipeline;
}
});
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("NettyClient: Started client" +
+ " with up to " + maxPoolSize + " threads" +
+ " with sendBufferSize = " + sendBufferSize +
+ " receiveBufferSize = " + receiveBufferSize +
+ " maxRequestMilliseconds = " + maxRequestMilliseconds);
+ }
}
/**
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Wed Sep 26 19:58:54 2012
@@ -36,14 +36,16 @@ import org.jboss.netty.channel.ChannelEx
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor;
+import static org.jboss.netty.channel.Channels.pipeline;
/**
* This server uses Netty and will implement all Giraph communication
*/
@@ -63,7 +65,7 @@ public class NettyServer {
/** Address of the server */
private InetSocketAddress myAddress;
/** Maximum number of threads */
- private final int maximumPoolSize;
+ private final int maxPoolSize;
/** TCP backlog */
private final int tcpBacklog;
/** Factory for {@link RequestServerHandler} */
@@ -82,6 +84,10 @@ public class NettyServer {
private final ExecutorService workerExecutorService;
/** Request completed map per worker */
private final WorkerRequestReservedMap workerRequestReservedMap;
+ /** Execution handler (if used) */
+ private final ExecutionHandler executionHandler;
+ /** Name of the handler before the execution handler (if used) */
+ private final String handlerBeforeExecutionHandler;
/**
* Constructor for creating the server
@@ -115,8 +121,10 @@ public class NettyServer {
} catch (UnknownHostException e) {
throw new IllegalStateException("NettyServer: unable to get hostname");
}
- maximumPoolSize = conf.getInt(GiraphConfiguration.MSG_NUM_FLUSH_THREADS,
- MAXIMUM_THREAD_POOL_SIZE_DEFAULT);
+
+ maxPoolSize = conf.getInt(
+ GiraphConfiguration.NETTY_SERVER_THREADS,
+ GiraphConfiguration.NETTY_SERVER_THREADS_DEFAULT);
tcpBacklog = conf.getInt(GiraphConfiguration.TCP_BACKLOG,
conf.getInt(GiraphConfiguration.MAX_WORKERS,
@@ -125,7 +133,27 @@ public class NettyServer {
channelFactory = new NioServerSocketChannelFactory(
bossExecutorService,
workerExecutorService,
- maximumPoolSize);
+ maxPoolSize);
+
+ handlerBeforeExecutionHandler = conf.get(
+ GiraphConfiguration.NETTY_SERVER_EXECUTION_AFTER_HANDLER,
+ GiraphConfiguration.NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT);
+ boolean useExecutionHandler = conf.getBoolean(
+ GiraphConfiguration.NETTY_SERVER_USE_EXECUTION_HANDLER,
+ GiraphConfiguration.NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+ if (useExecutionHandler) {
+ int executionThreads = conf.getNettyServerExecutionThreads();
+ executionHandler = new ExecutionHandler(
+ new MemoryAwareThreadPoolExecutor(
+ executionThreads, 1048576, 1048576));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("NettyServer: Using execution handler with " +
+ executionThreads + " threads after " +
+ handlerBeforeExecutionHandler + ".");
+ }
+ } else {
+ executionHandler = null;
+ }
}
/**
@@ -139,15 +167,31 @@ public class NettyServer {
bootstrap.setOption("child.sendBufferSize", sendBufferSize);
bootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
bootstrap.setOption("backlog", tcpBacklog);
+ bootstrap.setOption("child.receiveBufferSizePredictorFactory",
+ new WrappedAdaptiveReceiveBufferSizePredictorFactory(
+ receiveBufferSize / 4,
+ receiveBufferSize,
+ receiveBufferSize));
+
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(
- byteCounter,
- new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 0, 4, 0, 4),
- new RequestDecoder(conf, byteCounter),
- requestServerHandlerFactory.newHandler(workerRequestReservedMap,
- conf));
+ ChannelPipeline pipeline = pipeline();
+
+ pipeline.addLast("serverByteCounter", byteCounter);
+ pipeline.addLast("requestFrameDecoder",
+ new LengthFieldBasedFrameDecoder(
+ 1024 * 1024 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("requestDecoder",
+ new RequestDecoder(conf, byteCounter));
+ pipeline.addLast("requestProcessor",
+ requestServerHandlerFactory.newHandler(
+ workerRequestReservedMap, conf));
+ if (executionHandler != null) {
+ pipeline.addAfter(handlerBeforeExecutionHandler,
+ "executionHandler", executionHandler);
+ }
+ return pipeline;
}
});
@@ -206,7 +250,7 @@ public class NettyServer {
if (LOG.isInfoEnabled()) {
LOG.info("start: Started server " +
"communication server: " + myAddress + " with up to " +
- maximumPoolSize + " threads on bind attempt " + bindAttempts +
+ maxPoolSize + " threads on bind attempt " + bindAttempts +
" with sendBufferSize = " + sendBufferSize +
" receiveBufferSize = " + receiveBufferSize + " backlog = " +
bootstrap.getOption("backlog"));
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java?rev=1390709&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java (added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/WrappedAdaptiveReceiveBufferSizePredictorFactory.java Wed Sep 26 19:58:54 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.netty;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictor;
+import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
+
+/**
+ * Uses composition to learn more about what
+ * AdaptiveReceiveBufferSizePredictor has determined what the actual
+ * sizes are.
+ */
+public class WrappedAdaptiveReceiveBufferSizePredictorFactory implements
+ ReceiveBufferSizePredictorFactory {
+ /** Internal predictor */
+ private final ReceiveBufferSizePredictor receiveBufferSizePredictor;
+
+ /**
+ * Constructor with defaults.
+ */
+ public WrappedAdaptiveReceiveBufferSizePredictorFactory() {
+ receiveBufferSizePredictor =
+ new WrappedAdaptiveReceiveBufferSizePredictor();
+ }
+
+ /**
+ * Creates a new predictor with the specified parameters.
+ *
+ * @param minimum The inclusive lower bound of the expected buffer size
+ * @param initial The initial buffer size when no feed back was received
+ * @param maximum The inclusive upper bound of the expected buffer size
+ */
+ public WrappedAdaptiveReceiveBufferSizePredictorFactory(int minimum,
+ int initial,
+ int maximum) {
+ receiveBufferSizePredictor = new WrappedAdaptiveReceiveBufferSizePredictor(
+ minimum, initial, maximum);
+ }
+
+ @Override
+ public ReceiveBufferSizePredictor getPredictor() throws Exception {
+ return receiveBufferSizePredictor;
+ }
+
+ /**
+ * Uses composition to expose
+ * details of AdaptiveReceiveBufferSizePredictor.
+ */
+ private static class WrappedAdaptiveReceiveBufferSizePredictor implements
+ ReceiveBufferSizePredictor {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(
+ WrappedAdaptiveReceiveBufferSizePredictor.class);
+ /** Internally delegated predictor */
+ private final AdaptiveReceiveBufferSizePredictor
+ adaptiveReceiveBufferSizePredictor;
+ /** Number of calls to nextReceiveBufferSize() */
+ private long nextReceiveBufferSizeCount = 0;
+ /** Number of calls to previousReceiveBufferSize() */
+ private long previousReceiveBufferSizeCount = 0;
+
+ /**
+ * Creates a new predictor with the default parameters. With the default
+ * parameters, the expected buffer size starts from {@code 1024}, does not
+ * go down below {@code 64}, and does not go up above {@code 65536}.
+ */
+ public WrappedAdaptiveReceiveBufferSizePredictor() {
+ adaptiveReceiveBufferSizePredictor =
+ new AdaptiveReceiveBufferSizePredictor();
+ }
+
+ /**
+ * Creates a new predictor with the specified parameters.
+ *
+ * @param minimum the inclusive lower bound of the expected buffer size
+ * @param initial the initial buffer size when no feed back was received
+ * @param maximum the inclusive upper bound of the expected buffer size
+ */
+ public WrappedAdaptiveReceiveBufferSizePredictor(int minimum,
+ int initial,
+ int maximum) {
+ adaptiveReceiveBufferSizePredictor =
+ new AdaptiveReceiveBufferSizePredictor(minimum, initial, maximum);
+ }
+
+ @Override
+ public int nextReceiveBufferSize() {
+ int nextReceiveBufferSize =
+ adaptiveReceiveBufferSizePredictor.nextReceiveBufferSize();
+ if (LOG.isDebugEnabled()) {
+ if (nextReceiveBufferSizeCount % 1000 == 0) {
+ LOG.debug("nextReceiveBufferSize: size " +
+ nextReceiveBufferSize + " " +
+ "count " + nextReceiveBufferSizeCount);
+ }
+ ++nextReceiveBufferSizeCount;
+ }
+ return nextReceiveBufferSize;
+ }
+
+ @Override
+ public void previousReceiveBufferSize(int previousReceiveBufferSize) {
+ if (LOG.isDebugEnabled()) {
+ if (previousReceiveBufferSizeCount % 1000 == 0) {
+ LOG.debug("previousReceiveBufferSize: size " +
+ previousReceiveBufferSize +
+ ", count " + previousReceiveBufferSizeCount);
+ }
+ ++previousReceiveBufferSizeCount;
+ }
+ adaptiveReceiveBufferSizePredictor.previousReceiveBufferSize(
+ previousReceiveBufferSize);
+ }
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java?rev=1390709&r1=1390708&r2=1390709&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java Wed Sep 26 19:58:54 2012
@@ -35,20 +35,31 @@ public class RequestEncoder extends OneT
private static final Logger LOG = Logger.getLogger(RequestEncoder.class);
/** Holds the place of the message length until known */
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
+ /** Buffer starting size */
+ private final int bufferStartingSize;
+
+ /**
+ * Constructor.
+ *
+ * @param bufferStartingSize Starting size of the buffer
+ */
+ public RequestEncoder(int bufferStartingSize) {
+ this.bufferStartingSize = bufferStartingSize;
+ }
@Override
protected Object encode(ChannelHandlerContext ctx,
- Channel channel, Object msg) throws Exception {
+ Channel channel, Object msg) throws Exception {
if (!(msg instanceof WritableRequest)) {
throw new IllegalArgumentException(
"encode: Got a message of type " + msg.getClass());
}
- @SuppressWarnings("unchecked")
+
WritableRequest writableRequest = (WritableRequest) msg;
ChannelBufferOutputStream outputStream =
new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
- 10, ctx.getChannel().getConfig().getBufferFactory()));
-
+ bufferStartingSize,
+ ctx.getChannel().getConfig().getBufferFactory()));
outputStream.write(LENGTH_PLACEHOLDER);
outputStream.writeByte(writableRequest.getType().ordinal());
writableRequest.write(outputStream);