You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/11/19 02:57:03 UTC
[giraph] branch trunk updated: GIRAPH-1228
This is an automated email from the ASF dual-hosted git repository.
dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new acc1f71 GIRAPH-1228
acc1f71 is described below
commit acc1f71af03dec6f1a1c83fb0fcce08451e140ea
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Mon Nov 18 18:56:36 2019 -0800
GIRAPH-1228
closes #114
---
.../org/apache/giraph/comm/netty/NettyClient.java | 32 ++++++++++++++--------
.../org/apache/giraph/comm/netty/NettyServer.java | 9 ++++++
.../comm/netty/handler/RequestServerHandler.java | 26 ------------------
.../apache/giraph/conf/FacebookConfiguration.java | 2 --
.../org/apache/giraph/conf/GiraphConstants.java | 9 ------
.../conf/ImmutableClassesGiraphConfiguration.java | 2 +-
pom.xml | 2 +-
7 files changed, 32 insertions(+), 50 deletions(-)
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 61b7aa5..3c0363c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.flow_control.NoOpFlowControl;
@@ -252,9 +253,9 @@ public class NettyClient {
* terminate job.
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
- final ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo,
- final Thread.UncaughtExceptionHandler exceptionHandler) {
+ final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
+ final Thread.UncaughtExceptionHandler exceptionHandler) {
+
this.context = context;
this.myTaskInfo = myTaskInfo;
this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -280,16 +281,13 @@ public class NettyClient {
initialiseCounters();
networkRequestsResentForTimeout =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
networkRequestsResentForChannelFailure =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
networkRequestsResentForConnectionFailure =
- new GiraphHadoopCounter(context.getCounter(
- NETTY_COUNTERS_GROUP,
+ new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
@@ -343,6 +341,10 @@ public class NettyClient {
if (conf.authenticate()) {
LOG.info("Using Netty with authentication.");
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
// Our pipeline starts with just byteCounter, and then we use
// addLast() to incrementally add pipeline elements, so that we
// can name them for identification for removal or replacement
@@ -394,6 +396,10 @@ public class NettyClient {
} else {
LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
inboundByteCounter, handlerToUseExecutionGroup,
executionGroup, ch);
@@ -864,13 +870,17 @@ public class NettyClient {
}
/**
- * Write request to a channel for its destination
+ * Write request to a channel for its destination.
+ *
+ * Whenever we write to the channel, we also call flush, but we have added a
+ * {@link FlushConsolidationHandler} in the pipeline, which batches the
+ * flushes.
*
* @param requestInfo Request info
*/
private void writeRequestToChannel(RequestInfo requestInfo) {
Channel channel = getNextChannel(requestInfo.getDestinationAddress());
- ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+ ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
requestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
}
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index dabd175..f44bf33 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -18,6 +18,7 @@
package org.apache.giraph.comm.netty;
+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
@@ -257,6 +258,10 @@ public class NettyServer {
// pipeline components SaslServerHandler and ResponseEncoder are
// removed, leaving the pipeline the same as in the non-authenticated
// configuration except for the presence of the Authorize component.
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
@@ -307,6 +312,10 @@ public class NettyServer {
ctx.fireChannelActive();
}
});
+ PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
+ new FlushConsolidationHandler(FlushConsolidationHandler
+ .DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
+ handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 7bb4464..49b8e6d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -20,7 +20,6 @@ package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
@@ -32,8 +31,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
/**
@@ -64,10 +61,6 @@ public abstract class RequestServerHandler<R> extends
private long startProcessingNanoseconds = -1;
/** Handler for uncaught exceptions */
private final Thread.UncaughtExceptionHandler exceptionHandler;
- /** Whether it is the first time reading/handling a request*/
- private final AtomicBoolean firstRead = new AtomicBoolean(true);
- /** Cached value for NETTY_AUTO_READ configuration option */
- private final boolean nettyAutoRead;
/**
* Constructor
@@ -86,7 +79,6 @@ public abstract class RequestServerHandler<R> extends
closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
this.exceptionHandler = exceptionHandler;
- this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
}
@Override
@@ -141,24 +133,6 @@ public abstract class RequestServerHandler<R> extends
flowControl.calculateResponse(alreadyDone, request.getClientId());
buffer.writeInt(signal);
ctx.write(buffer);
- // NettyServer is bootstrapped with auto-read set to true by default. After
- // the first request is processed, we set auto-read to false. This prevents
- // netty from reading requests continuously and putting them in off-heap
- // memory. Instead, we will call `read` on requests one by one, so that the
- // lower level transport layer handles the congestion if the rate of
- // incoming requests is more than the available processing capability.
- if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
- ctx.channel().config().setAutoRead(false);
- }
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- if (!nettyAutoRead) {
- ctx.read();
- } else {
- super.channelReadComplete(ctx);
- }
}
/**
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
index e11f3d0..04064b9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FacebookConfiguration.java
@@ -132,8 +132,6 @@ public class FacebookConfiguration implements BulkConfigurator {
StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
// Pooled allocator in netty is faster
GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
- // Turning off auto read is faster
- GiraphConstants.NETTY_AUTO_READ.setIfUnset(conf, false);
// Synchronize full gc calls across workers
MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index c0af192..2236092 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -653,15 +653,6 @@ public interface GiraphConstants {
new StrConfOption("giraph.nettyCompressionAlgorithm", "",
"Which compression algorithm to use in netty");
- /**
- * Whether netty should pro-actively read requests and feed them to its
- * processing pipeline
- */
- BooleanConfOption NETTY_AUTO_READ =
- new BooleanConfOption("giraph.nettyAutoRead", true,
- "Whether netty should pro-actively read requests and feed them to " +
- "its processing pipeline");
-
/** Max resolve address attempts */
IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
new IntConfOption("giraph.maxResolveAddressAttempts", 5,
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 7f04e54..d244d20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -1355,7 +1355,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
public ByteToMessageDecoder getNettyCompressionDecoder() {
switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
case "SNAPPY":
- return new SnappyFramedDecoder(true);
+ return new SnappyFramedDecoder();
case "INFLATE":
return new JdkZlibDecoder();
default:
diff --git a/pom.xml b/pom.xml
index 8e728e1..c70dd75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -350,7 +350,7 @@ under the License.
<dep.log4j.version>1.2.17</dep.log4j.version>
<dep.mockito.version>1.9.5</dep.mockito.version>
<!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
- <dep.netty.version>4.0.14.Final</dep.netty.version>
+ <dep.netty.version>4.1.36.Final</dep.netty.version>
<dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
<dep.objenesis.version>2.2</dep.objenesis.version>
<dep.openhft-compiler.version>2.2.1</dep.openhft-compiler.version>