You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/03/24 10:06:41 UTC
[phoenix-omid] branch master updated: OMID-202 Refactor Omid to use
Netty 4
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-omid.git
The following commit(s) were added to refs/heads/master by this push:
new 18ef57d OMID-202 Refactor Omid to use Netty 4
18ef57d is described below
commit 18ef57d4ec23d96f0ff7dbebec399d838c6c7566
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Mar 12 11:23:00 2021 +0100
OMID-202 Refactor Omid to use Netty 4
---
pom.xml | 2 +-
transaction-client/pom.xml | 4 +-
.../java/org/apache/omid/tso/client/TSOClient.java | 143 +++++++-------
tso-server/pom.xml | 4 +-
.../apache/omid/tso/AbstractRequestProcessor.java | 2 +-
.../src/main/java/org/apache/omid/tso/Batch.java | 2 +-
.../java/org/apache/omid/tso/PersistEvent.java | 2 +-
.../org/apache/omid/tso/PersistenceProcessor.java | 2 +-
.../apache/omid/tso/PersistenceProcessorImpl.java | 2 +-
.../omid/tso/PersitenceProcessorNullImpl.java | 2 +-
.../java/org/apache/omid/tso/ReplyProcessor.java | 2 +-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 10 +-
.../java/org/apache/omid/tso/RequestProcessor.java | 2 +-
.../apache/omid/tso/RequestProcessorPersistCT.java | 2 +-
.../apache/omid/tso/RequestProcessorSkipCT.java | 2 +-
.../java/org/apache/omid/tso/RetryProcessor.java | 2 +-
.../org/apache/omid/tso/RetryProcessorImpl.java | 2 +-
.../org/apache/omid/tso/TSOChannelHandler.java | 211 ++++++++++-----------
.../org/apache/omid/tso/ProgrammableTSOServer.java | 112 ++++++-----
.../test/java/org/apache/omid/tso/TestBatch.java | 2 +-
.../apache/omid/tso/TestPersistenceProcessor.java | 2 +-
.../omid/tso/TestPersistenceProcessorHandler.java | 2 +-
.../org/apache/omid/tso/TestReplyProcessor.java | 2 +-
.../org/apache/omid/tso/TestRequestProcessor.java | 2 +-
.../org/apache/omid/tso/TestRetryProcessor.java | 2 +-
.../omid/tso/TestTSOChannelHandlerNetty.java | 157 ++++++++-------
.../org/apache/omid/tso/client/TSOClientRaw.java | 106 ++++++-----
27 files changed, 402 insertions(+), 383 deletions(-)
diff --git a/pom.xml b/pom.xml
index fea3247..0447d9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@
<testng.version>6.10</testng.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
- <netty.version>3.10.6.Final</netty.version>
+ <netty4.version>4.1.60.Final</netty4.version>
<protobuf.version>2.5.0</protobuf.version>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<os.plugin.version>1.6.2</os.plugin.version>
diff --git a/transaction-client/pom.xml b/transaction-client/pom.xml
index ef79107..43989b2 100644
--- a/transaction-client/pom.xml
+++ b/transaction-client/pom.xml
@@ -69,8 +69,8 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${netty.version}</version>
+ <artifactId>netty-all</artifactId>
+ <version>${netty4.version}</version>
</dependency>
<!-- end distributed comm -->
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 43eaaa6..172d5eb 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -30,25 +30,26 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -84,8 +86,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
private CuratorFramework zkClient;
private NodeCache currentTSOZNode;
- private ChannelFactory factory;
- private ClientBootstrap bootstrap;
+ private Bootstrap bootstrap;
private Channel currentChannel;
private final ScheduledExecutorService fsmExecutor;
StateMachine.Fsm fsm;
@@ -115,17 +116,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
// Avoid instantiation
private TSOClient(OmidClientConfiguration omidConf) throws IOException {
- // Start client with Nb of active threads = 3 as maximum.
- int tsoExecutorThreads = omidConf.getExecutorThreads();
-
- factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
- // Create the bootstrap
- bootstrap = new ClientBootstrap(factory);
-
requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
requestMaxRetries = omidConf.getRequestMaxRetries();
tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
@@ -160,21 +150,31 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
fsm = new StateMachine.FsmImpl(fsmExecutor);
fsm.setInitState(new DisconnectedState(fsm));
- ChannelPipeline pipeline = bootstrap.getPipeline();
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
- pipeline.addLast("handler", new Handler(fsm));
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
- lowLatency = false;
-
+ // Start client with the configured thread count
+ int tsoExecutorThreads = omidConf.getExecutorThreads();
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+ EventLoopGroup workerGroup = new NioEventLoopGroup(tsoExecutorThreads, workerThreadFactory);
+ bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("inboundHandler", new Handler(fsm));
+ }
+ });
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
+ lowLatency = false;
conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
}
@@ -317,6 +317,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
* Used for family deletion
* @return the conflict detection level.
*/
+ @Override
public ConflictDetectionLevel getConflictDetectionLevel() {
return conflictDetectionLevel;
}
@@ -324,6 +325,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
/**
* Used for family deletion testing
*/
+ @Override
public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
this.conflictDetectionLevel = conflictDetectionLevel;
}
@@ -342,7 +344,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
setTSOAddress(hp.getHost(), hp.getPort());
epoch = Long.parseLong(currentTSOAndEpochArray[1]);
LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
- if (currentChannel != null && currentChannel.isConnected()) {
+ if (currentChannel != null && currentChannel.isActive()) {
LOG.info("\tClosing channel with previous TSO {}", currentChannel);
currentChannel.close();
}
@@ -513,7 +515,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
}
public StateMachine.State handleEvent(CloseEvent e) {
- factory.releaseExternalResources();
+ bootstrap.config().group().shutdownGracefully();
e.success(null);
return this;
}
@@ -527,10 +529,11 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
LOG.info("Connection to TSO [{}] established. Channel {}",
- tsoAddress, channelFuture.getChannel());
+ tsoAddress, channelFuture.channel());
} else {
LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
- tsoAddress, channelFuture.getChannel());
+ tsoAddress, channelFuture.channel());
+ fsm.sendEvent(new ErrorEvent(new ConnectionException()));
}
}
});
@@ -582,6 +585,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
return timeout;
}
+ @Override
public String toString() {
String info = "Request type ";
if (event.getRequest().hasTimestampRequest()) {
@@ -610,7 +614,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
// Add the required handshake capabilities when necessary
handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
- channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+ channel.writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
timeout = newTimeout();
}
@@ -764,13 +768,13 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
request.error(new IllegalArgumentException("Unknown request type"));
return;
}
- ChannelFuture f = channel.write(req);
+ ChannelFuture f = channel.writeAndFlush(req);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
- fsm.sendEvent(new ErrorEvent(future.getCause()));
+ fsm.sendEvent(new ErrorEvent(future.cause()));
}
}
});
@@ -1003,7 +1007,7 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
// Helper classes & methods
// ----------------------------------------------------------------------------------------------------------------
- private class Handler extends SimpleChannelHandler {
+ private class Handler extends ChannelInboundHandlerAdapter {
private StateMachine.Fsm fsm;
@@ -1012,37 +1016,34 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
}
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- currentChannel = e.getChannel();
- LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
- fsm.sendEvent(new ConnectedEvent(e.getChannel()));
+ public void channelActive(ChannelHandlerContext ctx) {
+ currentChannel = ctx.channel();
+ LOG.debug("HANDLER (CHANNEL ACTIVE): Connection {}. Sending connected event to FSM", ctx.channel());
+ fsm.sendEvent(new ConnectedEvent(ctx.channel()));
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ LOG.debug("HANDLER (CHANNEL INACTIVE): Connection {}. Sending error, then channelClosed event to FSM", ctx.channel());
+ // Netty 3 had separate callbacks, and the FSM expects both events.
+ // Sending both is much easier than rewriting the FSM
fsm.sendEvent(new ErrorEvent(new ConnectionException()));
- }
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- if (e.getMessage() instanceof TSOProto.Response) {
- fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof TSOProto.Response) {
+ fsm.sendEvent(new ResponseEvent((TSOProto.Response) msg));
} else {
- LOG.warn("Received unknown message", e.getMessage());
+ LOG.warn("Received unknown message", msg);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
- fsm.sendEvent(new ErrorEvent(e.getCause()));
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOG.error("Error on channel {}", ctx.channel(), cause);
+ fsm.sendEvent(new ErrorEvent(cause));
}
}
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index f5e3bbf..e2e0253 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -111,8 +111,8 @@
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${netty.version}</version>
+ <artifactId>netty-all</artifactId>
+ <version>${netty4.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
diff --git a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
index 9f502df..2eb9cb1 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -28,7 +28,7 @@ import com.lmax.disruptor.dsl.Disruptor;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 3c3e432..57405f4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -23,7 +23,7 @@ import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9528d66..c177f02 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -19,7 +19,7 @@ package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.MoreObjects;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
public final class PersistEvent {
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index beb51c4..7982e12 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -18,7 +18,7 @@
package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.Closeable;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 9360881..e1e3c60 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -30,7 +30,7 @@ import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
index e4aed24..411770e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -18,7 +18,7 @@
package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.IOException;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index d548909..50cf8a5 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -18,7 +18,7 @@
package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.Closeable;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index b51612e..7b4e4dc 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -32,7 +32,7 @@ import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +210,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
.setStartTimestamp(startTimestamp)
.setCommitTimestamp(commitTimestamp);
builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
commitMeter.mark();
monCtx.timerStop("reply.processor.commit.latency");
}
@@ -223,7 +223,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
commitBuilder.setAborted(true);
commitBuilder.setStartTimestamp(startTimestamp);
builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
abortMeter.mark();
monCtx.timerStop("reply.processor.abort.latency");
}
@@ -235,7 +235,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
respBuilder.setStartTimestamp(startTimestamp);
builder.setTimestampResponse(respBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
timestampMeter.mark();
monCtx.timerStop("reply.processor.timestamp.latency");
}
@@ -248,7 +248,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
fenceBuilder.setTableId(tableID);
fenceBuilder.setFenceId(fenceTimestamp);
builder.setFenceResponse(fenceBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
monCtx.timerStop("reply.processor.fence.latency");
fenceMeter.mark();
}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
index 062329d..2f83b6e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
@@ -17,7 +17,7 @@
*/
package org.apache.omid.tso;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.Closeable;
import java.util.Collection;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
index 7323638..15b5c78 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -20,7 +20,7 @@ package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.IOException;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
index 2760e19..f8ce89a 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -20,7 +20,7 @@ package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.IOException;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
index c17b29a..1f9c000 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessor.java
@@ -17,7 +17,7 @@
*/
package org.apache.omid.tso;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import java.io.Closeable;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 988e3cd..3654c58 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -31,7 +31,7 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index 6269bb8..815ce55 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -17,56 +17,58 @@
*/
package org.apache.omid.tso;
-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ThreadFactory;
+
+import javax.inject.Inject;
+
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.inject.Inject;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
/**
* ChannelHandler for the TSO Server.
*
* Incoming requests are processed in this class
*/
-public class TSOChannelHandler extends SimpleChannelHandler implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
+// Marked sharable, as all global members used in callbacks are singletons.
+@Sharable
+public class TSOChannelHandler extends ChannelInboundHandlerAdapter implements Closeable {
- private final ChannelFactory factory;
+ private final Logger LOG = LoggerFactory.getLogger(TSOChannelHandler.class);
private final ServerBootstrap bootstrap;
@VisibleForTesting
Channel listeningChannel;
@VisibleForTesting
- ChannelGroup channelGroup;
+ ChannelGroup allChannels;
private RequestProcessor requestProcessor;
@@ -74,38 +76,57 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
private MetricsRegistry metrics;
+ private static final AttributeKey<TSOChannelContext> TSO_CTX =
+ AttributeKey.valueOf("TSO_CTX");
+
@Inject
public TSOChannelHandler(TSOServerConfig config, RequestProcessor requestProcessor, MetricsRegistry metrics) {
this.config = config;
this.metrics = metrics;
this.requestProcessor = requestProcessor;
- // Setup netty listener
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("boss-%d").build()),
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("worker-%d").build()),
- (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
- this.bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(new TSOPipelineFactory(this));
+ // Setup netty listener
+ int workerThreadCount= (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+ ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+ EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+ EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+ this.bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup);
+ bootstrap.channel(NioServerSocketChannel.class);
+ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ // Max packet length is 10MB. Transactions with so many cells
+ // that the packet is rejected will receive a ServiceUnavailableException.
+ // 10MB is enough for 2 million cells in a transaction though.
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("handler", TSOChannelHandler.this);
+ }
+ });
}
/**
* Allows to create and connect the communication channel closing the previous one if existed
*/
void reconnect() {
- if (listeningChannel == null && channelGroup == null) {
+ if (listeningChannel == null && allChannels == null) {
LOG.debug("Creating communication channel...");
} else {
LOG.debug("Reconnecting communication channel...");
closeConnection();
}
// Create the global ChannelGroup
- channelGroup = new DefaultChannelGroup(TSOChannelHandler.class.getName());
+ allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
LOG.debug("\tCreating channel to listening for incoming connections in port {}", config.getPort());
- listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort()));
- channelGroup.add(listeningChannel);
+ listeningChannel = bootstrap.bind(new InetSocketAddress(config.getPort())).syncUninterruptibly().channel();
+ allChannels.add(listeningChannel);
LOG.debug("\tListening channel created and connected: {}", listeningChannel);
}
@@ -114,15 +135,10 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
*/
void closeConnection() {
LOG.debug("Closing communication channel...");
- if (listeningChannel != null) {
- LOG.debug("\tUnbinding listening channel {}", listeningChannel);
- listeningChannel.unbind().awaitUninterruptibly();
- LOG.debug("\tListening channel {} unbound", listeningChannel);
- }
- if (channelGroup != null) {
- LOG.debug("\tClosing channel group {}", channelGroup);
- channelGroup.close().awaitUninterruptibly();
- LOG.debug("\tChannel group {} closed", channelGroup);
+ if (allChannels != null) {
+ LOG.debug("\tClosing channel group {}", allChannels);
+ allChannels.close().awaitUninterruptibly();
+ LOG.debug("\tChannel group {} closed", allChannels);
}
}
@@ -131,28 +147,22 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
// ----------------------------------------------------------------------------------------------------------------
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- channelGroup.add(ctx.getChannel());
- LOG.debug("TSO channel connected: {}", ctx.getChannel());
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- channelGroup.remove(ctx.getChannel());
- LOG.debug("TSO channel disconnected: {}", ctx.getChannel());
+ public void channelActive(ChannelHandlerContext ctx) {
+ allChannels.add(ctx.channel());
+ LOG.debug("TSO channel active: {}", ctx.channel());
}
@Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("TSO channel closed: {}", ctx.getChannel());
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ //ChannelGroup will automatically remove closed Channels
+ LOG.debug("TSO channel inactive: {}", ctx.channel());
}
/**
* Handle received messages
*/
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msg = e.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof TSOProto.Request) {
TSOProto.Request request = (TSOProto.Request) msg;
if (request.hasHandshakeRequest()) {
@@ -160,28 +170,28 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
return;
}
if (!handshakeCompleted(ctx)) {
- LOG.error("Handshake not completed. Closing channel {}", ctx.getChannel());
- ctx.getChannel().close();
+ LOG.error("Handshake not completed. Closing channel {}", ctx.channel());
+ ctx.channel().close();
}
if (request.hasTimestampRequest()) {
- requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
+ requestProcessor.timestampRequest(ctx.channel(), MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasCommitRequest()) {
TSOProto.CommitRequest cr = request.getCommitRequest();
requestProcessor.commitRequest(cr.getStartTimestamp(),
cr.getCellIdList(),
cr.getTableIdList(),
cr.getIsRetry(),
- ctx.getChannel(),
+ ctx.channel(),
MonitoringContextFactory.getInstance(config,metrics));
} else if (request.hasFenceRequest()) {
TSOProto.FenceRequest fr = request.getFenceRequest();
requestProcessor.fenceRequest(fr.getTableId(),
- ctx.getChannel(),
+ ctx.channel(),
MonitoringContextFactory.getInstance(config,metrics));
} else {
- LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
- ctx.getChannel().close();
+ LOG.error("Invalid request {}. Closing channel {}", request, ctx.channel());
+ ctx.channel().close();
}
} else {
LOG.error("Unknown message type", msg);
@@ -190,13 +200,13 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- if (e.getCause() instanceof ClosedChannelException) {
- LOG.warn("ClosedChannelException caught. Cause: ", e.getCause());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof ClosedChannelException) {
+ LOG.warn("ClosedChannelException caught. Cause: ", cause);
return;
}
- LOG.warn("Unexpected exception from downstream. Closing channel {} {}", ctx.getChannel(), e.getCause());
- ctx.getChannel().close();
+ LOG.warn("Unexpected exception. Closing channel {}", ctx.channel(), cause);
+ ctx.channel().close();
}
// ----------------------------------------------------------------------------------------------------------------
@@ -204,8 +214,12 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
// ----------------------------------------------------------------------------------------------------------------
@Override
public void close() throws IOException {
- closeConnection();
- factory.releaseExternalResources();
+ LOG.debug("Shutting down communication channel...");
+ bootstrap.config().group().shutdownGracefully();
+ bootstrap.config().childGroup().shutdownGracefully();
+
+ bootstrap.config().group().terminationFuture().awaitUninterruptibly();
+ bootstrap.config().childGroup().terminationFuture().awaitUninterruptibly();
}
// ----------------------------------------------------------------------------------------------------------------
@@ -242,51 +256,22 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
.setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
TSOChannelContext tsoCtx = new TSOChannelContext();
tsoCtx.setHandshakeComplete();
- ctx.setAttachment(tsoCtx);
+ ctx.channel().attr(TSO_CTX).set(tsoCtx);
} else {
response.setClientCompatible(false);
}
response.setLowLatency(config.getLowLatency());
- ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
+ ctx.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
}
private boolean handshakeCompleted(ChannelHandlerContext ctx) {
- Object o = ctx.getAttachment();
- if (o instanceof TSOChannelContext) {
- TSOChannelContext tsoCtx = (TSOChannelContext) o;
+ TSOChannelContext tsoCtx = ctx.channel().attr(TSO_CTX).get();
+ if (tsoCtx != null) {
return tsoCtx.getHandshakeComplete();
}
return false;
-
- }
-
- /**
- * Netty pipeline configuration
- */
- static class TSOPipelineFactory implements ChannelPipelineFactory {
-
- private final ChannelHandler handler;
-
- TSOPipelineFactory(ChannelHandler handler) {
- this.handler = handler;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
-
- ChannelPipeline pipeline = Channels.pipeline();
- // Max packet length is 10MB. Transactions with so many cells
- // that the packet is rejected will receive a ServiceUnavailableException.
- // 10MB is enough for 2 million cells in a transaction though.
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
- pipeline.addLast("handler", handler);
-
- return pipeline;
- }
}
}
diff --git a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
index 0e2c9f6..7142ef1 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/ProgrammableTSOServer.java
@@ -20,18 +20,27 @@ package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.ProgrammableTSOServer.Response.ResponseType;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,36 +49,51 @@ import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
/**
* Used in tests. Allows to program the set of responses returned by a TSO
*/
-public class ProgrammableTSOServer extends SimpleChannelHandler {
+@Sharable
+public class ProgrammableTSOServer extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);
- private ChannelFactory factory;
- private ChannelGroup channelGroup;
+ private ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private Queue<Response> responseQueue = new LinkedList<>();
+ private static final AttributeKey<TSOChannelContext> TSO_CTX =
+ AttributeKey.valueOf("TSO_CTX");
+
@Inject
public ProgrammableTSOServer(int port) {
// Setup netty listener
- factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat("boss-%d").build()), Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat("worker-%d").build()), (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2);
-
- // Create the global ChannelGroup
- channelGroup = new DefaultChannelGroup(ProgrammableTSOServer.class.getName());
- ServerBootstrap bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(new TSOChannelHandler.TSOPipelineFactory(this));
+ int workerThreadCount = (Runtime.getRuntime().availableProcessors() * 2 + 1) * 2;
+ ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build();
+ EventLoopGroup workerGroup = new NioEventLoopGroup(workerThreadCount, workerThreadFactory);
+ EventLoopGroup bossGroup = new NioEventLoopGroup(bossThreadFactory);
+
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup);
+ bootstrap.channel(NioServerSocketChannel.class);
+ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10 * 1024 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("handler", ProgrammableTSOServer.this);
+ }
+ });
// Add the parent channel to the group
- Channel channel = bootstrap.bind(new InetSocketAddress(port));
- channelGroup.add(channel);
+ Channel channel = bootstrap.bind(new InetSocketAddress(port)).syncUninterruptibly().channel();
+ allChannels.add(channel);
LOG.info("********** Dumb TSO Server running on port {} **********", port);
}
@@ -96,24 +120,18 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
// ******************** End of Main interface for tests *******************
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- channelGroup.add(ctx.getChannel());
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- channelGroup.remove(ctx.getChannel());
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ allChannels.add(ctx.channel());
}
/**
* Handle received messages
*/
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msg = e.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof TSOProto.Request) {
TSOProto.Request request = (TSOProto.Request) msg;
- Channel channel = ctx.getChannel();
+ Channel channel = ctx.channel();
if (request.hasHandshakeRequest()) {
checkHandshake(ctx, request.getHandshakeRequest());
return;
@@ -148,7 +166,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
}
} else {
LOG.error("Invalid request {}", request);
- ctx.getChannel().close();
+ ctx.channel().close();
}
} else {
LOG.error("Unknown message type", msg);
@@ -156,12 +174,12 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- if (e.getCause() instanceof ClosedChannelException) {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof ClosedChannelException) {
return;
}
- LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
- Channels.close(e.getChannel());
+ LOG.warn("TSOHandler: Unexpected exception.", cause);
+ ctx.channel().close();
}
private void checkHandshake(final ChannelHandlerContext ctx, TSOProto.HandshakeRequest request) {
@@ -171,15 +189,15 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
response.setClientCompatible(true).setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
TSOChannelContext tsoCtx = new TSOChannelContext();
tsoCtx.setHandshakeComplete();
- ctx.setAttachment(tsoCtx);
+ ctx.channel().attr(TSO_CTX).set(tsoCtx);
} else {
response.setClientCompatible(false);
}
- ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
+ ctx.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
}
private boolean handshakeCompleted(ChannelHandlerContext ctx) {
- Object o = ctx.getAttachment();
+ Object o = ctx.channel().attr(TSO_CTX).get();
if (o instanceof TSOChannelContext) {
TSOChannelContext tsoCtx = (TSOChannelContext) o;
return tsoCtx.getHandshakeComplete();
@@ -192,7 +210,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
respBuilder.setStartTimestamp(startTimestamp);
builder.setTimestampResponse(respBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
}
private void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
@@ -200,7 +218,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
commitBuilder.setAborted(false).setStartTimestamp(startTimestamp).setCommitTimestamp(commitTimestamp);
builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
}
private void sendAbortResponse(long startTimestamp, Channel c) {
@@ -208,7 +226,7 @@ public class ProgrammableTSOServer extends SimpleChannelHandler {
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
commitBuilder.setAborted(true).setStartTimestamp(startTimestamp);
builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
+ c.writeAndFlush(builder.build());
}
private static class TSOChannelContext {
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 3e35f93..da3351c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -19,7 +19,7 @@ package org.apache.omid.tso;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.commons.pool2.PooledObject;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index be15c62..e98336e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -22,7 +22,7 @@ import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index c99dd25..02a6790 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -22,7 +22,7 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 977f630..d1c5ade 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -23,7 +23,7 @@ import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 8f7a5a8..e8c621f 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -24,7 +24,7 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableF
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 1b84b31..370b27b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -25,7 +25,7 @@ import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.committable.InMemoryCommitTable;
import org.apache.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
+import io.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 88988e1..cae9c85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -21,21 +21,25 @@ import org.apache.omid.NetworkUtils;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelException;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
@@ -48,6 +52,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
@@ -92,45 +97,45 @@ public class TestTSOChannelHandlerNetty {
try {
// Check initial state
assertNull(channelHandler.listeningChannel);
- assertNull(channelHandler.channelGroup);
+ assertNull(channelHandler.allChannels);
// Check initial connection
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
- assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), port);
+ assertEquals(channelHandler.allChannels.size(), 1);
+ assertEquals(((InetSocketAddress) channelHandler.listeningChannel.localAddress()).getPort(), port);
// Check connection close
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
// Check re-closing connection
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
// Check connection after closing
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(channelHandler.allChannels.size(), 1);
// Check re-connection
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(channelHandler.allChannels.size(), 1);
// Exercise closeable with re-connection trial
channelHandler.close();
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
try {
channelHandler.reconnect();
fail("Can't reconnect after closing");
- } catch (ChannelException e) {
+ } catch (Exception e) {
// Expected: Can't reconnect after closing
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
}
} finally {
if(channelHandler != null) channelHandler.close();
@@ -142,7 +147,7 @@ public class TestTSOChannelHandlerNetty {
int port = NetworkUtils.getFreePort();
TSOChannelHandler channelHandler = getTSOChannelHandler(port);
try {
- ClientBootstrap nettyClient = createNettyClientBootstrap();
+ Bootstrap nettyClient = createNettyClientBootstrap();
ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
@@ -158,7 +163,7 @@ public class TestTSOChannelHandlerNetty {
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(channelHandler.allChannels.size(), 1);
// ------------------------------------------------------------------------------------------------------------
// Test that a client can connect now
@@ -166,16 +171,16 @@ public class TestTSOChannelHandlerNetty {
channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
- assertTrue(channelF.getChannel().isConnected());
+ assertTrue(channelF.channel().isActive());
// Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// ------------------------------------------------------------------------------------------------------------
// Close the channel on the client side and test we have one element less in the channel group
// ------------------------------------------------------------------------------------------------------------
- channelF.getChannel().close().await();
+ channelF.channel().close().await();
// Eventually the channel group of the server should have only one element
- while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
+ while (channelHandler.allChannels.size() != 1) /** do nothing */ ;
// ------------------------------------------------------------------------------------------------------------
// Open a new channel and test the connection closing on the server side through the channel handler
@@ -184,13 +189,13 @@ public class TestTSOChannelHandlerNetty {
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
// Eventually the channel group of the server should have 2 elements again
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
// Wait some time and check the channel was closed
TimeUnit.SECONDS.sleep(1);
- assertFalse(channelF.getChannel().isOpen());
+ assertFalse(channelF.channel().isOpen());
// ------------------------------------------------------------------------------------------------------------
// Test server re-connections with connected clients
@@ -199,28 +204,28 @@ public class TestTSOChannelHandlerNetty {
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(channelHandler.allChannels.size(), 1);
// Check the client can connect
channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
// Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// Re-connect and check that client connection was gone
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
- assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(channelHandler.allChannels.size(), 1);
// Wait some time and check the channel was closed
TimeUnit.SECONDS.sleep(1);
- assertFalse(channelF.getChannel().isOpen());
+ assertFalse(channelF.channel().isOpen());
// ------------------------------------------------------------------------------------------------------------
// Test closeable interface with re-connection trial
// ------------------------------------------------------------------------------------------------------------
channelHandler.close();
assertFalse(channelHandler.listeningChannel.isOpen());
- assertEquals(channelHandler.channelGroup.size(), 0);
+ assertEquals(channelHandler.allChannels.size(), 0);
} finally {
if (channelHandler != null) channelHandler.close();
}
@@ -239,20 +244,20 @@ public class TestTSOChannelHandlerNetty {
// Connect channel handler
channelHandler.reconnect();
// Create client and connect it
- ClientBootstrap nettyClient = createNettyClientBootstrap();
+ Bootstrap nettyClient = createNettyClientBootstrap();
ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
// Basic checks for connection
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
- assertTrue(channelF.getChannel().isConnected());
- Channel channel = channelF.getChannel();
+ assertTrue(channelF.channel().isActive());
+ Channel channel = channelF.channel();
// Eventually the channel group of the server should have 2 elements
- while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// Write first handshake request
TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
// NOTE: Add here the required handshake capabilities when necessary
handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
- channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+ channelF.channel().writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
// ------------------------------------------------------------------------------------------------------------
// Test channel writing
@@ -275,7 +280,7 @@ public class TestTSOChannelHandlerNetty {
TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
tsBuilder.setTimestampRequest(tsRequestBuilder.build());
// Write into the channel
- channel.write(tsBuilder.build()).await();
+ channel.writeAndFlush(tsBuilder.build()).await();
verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).times(0))
.commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
@@ -292,7 +297,7 @@ public class TestTSOChannelHandlerNetty {
TSOProto.Request r = commitBuilder.build();
assertTrue(r.hasCommitRequest());
// Write into the channel
- channel.write(commitBuilder.build()).await();
+ channel.writeAndFlush(commitBuilder.build()).await();
verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).times(1))
.commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
@@ -308,7 +313,7 @@ public class TestTSOChannelHandlerNetty {
TSOProto.Request r = fenceBuilder.build();
assertTrue(r.hasFenceRequest());
// Write into the channel
- channel.write(fenceBuilder.build()).await();
+ channel.writeAndFlush(fenceBuilder.build()).await();
verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
verify(requestProcessor, timeout(100).times(1))
.fenceRequest(eq(666L), any(Channel.class), any(MonitoringContextImpl.class));
@@ -318,37 +323,43 @@ public class TestTSOChannelHandlerNetty {
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
- private ClientBootstrap createNettyClientBootstrap() {
-
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()), 1);
- // Create the bootstrap
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
- ChannelPipeline pipeline = bootstrap.getPipeline();
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
- pipeline.addLast("handler", new SimpleChannelHandler() {
+ private Bootstrap createNettyClientBootstrap() {
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- LOG.info("Channel {} connected", ctx.getChannel());
- }
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+ EventLoopGroup workerGroup = new NioEventLoopGroup(1, workerThreadFactory);
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
+
+ bootstrap.group(workerGroup);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
+ public void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ LOG.info("Channel {} active", ctx.channel());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOG.error("Error on channel {}", ctx.channel(), cause);
+ }
+
+ });
}
-
});
+
return bootstrap;
}
}
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
index d40c26c..03f1609 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TSOClientRaw.java
@@ -21,21 +21,24 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableF
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.proto.TSOProto.Response;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
/**
* Raw client for communicating with tso server directly with protobuf messages
@@ -58,39 +62,39 @@ public class TSOClientRaw {
private final Channel channel;
public TSOClientRaw(String host, int port) throws InterruptedException, ExecutionException {
- // Start client with Nb of active threads = 3 as maximum.
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), 3);
- // Create the bootstrap
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
InetSocketAddress addr = new InetSocketAddress(host, port);
- ChannelPipeline pipeline = bootstrap.getPipeline();
- pipeline.addLast("lengthbaseddecoder",
- new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder",
- new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
-
- Handler handler = new Handler();
- pipeline.addLast("handler", handler);
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
+ // Start client with Nb of active threads = 3
+ ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
+ EventLoopGroup workerGroup = new NioEventLoopGroup(3, workerThreadFactory);
+
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group(workerGroup);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("rawHandler", new RawHandler());
+ }
+ });
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
ChannelFuture channelFuture = bootstrap.connect(addr).await();
- channel = channelFuture.getChannel();
+ channel = channelFuture.channel();
+
}
public void write(TSOProto.Request request) {
- channel.write(request);
+ channel.writeAndFlush(request);
}
public Future<Response> getResponse() throws InterruptedException {
@@ -104,12 +108,12 @@ public class TSOClientRaw {
channel.close();
}
- private class Handler extends SimpleChannelHandler {
+ private class RawHandler extends ChannelInboundHandlerAdapter {
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- LOG.info("Message received", e);
- if (e.getMessage() instanceof Response) {
- Response resp = (Response) e.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ LOG.info("Message received", msg);
+ if (msg instanceof Response) {
+ Response resp = (Response) msg;
try {
SettableFuture<Response> future = responseQueue.take();
future.set(resp);
@@ -118,16 +122,16 @@ public class TSOClientRaw {
LOG.warn("Interrupted in handler", ie);
}
} else {
- LOG.warn("Received unknown message", e.getMessage());
+ LOG.warn("Received unknown message", msg);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.info("Exception received", e.getCause());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.info("Exception received", cause);
try {
SettableFuture<Response> future = responseQueue.take();
- future.setException(e.getCause());
+ future.setException(cause);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted handling exception", ie);
@@ -135,9 +139,9 @@ public class TSOClientRaw {
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
+ public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
- LOG.info("Disconnected");
+ LOG.info("Inactive");
try {
SettableFuture<Response> future = responseQueue.take();
future.setException(new ConnectionException());