You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by gb...@apache.org on 2015/08/02 20:16:08 UTC

helix git commit: Cleaned up ByteBuf usage in IPC

Repository: helix
Updated Branches:
  refs/heads/master 3cb08dbba -> 0cf986e17


Cleaned up ByteBuf usage in IPC

Removed dead code in IPC

Made max frame length configurable (also bumped to 128MB default)


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0cf986e1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0cf986e1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0cf986e1

Branch: refs/heads/master
Commit: 0cf986e1734a8e2261f14d0e5a4463e8826c4457
Parents: 3cb08db
Author: Greg Brandt <br...@gmail.com>
Authored: Wed Jul 1 15:50:04 2015 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Sat Jul 18 12:54:21 2015 -0700

----------------------------------------------------------------------
 .../ipc/netty/NettyHelixIPCCallbackHandler.java | 31 +++-----------------
 .../helix/ipc/netty/NettyHelixIPCService.java   | 16 ++++++----
 .../helix/ipc/netty/NettyHelixIPCUtils.java     |  4 ++-
 3 files changed, 18 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
index 164e6d1..90fc7a2 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
@@ -54,75 +54,52 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
     try {
-      int idx = 0;
-
       // Message length
       int messageLength = byteBuf.readInt();
-      idx += 4;
 
       // Message version
       @SuppressWarnings("unused")
       int messageVersion = byteBuf.readInt();
-      idx += 4;
 
       // Message type
       int messageType = byteBuf.readInt();
-      idx += 4;
 
       // Message ID
       UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
-      idx += 16;
 
       // Cluster
-      byteBuf.readerIndex(idx);
       int clusterSize = byteBuf.readInt();
-      idx += 4;
       checkLength("clusterSize", clusterSize, messageLength);
       String clusterName = toNonEmptyString(clusterSize, byteBuf);
-      idx += clusterSize;
 
       // Resource
-      byteBuf.readerIndex(idx);
       int resourceSize = byteBuf.readInt();
-      idx += 4;
       checkLength("resourceSize", resourceSize, messageLength);
       String resourceName = toNonEmptyString(resourceSize, byteBuf);
-      idx += resourceSize;
 
       // Partition
-      byteBuf.readerIndex(idx);
       int partitionSize = byteBuf.readInt();
-      idx += 4;
       checkLength("partitionSize", partitionSize, messageLength);
       String partitionName = toNonEmptyString(partitionSize, byteBuf);
-      idx += partitionSize;
 
       // State
-      byteBuf.readerIndex(idx);
       int stateSize = byteBuf.readInt();
-      idx += 4;
       checkLength("stateSize", stateSize, messageLength);
       String state = toNonEmptyString(stateSize, byteBuf);
-      idx += stateSize;
 
       // Source instance
-      byteBuf.readerIndex(idx);
       int srcInstanceSize = byteBuf.readInt();
-      idx += 4;
       checkLength("srcInstanceSize", srcInstanceSize, messageLength);
       String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
-      idx += srcInstanceSize;
 
       // Destination instance
-      byteBuf.readerIndex(idx);
       int dstInstanceSize = byteBuf.readInt();
-      idx += 4;
       checkLength("dstInstanceSize", dstInstanceSize, messageLength);
       String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
-      idx += dstInstanceSize;
 
-      // Position at message
-      byteBuf.readerIndex(idx + 4);
+      // Message
+      int messageSize = byteBuf.readInt();
+      ByteBuf message = byteBuf.slice(byteBuf.readerIndex(), messageSize);
 
       // Error check
       if (dstInstance == null) {
@@ -147,7 +124,7 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By
       }
 
       // Handle callback
-      callback.onMessage(scope, messageId, byteBuf);
+      callback.onMessage(scope, messageId, message);
 
       // Stats
       statRxMsg.mark();

http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
index 68f6fbc..c6393df 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -96,7 +96,6 @@ public class NettyHelixIPCService implements HelixIPCService {
 
   // Parameters for length header field of message (tells decoder to interpret but preserve length
   // field in message)
-  private static final int MAX_FRAME_LENGTH = 1024 * 1024;
   private static final int LENGTH_FIELD_OFFSET = 0;
   private static final int LENGTH_FIELD_LENGTH = 4;
   private static final int LENGTH_ADJUSTMENT = -4;
@@ -158,7 +157,7 @@ public class NettyHelixIPCService implements HelixIPCService {
             protected void initChannel(SocketChannel socketChannel) throws Exception {
               socketChannel.pipeline().addLast(
                   new LengthFieldBasedFrameDecoder(
-                      MAX_FRAME_LENGTH,
+                      config.getMaxFrameLength(),
                       LENGTH_FIELD_OFFSET,
                       LENGTH_FIELD_LENGTH,
                       LENGTH_ADJUSTMENT,
@@ -231,9 +230,6 @@ public class NettyHelixIPCService implements HelixIPCService {
         synchronized (channelMap) {
           channel = channels.get(idx);
           if (channel == null || !channel.isOpen()) {
-            if (channel != null && channel.isOpen()) {
-              channel.close();
-            }
             channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
             channels.set(idx, channel);
             statChannelOpen.inc();
@@ -302,6 +298,7 @@ public class NettyHelixIPCService implements HelixIPCService {
     private String instanceName;
     private int port;
     private int numConnections = 1;
+    private int maxFrameLength = 128 * 1024 * 1024;
 
     public Config setInstanceName(String instanceName) {
       this.instanceName = instanceName;
@@ -318,6 +315,11 @@ public class NettyHelixIPCService implements HelixIPCService {
       return this;
     }
 
+    public Config setMaxFrameLength(int maxFrameLength) {
+      this.maxFrameLength = maxFrameLength;
+      return this;
+    }
+
     public String getInstanceName() {
       return instanceName;
     }
@@ -329,5 +331,9 @@ public class NettyHelixIPCService implements HelixIPCService {
     public int getNumConnections() {
       return numConnections;
     }
+
+    public int getMaxFrameLength() {
+      return maxFrameLength;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
index 77b9123..19e4d87 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
@@ -45,7 +45,9 @@ public class NettyHelixIPCUtils {
   /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */
   public static String toNonEmptyString(int length, ByteBuf byteBuf) {
     if (byteBuf.readableBytes() >= length) {
-      return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+      String string =  byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+      byteBuf.readerIndex(byteBuf.readerIndex() + length);
+      return string;
     }
     return null;
   }