You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by ro...@apache.org on 2013/03/24 21:44:11 UTC

git commit: THRIFT-1864 java: implement event handler for non-blocking server Patch: Vitali Lovich

Updated Branches:
  refs/heads/master ce6d1d709 -> 9cda78844


THRIFT-1864 java: implement event handler for non-blocking server
Patch: Vitali Lovich


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/9cda7884
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/9cda7884
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/9cda7884

Branch: refs/heads/master
Commit: 9cda78844de5097554414e3ef30e62d482679b81
Parents: ce6d1d7
Author: Roger Meier <ro...@apache.org>
Authored: Sun Mar 24 21:42:35 2013 +0100
Committer: Roger Meier <ro...@apache.org>
Committed: Sun Mar 24 21:42:35 2013 +0100

----------------------------------------------------------------------
 .../thrift/server/AbstractNonblockingServer.java   |   72 ++++++++------
 .../apache/thrift/server/TNonblockingServer.java   |    4 +
 .../thrift/server/TThreadedSelectorServer.java     |    4 +
 3 files changed, 49 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/9cda7884/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index e5e26b2..97afc0b 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -281,7 +281,24 @@ public abstract class AbstractNonblockingServer extends TServer {
     // the ByteBuffer we'll be using to write and read, depending on the state
     private ByteBuffer buffer_;
 
-    private TByteArrayOutputStream response_;
+    private final TByteArrayOutputStream response_;
+    
+    // the frame that the TTransport should wrap.
+    private final TMemoryInputTransport frameTrans_;
+    
+    // the transport that should be used to connect to clients
+    private final TTransport inTrans_;
+    
+    private final TTransport outTrans_;
+    
+    // the input protocol to use on frames
+    private final TProtocol inProt_;
+    
+    // the output protocol to use on frames
+    private final TProtocol outProt_;
+    
+    // context associated with this connection
+    private final ServerContext context_;
 
     public FrameBuffer(final TNonblockingTransport trans,
         final SelectionKey selectionKey,
@@ -290,6 +307,19 @@ public abstract class AbstractNonblockingServer extends TServer {
       selectionKey_ = selectionKey;
       selectThread_ = selectThread;
       buffer_ = ByteBuffer.allocate(4);
+
+      frameTrans_ = new TMemoryInputTransport();
+      response_ = new TByteArrayOutputStream();
+      inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
+      outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+      inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
+      outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
+
+      if (eventHandler_ != null) {
+        context_ = eventHandler_.createContext(inProt_, outProt_);
+      } else {
+        context_  = null;
+      }
     }
 
     /**
@@ -426,6 +456,9 @@ public abstract class AbstractNonblockingServer extends TServer {
         readBufferBytesAllocated.addAndGet(-buffer_.array().length);
       }
       trans_.close();
+      if (eventHandler_ != null) {
+        eventHandler_.deleteContext(context_, inProt_, outProt_);
+      }
     }
 
     /**
@@ -470,12 +503,14 @@ public abstract class AbstractNonblockingServer extends TServer {
      * Actually invoke the method signified by this FrameBuffer.
      */
     public void invoke() {
-      TTransport inTrans = getInputTransport();
-      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
-      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
-
+      frameTrans_.reset(buffer_.array());
+      response_.reset();
+      
       try {
-        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+        if (eventHandler_ != null) {
+          eventHandler_.processContext(context_, inTrans_, outTrans_);
+        }
+        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
         responseReady();
         return;
       } catch (TException te) {
@@ -489,22 +524,6 @@ public abstract class AbstractNonblockingServer extends TServer {
     }
 
     /**
-     * Wrap the read buffer in a memory-based transport so a processor can read
-     * the data it needs to handle an invocation.
-     */
-    private TTransport getInputTransport() {
-      return inputTransportFactory_.getTransport(new TMemoryInputTransport(buffer_.array()));
-    }
-
-    /**
-     * Get the transport that should be used by the invoker for responding.
-     */
-    private TTransport getOutputTransport() {
-      response_ = new TByteArrayOutputStream();
-      return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
-    }
-
-    /**
      * Perform a read into buffer.
      * 
      * @return true if the read succeeded, false if there was an error or the
@@ -550,13 +569,4 @@ public abstract class AbstractNonblockingServer extends TServer {
       }
     }
   } // FrameBuffer
-
-  public void setServerEventHandler(TServerEventHandler eventHandler) {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  public TServerEventHandler getEventHandler() {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/thrift/blob/9cda7884/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
----------------------------------------------------------------------
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 169ae5c..240b123 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -150,6 +150,10 @@ public class TNonblockingServer extends AbstractNonblockingServer {
      */
     public void run() {
       try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
         while (!stopped_) {
           select();
           processInterestChanges();

http://git-wip-us.apache.org/repos/asf/thrift/blob/9cda7884/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 23ec842..29eabb1 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -371,6 +371,10 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
      */
     public void run() {
       try {
+        if (eventHandler_ != null) {
+          eventHandler_.preServe();
+        }
+
         while (!stopped_) {
           select();
         }