You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/12/15 00:45:36 UTC

svn commit: r1049338 - in /avro/trunk: ./ lang/java/src/java/org/apache/avro/ipc/ lang/java/src/java/org/apache/avro/specific/ lang/java/src/test/java/org/apache/avro/

Author: cutting
Date: Tue Dec 14 23:45:35 2010
New Revision: 1049338

URL: http://svn.apache.org/viewvc?rev=1049338&view=rev
Log:
AVRO-687. Java: Permit RPC applications to view remote protocol.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
    avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java
    avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Dec 14 23:45:35 2010
@@ -24,6 +24,8 @@ Avro 1.5.0 (unreleased)
 
     AVRO-689. Java: Permit setting timeout of HttpTransceiver. (cutting)
 
+    AVRO-687. Java: Permit RPC applications to view remote protocol. (cutting)
+
   IMPROVEMENTS
 
     AVRO-682. Java: Add method DataFileStream.getMetaKeys().

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Tue Dec 14 23:45:35 2010
@@ -62,7 +62,6 @@ public abstract class Requestor {
   protected List<RPCPlugin> rpcMetaPlugins;
 
   public Protocol getLocal() { return local; }
-  public Protocol getRemote() { return remote; }
   public Transceiver getTransceiver() { return transceiver; }
 
   protected Requestor(Protocol local, Transceiver transceiver)
@@ -127,7 +126,7 @@ public abstract class Requestor {
     } while (!readHandshake(in));
 
     // use remote protocol to read response
-    Message rm = getRemote().getMessages().get(messageName);
+    Message rm = remote.getMessages().get(messageName);
     if (rm == null)
       throw new AvroRuntimeException("Not a remote message: "+messageName);
     if (m.isOneWay() != rm.isOneWay())
@@ -236,6 +235,28 @@ public abstract class Requestor {
       REMOTE_PROTOCOLS.put(remoteHash, remote);
   }
 
+  /** Return the remote protocol.  Force a handshake if required. */
+  public synchronized Protocol getRemote() throws IOException {
+    MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
+    remote = REMOTE_PROTOCOLS.get(remoteHash);
+    if (remote != null)
+      return remote;
+    // force handshake
+    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+    Encoder out = new BinaryEncoder(bbo);
+    writeHandshake(out);
+    out.writeLong(0);                             // empty metadata
+    out.writeString("");                          // bogus message name
+    List<ByteBuffer> response =
+      getTransceiver().transceive(bbo.getBufferList());
+    ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+    BinaryDecoder in =
+      DecoderFactory.defaultFactory().createBinaryDecoder(bbi, null);
+    readHandshake(in);
+    return this.remote;
+  }
+
+
   /** Writes a request message. */
   public abstract void writeRequest(Schema schema, Object request,
                                     Encoder out) throws IOException;

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java Tue Dec 14 23:45:35 2010
@@ -55,6 +55,9 @@ public abstract class Responder {
   private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
     META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
 
+  private static final ThreadLocal<Protocol> REMOTE =
+    new ThreadLocal<Protocol>();
+
   private Map<MD5,Protocol> protocols
     = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
 
@@ -71,6 +74,11 @@ public abstract class Responder {
       Collections.synchronizedList(new ArrayList<RPCPlugin>());
   }
 
+  /** Return the remote protocol.  Accesses a {@link ThreadLocal} that's set
+   * around calls to {@link #respond(Protocol.Message, Object)}. */
+  public static Protocol getRemote() { return REMOTE.get(); }
+  
+  /** Return the local protocol. */
   public Protocol getLocal() { return local; }
   
   /**
@@ -133,12 +141,15 @@ public abstract class Responder {
       Object response = null;
       
       try {
+        REMOTE.set(remote);
         response = respond(m, request);
         context.setResponse(response);
       } catch (Exception e) {
         error = e;
         context.setError(error);
         LOG.warn("user error", e);
+      } finally {
+        REMOTE.set(null);
       }
       
       if (m.isOneWay() && wasConnected)           // no response data

Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java Tue Dec 14 23:45:35 2010
@@ -107,5 +107,12 @@ public class SpecificRequestor extends R
     return (T)Proxy.newProxyInstance(iface.getClassLoader(),
                                   new Class[] { iface }, requestor);
   }
+
+  /** Return the remote protocol for a proxy. */
+  public static Protocol getRemote(Object proxy) throws IOException {
+    return ((Requestor)Proxy.getInvocationHandler(proxy)).getRemote();
+    
+  }
+
 }
 

Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java Tue Dec 14 23:45:35 2010
@@ -85,6 +85,10 @@ public class TestProtocolSpecific {
     proxy = SpecificRequestor.getClient(Simple.class, client);
   }
 
+  @Test public void testGetRemote() throws IOException {
+    assertEquals(Simple.PROTOCOL, SpecificRequestor.getRemote(proxy));
+  }
+
   @Test
   public void testHello() throws IOException {
     CharSequence response = proxy.hello(new Utf8("bob"));