You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/11 22:28:16 UTC

accumulo git commit: ACCUMULO-4095 Hacks on CustomNonBlockingServer to restore client address functionality.

Repository: accumulo
Updated Branches:
  refs/heads/master e7fe96f2a -> 64713554b


ACCUMULO-4095 Hacks on CustomNonBlockingServer to restore client address functionality.

Closes apache/accumulo#63


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/64713554
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/64713554
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/64713554

Branch: refs/heads/master
Commit: 64713554b7c114088dcb7fd432e25bcd421cc04a
Parents: e7fe96f
Author: Josh Elser <el...@apache.org>
Authored: Fri Jan 8 00:49:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jan 11 14:23:25 2016 -0500

----------------------------------------------------------------------
 .../server/rpc/CustomNonBlockingServer.java     | 63 ++++++++++++++++++--
 1 file changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/64713554/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
index f4737be..ae65c1e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
@@ -16,30 +16,83 @@
  */
 package org.apache.accumulo.server.rpc;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
 
+import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
 
 /**
  * This class implements a custom non-blocking thrift server that stores the client address in thread-local storage for the invocation.
- *
  */
 public class CustomNonBlockingServer extends THsHaServer {
 
+  private final Field selectAcceptThreadField;
+
   public CustomNonBlockingServer(Args args) {
     super(args);
+
+    try {
+      selectAcceptThreadField = TNonblockingServer.class.getDeclaredField("selectAcceptThread_");
+      selectAcceptThreadField.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to access required field in Thrift code.", e);
+    }
+  }
+
+  @Override
+  protected boolean startThreads() {
+    // Yet another dirty/gross hack to get access to the client's address.
+
+    // start the selector
+    try {
+      // Hack in our SelectAcceptThread impl
+      SelectAcceptThread selectAcceptThread_ = new CustomSelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+      // Set the private field before continuing.
+      selectAcceptThreadField.set(this, selectAcceptThread_);
+
+      selectAcceptThread_.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start selector thread!", e);
+      return false;
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      throw new RuntimeException("Exception setting customer select thread in Thrift");
+    }
   }
 
-  protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) {
-    return new CustomAsyncFrameBuffer(trans, selectionKey, selectThread);
+  /**
+   * Custom wrapper around {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} to create our {@link CustomFrameBuffer}.
+   */
+  private class CustomSelectAcceptThread extends SelectAcceptThread {
+
+    public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport) throws IOException {
+      super(serverTransport);
+    }
+
+    @Override
+    protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) {
+      if (processorFactory_.isAsyncProcessor()) {
+        throw new IllegalStateException("This implementation does not support AsyncProcessors");
+      }
+
+      return new CustomFrameBuffer(trans, selectionKey, selectThread);
+    }
   }
 
-  private class CustomAsyncFrameBuffer extends AsyncFrameBuffer {
+  /**
+   * Custom wrapper around {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} to extract the client's network location before accepting the
+   * request.
+   */
+  private class CustomFrameBuffer extends FrameBuffer {
 
-    public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+    public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
       super(trans, selectionKey, selectThread);
     }