You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/01/09 23:22:36 UTC
git commit: GIRAPH-474: Add an option not to use direct byte buffers
Updated Branches:
refs/heads/giraph474 [created] 8711d9bbc
GIRAPH-474: Add an option not to use direct byte buffers
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8711d9bb
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8711d9bb
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8711d9bb
Branch: refs/heads/giraph474
Commit: 8711d9bbc3f1cb3e304408feae1a4d8ee7ecdee2
Parents: 7a04bfd
Author: Maja Kabiljo <ma...@maja-mbp.local>
Authored: Wed Jan 9 14:14:38 2013 -0800
Committer: Maja Kabiljo <ma...@maja-mbp.local>
Committed: Wed Jan 9 14:14:38 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../org/apache/giraph/comm/netty/NettyClient.java | 8 +---
.../giraph/comm/netty/handler/RequestEncoder.java | 31 ++++++++++-----
.../org/apache/giraph/conf/GiraphConstants.java | 9 ++++
4 files changed, 34 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d67df78..6d469b0 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-474: Add an option not to use direct byte buffers (majakabiljo)
+
GIRAPH-476: SequenceFileVertexOutputFormat (nitay)
GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c66c819..ed92d82 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -278,9 +278,7 @@ public class NettyClient {
// completes (as in non-auth pipeline below).
pipeline.addLast("length-field-based-frame-decoder",
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
- pipeline.addLast("request-encoder", new RequestEncoder(conf.getInt(
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+ pipeline.addLast("request-encoder", new RequestEncoder(conf));
// The following pipeline component responds to the server's SASL
// tokens with its own responses. Both client and server share the
// same Hadoop Job token, which is used to create the SASL tokens to
@@ -298,9 +296,7 @@ public class NettyClient {
pipeline.addLast("clientByteCounter", byteCounter);
pipeline.addLast("responseFrameDecoder",
new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
- pipeline.addLast("requestEncoder", new RequestEncoder(conf.getInt(
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
- GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT)));
+ pipeline.addLast("requestEncoder", new RequestEncoder(conf));
pipeline.addLast("responseClientHandler",
new ResponseClientHandler(clientRequestIdRequestInfoMap, conf));
if (executionHandler != null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
index 7fa0a4e..4e739cb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestEncoder.java
@@ -19,6 +19,8 @@
package org.apache.giraph.comm.netty.handler;
import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
@@ -42,16 +44,23 @@ public class RequestEncoder extends OneToOneEncoder {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
/** Buffer starting size */
private final int bufferStartingSize;
+ /** Whether or not to use direct byte buffers */
+ private final boolean useDirectBuffers;
/** Start nanoseconds for the encoding time */
private long startEncodingNanoseconds = -1;
/**
* Constructor.
*
- * @param bufferStartingSize Starting size of the buffer
+ * @param conf Giraph configuration
*/
- public RequestEncoder(int bufferStartingSize) {
- this.bufferStartingSize = bufferStartingSize;
+ public RequestEncoder(GiraphConfiguration conf) {
+ bufferStartingSize = conf.getInt(
+ GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE,
+ GiraphConstants.NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT);
+ useDirectBuffers = conf.getBoolean(
+ GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS,
+ GiraphConstants.NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT);
}
@Override
@@ -68,17 +77,19 @@ public class RequestEncoder extends OneToOneEncoder {
}
WritableRequest writableRequest = (WritableRequest) msg;
int requestSize = writableRequest.getSerializedSize();
- ChannelBufferOutputStream outputStream;
+ ChannelBuffer channelBuffer;
if (requestSize == WritableRequest.UNKNOWN_SIZE) {
- outputStream =
- new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
- bufferStartingSize,
- ctx.getChannel().getConfig().getBufferFactory()));
+ channelBuffer = ChannelBuffers.dynamicBuffer(
+ bufferStartingSize,
+ ctx.getChannel().getConfig().getBufferFactory());
} else {
requestSize += LENGTH_PLACEHOLDER.length + 1;
- outputStream = new ChannelBufferOutputStream(
- ChannelBuffers.directBuffer(requestSize));
+ channelBuffer = useDirectBuffers ?
+ ChannelBuffers.directBuffer(requestSize) :
+ ChannelBuffers.buffer(requestSize);
}
+ ChannelBufferOutputStream outputStream =
+ new ChannelBufferOutputStream(channelBuffer);
outputStream.write(LENGTH_PLACEHOLDER);
outputStream.writeByte(writableRequest.getType().ordinal());
try {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8711d9bb/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 11d4a41..9acc50a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -177,6 +177,15 @@ public interface GiraphConstants {
/** Start with 32K */
int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
+ /** Whether or not netty request encoder should use direct byte buffers */
+ String NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
+ "giraph.nettyRequestEncoderUseDirectBuffers";
+ /**
+ * By default don't use direct buffers,
+ * since jobs can take more than allowed heap memory in that case
+ */
+ boolean NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS_DEFAULT = false;
+
/** Netty client threads */
String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
/** Default is 4 */