You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by gb...@apache.org on 2015/08/02 20:16:08 UTC
helix git commit: Cleaned up ByteBuf usage in IPC
Repository: helix
Updated Branches:
refs/heads/master 3cb08dbba -> 0cf986e17
Cleaned up ByteBuf usage in IPC
Removed dead code in IPC
Made max frame length configurable (default also raised from 1MB to 128MB)
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0cf986e1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0cf986e1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0cf986e1
Branch: refs/heads/master
Commit: 0cf986e1734a8e2261f14d0e5a4463e8826c4457
Parents: 3cb08db
Author: Greg Brandt <br...@gmail.com>
Authored: Wed Jul 1 15:50:04 2015 -0700
Committer: Greg Brandt <br...@gmail.com>
Committed: Sat Jul 18 12:54:21 2015 -0700
----------------------------------------------------------------------
.../ipc/netty/NettyHelixIPCCallbackHandler.java | 31 +++-----------------
.../helix/ipc/netty/NettyHelixIPCService.java | 16 ++++++----
.../helix/ipc/netty/NettyHelixIPCUtils.java | 4 ++-
3 files changed, 18 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
index 164e6d1..90fc7a2 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java
@@ -54,75 +54,52 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
try {
- int idx = 0;
-
// Message length
int messageLength = byteBuf.readInt();
- idx += 4;
// Message version
@SuppressWarnings("unused")
int messageVersion = byteBuf.readInt();
- idx += 4;
// Message type
int messageType = byteBuf.readInt();
- idx += 4;
// Message ID
UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
- idx += 16;
// Cluster
- byteBuf.readerIndex(idx);
int clusterSize = byteBuf.readInt();
- idx += 4;
checkLength("clusterSize", clusterSize, messageLength);
String clusterName = toNonEmptyString(clusterSize, byteBuf);
- idx += clusterSize;
// Resource
- byteBuf.readerIndex(idx);
int resourceSize = byteBuf.readInt();
- idx += 4;
checkLength("resourceSize", resourceSize, messageLength);
String resourceName = toNonEmptyString(resourceSize, byteBuf);
- idx += resourceSize;
// Partition
- byteBuf.readerIndex(idx);
int partitionSize = byteBuf.readInt();
- idx += 4;
checkLength("partitionSize", partitionSize, messageLength);
String partitionName = toNonEmptyString(partitionSize, byteBuf);
- idx += partitionSize;
// State
- byteBuf.readerIndex(idx);
int stateSize = byteBuf.readInt();
- idx += 4;
checkLength("stateSize", stateSize, messageLength);
String state = toNonEmptyString(stateSize, byteBuf);
- idx += stateSize;
// Source instance
- byteBuf.readerIndex(idx);
int srcInstanceSize = byteBuf.readInt();
- idx += 4;
checkLength("srcInstanceSize", srcInstanceSize, messageLength);
String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
- idx += srcInstanceSize;
// Destination instance
- byteBuf.readerIndex(idx);
int dstInstanceSize = byteBuf.readInt();
- idx += 4;
checkLength("dstInstanceSize", dstInstanceSize, messageLength);
String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
- idx += dstInstanceSize;
- // Position at message
- byteBuf.readerIndex(idx + 4);
+ // Message
+ int messageSize = byteBuf.readInt();
+ ByteBuf message = byteBuf.slice(byteBuf.readerIndex(), messageSize);
// Error check
if (dstInstance == null) {
@@ -147,7 +124,7 @@ public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler<By
}
// Handle callback
- callback.onMessage(scope, messageId, byteBuf);
+ callback.onMessage(scope, messageId, message);
// Stats
statRxMsg.mark();
http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
index 68f6fbc..c6393df 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -96,7 +96,6 @@ public class NettyHelixIPCService implements HelixIPCService {
// Parameters for length header field of message (tells decoder to interpret but preserve length
// field in message)
- private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 0;
private static final int LENGTH_FIELD_LENGTH = 4;
private static final int LENGTH_ADJUSTMENT = -4;
@@ -158,7 +157,7 @@ public class NettyHelixIPCService implements HelixIPCService {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(
new LengthFieldBasedFrameDecoder(
- MAX_FRAME_LENGTH,
+ config.getMaxFrameLength(),
LENGTH_FIELD_OFFSET,
LENGTH_FIELD_LENGTH,
LENGTH_ADJUSTMENT,
@@ -231,9 +230,6 @@ public class NettyHelixIPCService implements HelixIPCService {
synchronized (channelMap) {
channel = channels.get(idx);
if (channel == null || !channel.isOpen()) {
- if (channel != null && channel.isOpen()) {
- channel.close();
- }
channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
channels.set(idx, channel);
statChannelOpen.inc();
@@ -302,6 +298,7 @@ public class NettyHelixIPCService implements HelixIPCService {
private String instanceName;
private int port;
private int numConnections = 1;
+ private int maxFrameLength = 128 * 1024 * 1024;
public Config setInstanceName(String instanceName) {
this.instanceName = instanceName;
@@ -318,6 +315,11 @@ public class NettyHelixIPCService implements HelixIPCService {
return this;
}
+ public Config setMaxFrameLength(int maxFrameLength) {
+ this.maxFrameLength = maxFrameLength;
+ return this;
+ }
+
public String getInstanceName() {
return instanceName;
}
@@ -329,5 +331,9 @@ public class NettyHelixIPCService implements HelixIPCService {
public int getNumConnections() {
return numConnections;
}
+
+ public int getMaxFrameLength() {
+ return maxFrameLength;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/0cf986e1/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
index 77b9123..19e4d87 100644
--- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java
@@ -45,7 +45,9 @@ public class NettyHelixIPCUtils {
/** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */
public static String toNonEmptyString(int length, ByteBuf byteBuf) {
if (byteBuf.readableBytes() >= length) {
- return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+ String string = byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+ byteBuf.readerIndex(byteBuf.readerIndex() + length);
+ return string;
}
return null;
}