You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/12/15 00:45:36 UTC
svn commit: r1049338 - in /avro/trunk: ./
lang/java/src/java/org/apache/avro/ipc/
lang/java/src/java/org/apache/avro/specific/
lang/java/src/test/java/org/apache/avro/
Author: cutting
Date: Tue Dec 14 23:45:35 2010
New Revision: 1049338
URL: http://svn.apache.org/viewvc?rev=1049338&view=rev
Log:
AVRO-687. Java: Permit RPC applications to view remote protocol.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java
avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Dec 14 23:45:35 2010
@@ -24,6 +24,8 @@ Avro 1.5.0 (unreleased)
AVRO-689. Java: Permit setting timeout of HttpTransceiver. (cutting)
+ AVRO-687. Java: Permit RPC applications to view remote protocol. (cutting)
+
IMPROVEMENTS
AVRO-682. Java: Add method DataFileStream.getMetaKeys().
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Tue Dec 14 23:45:35 2010
@@ -62,7 +62,6 @@ public abstract class Requestor {
protected List<RPCPlugin> rpcMetaPlugins;
public Protocol getLocal() { return local; }
- public Protocol getRemote() { return remote; }
public Transceiver getTransceiver() { return transceiver; }
protected Requestor(Protocol local, Transceiver transceiver)
@@ -127,7 +126,7 @@ public abstract class Requestor {
} while (!readHandshake(in));
// use remote protocol to read response
- Message rm = getRemote().getMessages().get(messageName);
+ Message rm = remote.getMessages().get(messageName);
if (rm == null)
throw new AvroRuntimeException("Not a remote message: "+messageName);
if (m.isOneWay() != rm.isOneWay())
@@ -236,6 +235,28 @@ public abstract class Requestor {
REMOTE_PROTOCOLS.put(remoteHash, remote);
}
+ /** Return the remote protocol. Force a handshake if required. */
+ public synchronized Protocol getRemote() throws IOException {
+ MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
+ if (remote != null)
+ return remote;
+ // force handshake
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ Encoder out = new BinaryEncoder(bbo);
+ writeHandshake(out);
+ out.writeLong(0); // empty metadata
+ out.writeString(""); // bogus message name
+ List<ByteBuffer> response =
+ getTransceiver().transceive(bbo.getBufferList());
+ ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+ BinaryDecoder in =
+ DecoderFactory.defaultFactory().createBinaryDecoder(bbi, null);
+ readHandshake(in);
+ return this.remote;
+ }
+
+
/** Writes a request message. */
public abstract void writeRequest(Schema schema, Object request,
Encoder out) throws IOException;
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java Tue Dec 14 23:45:35 2010
@@ -55,6 +55,9 @@ public abstract class Responder {
private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
+ private static final ThreadLocal<Protocol> REMOTE =
+ new ThreadLocal<Protocol>();
+
private Map<MD5,Protocol> protocols
= Collections.synchronizedMap(new HashMap<MD5,Protocol>());
@@ -71,6 +74,11 @@ public abstract class Responder {
Collections.synchronizedList(new ArrayList<RPCPlugin>());
}
+ /** Return the remote protocol. Accesses a {@link ThreadLocal} that's set
+ * around calls to {@link #respond(Protocol.Message, Object)}. */
+ public static Protocol getRemote() { return REMOTE.get(); }
+
+ /** Return the local protocol. */
public Protocol getLocal() { return local; }
/**
@@ -133,12 +141,15 @@ public abstract class Responder {
Object response = null;
try {
+ REMOTE.set(remote);
response = respond(m, request);
context.setResponse(response);
} catch (Exception e) {
error = e;
context.setError(error);
LOG.warn("user error", e);
+ } finally {
+ REMOTE.set(null);
}
if (m.isOneWay() && wasConnected) // no response data
Modified: avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/specific/SpecificRequestor.java Tue Dec 14 23:45:35 2010
@@ -107,5 +107,12 @@ public class SpecificRequestor extends R
return (T)Proxy.newProxyInstance(iface.getClassLoader(),
new Class[] { iface }, requestor);
}
+
+ /** Return the remote protocol for a proxy. */
+ public static Protocol getRemote(Object proxy) throws IOException {
+ return ((Requestor)Proxy.getInvocationHandler(proxy)).getRemote();
+
+ }
+
}
Modified: avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=1049338&r1=1049337&r2=1049338&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/TestProtocolSpecific.java Tue Dec 14 23:45:35 2010
@@ -85,6 +85,10 @@ public class TestProtocolSpecific {
proxy = SpecificRequestor.getClient(Simple.class, client);
}
+ @Test public void testGetRemote() throws IOException {
+ assertEquals(Simple.PROTOCOL, SpecificRequestor.getRemote(proxy));
+ }
+
@Test
public void testHello() throws IOException {
CharSequence response = proxy.hello(new Utf8("bob"));