You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/01/09 23:22:36 UTC

git commit: GIRAPH-474: Add an option not to use direct byte buffers

Updated Branches:
  refs/heads/giraph474 [created] 8711d9bbc


GIRAPH-474: Add an option not to use direct byte buffers


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8711d9bb
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8711d9bb
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8711d9bb

Branch: refs/heads/giraph474
Commit: 8711d9bbc3f1cb3e304408feae1a4d8ee7ecdee2
Parents: 7a04bfd
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Wed Jan 9 14:14:38 2013 -0800
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Wed Jan 9 14:14:38 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/comm/netty/NettyClient.java  |    8 +---
 .../giraph/comm/netty/handler/RequestEncoder.java  |   31 ++++++++++-----
 .../org/apache/giraph/conf/GiraphConstants.java    |    9 ++++
 4 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d67df78..6d469b0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-474: Add an option not to use direct byte buffers (majakabiljo)
+
   GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
 
   GIRAPH-409: Refactor / cleanups (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c66c819..ed92d82 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -278,9 +278,7 @@ public class NettyClient {
           // completes (as in non-auth pipeline below).
           pipeline.addLast("length-field-based-frame-decoder",
               new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
-          pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt(
-              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
-              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          pipeline.addLast("request-encoder", new RequestEncoder(conf));
           // The following pipeline component responds to the server's SASL
           // tokens with its own responses. Both client and server share the
           // same Hadoop Job token, which is used to create the SASL tokens to
@@ -298,9 +296,7 @@ public class NettyClient {
           pipeline.addLast("clientByteCounter", byteCounter);
           pipeline.addLast("responseFrameDecoder",
               new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
-          pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
-              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
-              GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+          pipeline.addLast("requestEncoder", new RequestEncoder(conf));
           pipeline.addLast("responseClientHandler",
               new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
           if (executionHandler != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 7fa0a4e..4e739cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -19,6 +19,8 @@
 package org.apache.giraph.comm.netty.handler;
 
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
@@ -42,16 +44,23 @@ public class RequestEncoder extends OneToOneEncoder {
   private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
   /** Buffer starting size */
   private final int bufferStartingSize;
+  /** Whether or not to use direct byte buffers */
+  private final boolean useDirectBuffers;
   /** Start nanoseconds for the encoding time */
   private long startEncodingNanoseconds = -1;
 
   /**
    * Constructor.
    *
-   * @param bufferStartingSize Starting size of the buffer
+   * @param conf Giraph configuration
    */
-  public RequestEncoder(int bufferStartingSize) {
-    this.bufferStartingSize = bufferStartingSize;
+  public RequestEncoder(GiraphConfiguration conf) {
+    bufferStartingSize = conf.getInt(
+        GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+        GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT);
+    useDirectBuffers = conf.getBoolean(
+        GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS,
+        GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT);
   }
 
   @Override
@@ -68,17 +77,19 @@ public class RequestEncoder extends OneToOneEncoder {
     }
     WritableRequest writableRequest = (WritableRequest) msg;
     int requestSize = writableRequest.getSerializedSize();
-    ChannelBufferOutputStream outputStream;
+    ChannelBuffer channelBuffer;
     if (requestSize == WritableRequest.UNKNOWN_SIZE) {
-      outputStream =
-          new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
-              bufferStartingSize,
-              ctx.getChannel().getConfig().getBufferFactory()));
+      channelBuffer = ChannelBuffers.dynamicBuffer(
+          bufferStartingSize,
+          ctx.getChannel().getConfig().getBufferFactory());
     } else {
       requestSize += LENGTH_PLACEHOLDER.length + 1;
-      outputStream = new ChannelBufferOutputStream(
-          ChannelBuffers.directBuffer(requestSize));
+      channelBuffer = useDirectBuffers ?
+          ChannelBuffers.directBuffer(requestSize) :
+          ChannelBuffers.buffer(requestSize);
     }
+    ChannelBufferOutputStream outputStream =
+        new ChannelBufferOutputStream(channelBuffer);
     outputStream.write(LENGTH_PLACEHOLDER);
     outputStream.writeByte(writableRequest.getType().ordinal());
     try {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 11d4a41..9acc50a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -177,6 +177,15 @@ public interface GiraphConstants {
   /** Start with 32K */
   int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
 
+  /** Whether or not netty request encoder should use direct byte buffers */
+  String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
+      "giraph.nettyRequestEncoderUseDirectBuffers";
+  /**
+   * By default don't use direct buffers, since they are allocated outside
+   * the Java heap and jobs can then take more memory than the allowed heap
+   */
+  boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false;
+
   /** Netty client threads */
   String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
   /** Default is 4 */