You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/11 22:28:16 UTC
accumulo git commit: ACCUMULO-4095 Hacks on CustomNonBlockingServer
to restore client address functionality.
Repository: accumulo
Updated Branches:
refs/heads/master e7fe96f2a -> 64713554b
ACCUMULO-4095 Hacks on CustomNonBlockingServer to restore client address functionality.
Closes apache/accumulo#63
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/64713554
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/64713554
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/64713554
Branch: refs/heads/master
Commit: 64713554b7c114088dcb7fd432e25bcd421cc04a
Parents: e7fe96f
Author: Josh Elser <el...@apache.org>
Authored: Fri Jan 8 00:49:44 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Mon Jan 11 14:23:25 2016 -0500
----------------------------------------------------------------------
.../server/rpc/CustomNonBlockingServer.java | 63 ++++++++++++++++++--
1 file changed, 58 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/64713554/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
index f4737be..ae65c1e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomNonBlockingServer.java
@@ -16,30 +16,83 @@
*/
package org.apache.accumulo.server.rpc;
+import java.io.IOException;
+import java.lang.reflect.Field;
import java.net.Socket;
import java.nio.channels.SelectionKey;
+import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
/**
* This class implements a custom non-blocking thrift server that stores the client address in thread-local storage for the invocation.
- *
*/
public class CustomNonBlockingServer extends THsHaServer {
+ private final Field selectAcceptThreadField;
+
public CustomNonBlockingServer(Args args) {
super(args);
+ // Resolve the reflective handle to Thrift's selectAcceptThread_ field once here; startThreads() later uses it to install the custom select thread.
+ try {
+ selectAcceptThreadField = TNonblockingServer.class.getDeclaredField("selectAcceptThread_");
+ selectAcceptThreadField.setAccessible(true);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to access required field in Thrift code.", e);
+ }
+ }
+
+ @Override
+ protected boolean startThreads() {
+ // Yet another dirty/gross hack to get access to the client's address.
+ // Returns true once the selector thread has been started, false if creating it fails with an IOException.
+ // start the selector
+ try {
+ // Hack in our SelectAcceptThread impl
+ SelectAcceptThread selectAcceptThread_ = new CustomSelectAcceptThread((TNonblockingServerTransport) serverTransport_);
+ // Set the private field before continuing.
+ selectAcceptThreadField.set(this, selectAcceptThread_);
+ // Start only after the field holds our instance (presumably the superclass reads it later -- confirm against Thrift).
+ selectAcceptThread_.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to start selector thread!", e);
+ return false;
+ } catch (IllegalAccessException | IllegalArgumentException e) {
+ throw new RuntimeException("Exception setting custom select thread in Thrift", e); // keep the cause so the reflective failure is diagnosable
+ }
}
- protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) {
- return new CustomAsyncFrameBuffer(trans, selectionKey, selectThread);
+ /**
+ * Custom wrapper around {@link org.apache.thrift.server.TNonblockingServer.SelectAcceptThread} to create our {@link CustomFrameBuffer}.
+ */
+ private class CustomSelectAcceptThread extends SelectAcceptThread {
+ // Passes the server transport straight through to the parent constructor.
+ public CustomSelectAcceptThread(TNonblockingServerTransport serverTransport) throws IOException {
+ super(serverTransport);
+ }
+ // Builds a CustomFrameBuffer for the given transport, selection key and select thread.
+ @Override
+ protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread) {
+ if (processorFactory_.isAsyncProcessor()) {
+ throw new IllegalStateException("This implementation does not support AsyncProcessors");
+ }
+ // CustomFrameBuffer extends the plain FrameBuffer, which is why async processors were rejected above.
+ return new CustomFrameBuffer(trans, selectionKey, selectThread);
+ }
}
- private class CustomAsyncFrameBuffer extends AsyncFrameBuffer {
+ /**
+ * Custom wrapper around {@link org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer} to extract the client's network location before accepting the
+ * request.
+ */
+ private class CustomFrameBuffer extends FrameBuffer {
- public CustomAsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
+ public CustomFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) {
super(trans, selectionKey, selectThread);
}