You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2014/05/03 01:13:01 UTC

[16/16] git commit: ACCUMULO-1691 Support Thrift 0.9.1

ACCUMULO-1691 Support Thrift 0.9.1


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/debd8365
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/debd8365
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/debd8365

Branch: refs/heads/ACCUMULO-1691
Commit: debd8365ea7370da88e3fff76127302d2bfaf0a6
Parents: 0616258
Author: Christopher Tubbs <ct...@apache.org>
Authored: Fri May 2 16:33:24 2014 -0400
Committer: Christopher Tubbs <ct...@apache.org>
Committed: Fri May 2 19:05:32 2014 -0400

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../server/util/CustomNonBlockingServer.java    | 257 +++++++++++++++++++
 .../accumulo/server/util/TServerUtils.java      |  40 +--
 3 files changed, 262 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 43aa5fb..b314ff0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -379,7 +379,7 @@
       <dependency>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libthrift</artifactId>
-        <version>0.9.0</version>
+        <version>0.9.1</version>
       </dependency>
       <dependency>
         <groupId>org.apache.zookeeper</groupId>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
new file mode 100644
index 0000000..f5511bc
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/CustomNonBlockingServer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * This class implements a custom non-blocking thrift server, incorporating the {@link THsHaServer} features, and overriding the underlying
+ * {@link TNonblockingServer} methods, especially {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread}, in order to override the
+ * {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer} with one that reveals the client address from its transport. See
+ * https://issues.apache.org/jira/browse/ACCUMULO-1691
+ * 
+ * <p>
+ * This class contains a copy of {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} from Thrift 0.9.1, with the slight modification of
+ * instantiating a CustomFrameBuffer, rather than the {@link org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer}
+ */
+public class CustomNonBlockingServer extends THsHaServer {
+
+  private static final Logger LOGGER = Logger.getLogger(CustomNonBlockingServer.class);
+  private SelectAcceptThread selectAcceptThread_;
+  private volatile boolean stopped_ = false;
+
+  public CustomNonBlockingServer(Args args) {
+    super(args);
+  }
+
+  @Override
+  protected Runnable getRunnable(final FrameBuffer frameBuffer) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        if (frameBuffer instanceof CustomNonblockingFrameBuffer) {
+          TNonblockingTransport trans = ((CustomNonblockingFrameBuffer) frameBuffer).getTransport();
+          if (trans instanceof TNonblockingSocket) {
+            TNonblockingSocket tsock = (TNonblockingSocket) trans;
+            Socket sock = tsock.getSocketChannel().socket();
+            TServerUtils.clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+          }
+        }
+        frameBuffer.invoke();
+      }
+    };
+  }
+
+  @Override
+  protected boolean startThreads() {
+    // start the selector
+    try {
+      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+      selectAcceptThread_.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start selector thread!", e);
+      return false;
+    }
+  }
+
+  @Override
+  public void stop() {
+    stopped_ = true;
+    if (selectAcceptThread_ != null) {
+      selectAcceptThread_.wakeupSelector();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return selectAcceptThread_.isStopped();
+  }
+
+  @Override
+  protected void joinSelector() {
+    // wait until the selector thread exits
+    try {
+      selectAcceptThread_.join();
+    } catch (InterruptedException e) {
+      // for now, just silently ignore. technically this means we'll have less of
+      // a graceful shutdown as a result.
+    }
+  }
+
+  private interface CustomNonblockingFrameBuffer {
+    TNonblockingTransport getTransport();
+  }
+
+  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+
+  private class CustomFrameBuffer extends FrameBuffer implements CustomNonblockingFrameBuffer {
+    private TNonblockingTransport trans;
+
+    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+      super(trans, selectionKey, selectThread);
+      this.trans = trans;
+    }
+
+    @Override
+    public TNonblockingTransport getTransport() {
+      return trans;
+    }
+  }
+
+  // @formatter:off
+  private class SelectAcceptThread extends AbstractSelectThread {
+
+    // The server transport on which new client transports will be accepted
+    private final TNonblockingServerTransport serverTransport;
+
+    /**
+     * Set up the thread that will handle the non-blocking accepts, reads, and
+     * writes.
+     */
+    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
+    throws IOException {
+      this.serverTransport = serverTransport;
+      serverTransport.registerSelector(selector);
+    }
+
+    public boolean isStopped() {
+      return stopped_;
+    }
+
+    /**
+     * The work loop. Handles both selecting (all IO operations) and managing
+     * the selection preferences of all existing connections.
+     */
+    @Override
+    public void run() {
+      try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
+        while (!stopped_) {
+          select();
+          processInterestChanges();
+        }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        stopped_ = true;
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately:
+     * If there are connections to be accepted, accept them.
+     * If there are existing connections with data waiting to be read, read it,
+     * buffering until a whole frame has been read.
+     * If there are any pending responses, buffer them until their target client
+     * is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          // if the key is marked Accept, then it has to be the server
+          // transport.
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    @SuppressWarnings("unused")
+    private void handleAccept() throws IOException {
+      SelectionKey clientKey = null;
+      TNonblockingTransport client = null;
+      try {
+        // accept the connection
+        client = (TNonblockingTransport)serverTransport.accept();
+        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+        // add this key to the map
+          FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ?
+                  new CustomAsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) :
+                  new CustomFrameBuffer(client, clientKey,SelectAcceptThread.this);
+
+          clientKey.attach(frameBuffer);
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        tte.printStackTrace();
+        if (clientKey != null) cleanupSelectionKey(clientKey);
+        if (client != null) client.close();
+      }
+    }
+  } // SelectAcceptThread
+  // @formatter:on
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/debd8365/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 6d9e4c7..f51d003 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -22,7 +22,6 @@ import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.Random;
@@ -47,7 +46,6 @@ import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
@@ -203,40 +201,10 @@ public class TServerUtils {
     }
   }
 
-  public static class THsHaServer extends org.apache.thrift.server.THsHaServer {
-    public THsHaServer(Args args) {
-      super(args);
-    }
-
-    @Override
-    protected Runnable getRunnable(FrameBuffer frameBuffer) {
-      return new Invocation(frameBuffer);
-    }
-
-    private class Invocation implements Runnable {
-
-      private final FrameBuffer frameBuffer;
-
-      public Invocation(final FrameBuffer frameBuffer) {
-        this.frameBuffer = frameBuffer;
-      }
-
-      @Override
-      public void run() {
-        if (frameBuffer.trans_ instanceof TNonblockingSocket) {
-          TNonblockingSocket tsock = (TNonblockingSocket) frameBuffer.trans_;
-          Socket sock = tsock.getSocketChannel().socket();
-          clientAddress.set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
-        }
-        frameBuffer.invoke();
-      }
-    }
-  }
-
-  public static ServerAddress createHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
+  public static ServerAddress creatNonBlockingServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads,
       long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort()));
-    THsHaServer.Args options = new THsHaServer.Args(transport);
+    CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
     options.maxReadBufferBytes = maxMessageSize;
@@ -274,7 +242,7 @@ public class TServerUtils {
     if (address.getPort() == 0) {
       address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
     }
-    return new ServerAddress(new THsHaServer(options), address);
+    return new ServerAddress(new CustomNonBlockingServer(options), address);
   }
 
   public static ServerAddress createThreadPoolServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads)
@@ -329,7 +297,7 @@ public class TServerUtils {
     if (sslParams != null) {
       serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams);
     } else {
-      serverAddress = createHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
+      serverAddress = creatNonBlockingServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize);
     }
     final TServer finalServer = serverAddress.server;
     Runnable serveTask = new Runnable() {