You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/05/03 01:13:01 UTC
[16/16] git commit: ACCUMULO-1691 Support Thrift 0.9.1
ACCUMULO-1691 Support Thrift 0.9.1
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/debd8365
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/debd8365
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/debd8365
Branch: refs/heads/ACCUMULO-1691
Commit: debd8365ea7370da88e3fff76127302d2bfaf0a6
Parents: 0616258
Author: Christopher Tubbs <ct...@apache.org>
Authored: Fri May 2 16:33:24 2014 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Fri May 2 19:05:32 2014 -0400
----------------------------------------------------------------------
pom.xml | 2 +-
.../server/util/CustomNonBlockingServer.java | 257 +++++++++++++++++++
.../accumulo/server/util/TServerUtils.java | 40 +--
3 files changed, 262 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43aa5fb..b314ff0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -379,7 +379,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
- <version>0.9.0</version>
+ <version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
new file mode 100644
index 0000000..f5511bc
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with one that reveals the client address from its transport. See
+ * https://issues.apache.org/jira/browse/ACCUMULO-1691
+ *
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a CustomFrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+ private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+ private SelectAcceptThread selectAcceptThread_;
+ private volatile boolean stopped_ = false;
+
+ public CustomNonBlockingServer(Args args) {
+ super(args);
+ }
+
+ @Override
+ protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+ TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+ if (trans instanceof TNonblockingSocket) {
+ TNonblockingSocket tsock = (TNonblockingSocket) trans;
+ Socket sock = tsock.getSocketChannel().socket();
+ TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+ }
+ }
+ frameBuffer.invoke();
+ }
+ };
+ }
+
+ @Override
+ protected boolean startThreads() {
+ // start the selector
+ try {
+ selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+ selectAcceptThread_.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to start selector thread!", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped_ = true;
+ if (selectAcceptThread_ != null) {
+ selectAcceptThread_.wakeupSelector();
+ }
+ }
+
+ @Override
+ public boolean isStopped() {
+ return selectAcceptThread_.isStopped();
+ }
+
+ @Override
+ protected void joinSelector() {
+ // wait until the selector thread exits
+ try {
+ selectAcceptThread_.join();
+ } catch (InterruptedException e) {
+ // for now, just silently ignore. technically this means we'll have less of
+ // a graceful shutdown as a result.
+ }
+ }
+
+ private interface CustomNonblockingFrameBuffer {
+ TNonblockingTransport getTransport();
+ }
+
+ private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+
+ private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+ private TNonblockingTransport trans;
+
+ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ this.trans = trans;
+ }
+
+ @Override
+ public TNonblockingTransport getTransport() {
+ return trans;
+ }
+ }
+
+ // @formatter:off
+ private class SelectAcceptThread extends AbstractSelectThread {
+
+ // The server transport on which new client transports will be accepted
+ private final TNonblockingServerTransport serverTransport;
+
+ /**
+ * Set up the thread that will handle the non-blocking accepts, reads, and
+ * writes.
+ */
+ public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+ throws IOException {
+ this.serverTransport = serverTransport;
+ serverTransport.registerSelector(selector);
+ }
+
+ public boolean isStopped() {
+ return stopped_;
+ }
+
+ /**
+ * The work loop. Handles both selecting (all IO operations) and managing
+ * the selection preferences of all existing connections.
+ */
+ @Override
+ public void run() {
+ try {
+ if (eventHandler_ != null) {
+ eventHandler_.preServe();
+ }
+
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ stopped_ = true;
+ }
+ }
+
+ /**
+ * Select and process IO events appropriately:
+ * If there are connections to be accepted, accept them.
+ * If there are existing connections with data waiting to be read, read it,
+ * buffering until a whole frame has been read.
+ * If there are any pending responses, buffer them until their target client
+ * is available, and then send the data.
+ */
+ private void select() {
+ try {
+ // wait for io events.
+ selector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ continue;
+ }
+
+ // if the key is marked Accept, then it has to be the server
+ // transport.
+ if (key.isAcceptable()) {
+ handleAccept();
+ } else if (key.isReadable()) {
+ // deal with reads
+ handleRead(key);
+ } else if (key.isWritable()) {
+ // deal with writes
+ handleWrite(key);
+ } else {
+ LOGGER.warn("Unexpected state in select! " + key.interestOps());
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException while selecting!", e);
+ }
+ }
+
+ /**
+ * Accept a new connection.
+ */
+ @SuppressWarnings("unused")
+ private void handleAccept() throws IOException {
+ SelectionKey clientKey = null;
+ TNonblockingTransport client = null;
+ try {
+ // accept the connection
+ client = (TNonblockingTransport)serverTransport.accept();
+ clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+ // add this key to the map
+ FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+ new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+ new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+ clientKey.attach(frameBuffer);
+ } catch (TTransportException tte) {
+ // something went wrong accepting.
+ LOGGER.warn("Exception trying to accept!", tte);
+ tte.printStackTrace();
+ if (clientKey != null) cleanupSelectionKey(clientKey);
+ if (client != null) client.close();
+ }
+ }
+ } // SelectAcceptThread
+ // @formatter:on
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 6d9e4c7..f51d003 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -22,7 +22,6 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.Random;
@@ -47,7 +46,6 @@ import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
@@ -203,40 +201,10 @@ public class TServerUtils {
}
}
- public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
- public THsHaServer(Args args) {
- super(args);
- }
-
- @Override
- protected Runnable getRunnable(FrameBuffer frameBuffer) {
- return new Invocation(frameBuffer);
- }
-
- private class Invocation implements Runnable {
-
- private final FrameBuffer frameBuffer;
-
- public Invocation(final FrameBuffer frameBuffer) {
- this.frameBuffer = frameBuffer;
- }
-
- @Override
- public void run() {
- if (frameBuffer.trans_ instanceof TNonblockingSocket) {
- TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
- Socket sock = tsock.getSocketChannel().socket();
- clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
- }
- frameBuffer.invoke();
- }
- }
- }
-
- public static ServerAddress createHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
+ public static ServerAddress creatNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
- THsHaServer.Args options = new THsHaServer.Args(transport);
+ CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
options.protocolFactory(ThriftUtil.protocolFactory());
options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
options.maxReadBufferBytes = maxMessageSize;
@@ -274,7 +242,7 @@ public class TServerUtils {
if (address.getPort() == 0) {
address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
}
- return new ServerAddress(new THsHaServer(options), address);
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
}
public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
@@ -329,7 +297,7 @@ public class TServerUtils {
if (sslParams != null) {
serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
} else {
- serverAddress = createHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+ serverAddress = creatNonBlockingServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
}
final TServer finalServer = serverAddress.server;
Runnable serveTask = new Runnable() {