You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/01 00:56:28 UTC
svn commit: r799766 - in /hadoop/avro/trunk: ./
src/java/org/apache/avro/ipc/ src/java/org/apache/avro/reflect/
src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/
Author: cutting
Date: Fri Jul 31 22:56:28 2009
New Revision: 799766
URL: http://svn.apache.org/viewvc?rev=799766&view=rev
Log:
AVRO-76. Add Java RPC plugin framework. Contributed by George Porter.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jul 31 22:56:28 2009
@@ -9,6 +9,8 @@
AVRO-50. Implmenent JSON data codec in Java. (Thiruvalluvan
M. G. & cutting)
+ AVRO-76. Add Java RPC plugin framework. (George Porter)
+
IMPROVEMENTS
AVRO-71. C++: make deserializer more generic. (Scott Banachowski
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * This class represents the context of an RPC call or RPC handshake.
+ * Designed to provide information to RPC plugin writers,
+ * this class encapsulates information about the rpc exchange,
+ * including per-session and per-call metadata.
+ *
+ */
+public class RPCContext {
+
+ protected Map<Utf8,ByteBuffer> requestSessionMeta, responseSessionMeta;
+ protected Map<Utf8,ByteBuffer> requestCallMeta, responseCallMeta;
+
+ protected Object response;
+ protected AvroRemoteException error;
+
+ /**
+ * This is an access method for the session state
+ * provided by the client to the server.
+ * @return a map representing session state from
+ * the client to the server
+ */
+ public Map<Utf8,ByteBuffer> requestSessionMeta() {
+ if (requestSessionMeta == null) {
+ requestSessionMeta = new HashMap<Utf8,ByteBuffer>();
+ }
+ return requestSessionMeta;
+ }
+
+ void setRequestSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
+ requestSessionMeta = newmeta;
+ }
+
+ /**
+ * This is an access method for the session state
+ * provided by the server back to the client
+ * @return a map representing session state from
+ * the server to the client
+ */
+ public Map<Utf8,ByteBuffer> responseSessionMeta() {
+ if (responseSessionMeta == null) {
+ responseSessionMeta = new HashMap<Utf8,ByteBuffer>();
+ }
+ return responseSessionMeta;
+ }
+
+ void setResponseSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
+ responseSessionMeta = newmeta;
+ }
+
+ /**
+ * This is an access method for the per-call state
+ * provided by the client to the server.
+ * @return a map representing per-call state from
+ * the client to the server
+ */
+ public Map<Utf8,ByteBuffer> requestCallMeta() {
+ if (requestCallMeta == null) {
+ requestCallMeta = new HashMap<Utf8,ByteBuffer>();
+ }
+ return requestCallMeta;
+ }
+
+ void setRequestCallMeta(Map<Utf8,ByteBuffer> newmeta) {
+ requestCallMeta = newmeta;
+ }
+
+ /**
+ * This is an access method for the per-call state
+ * provided by the server back to the client.
+ * @return a map representing per-call state from
+ * the server to the client
+ */
+ public Map<Utf8,ByteBuffer> responseCallMeta() {
+ if (responseCallMeta == null) {
+ responseCallMeta = new HashMap<Utf8,ByteBuffer>();
+ }
+ return responseCallMeta;
+ }
+
+ void setResponseCallMeta(Map<Utf8,ByteBuffer> newmeta) {
+ responseCallMeta = newmeta;
+ }
+
+ void setResponse(Object response) {
+ this.response = response;
+ this.error = null;
+ }
+
+ /**
+ * The response object generated at the server,
+ * if it exists. If an exception was generated,
+ * this will be null.
+ * @return the response created by this RPC, no
+ * null if an exception was generated
+ */
+ public Object response() {
+ return response;
+ }
+
+ void setError(AvroRemoteException error) {
+ this.response = null;
+ this.error = error;
+ }
+
+ /**
+ * The exception generated at the server,
+ * or null if no such exception has occured
+ * @return the exception generated at the server, or
+ * null if no such exception
+ */
+ public AvroRemoteException error() {
+ return error;
+ }
+
+ /**
+ * Indicates whether an exception was generated
+ * at the server
+ * @return true is an exception was generated at
+ * the server, or false if not
+ */
+ public boolean isError() {
+ return error != null;
+ }
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+/**
+ * An instrumentation API for RPC metadata. Each of these methods
+ * is invoked at key points during the RPC exchange. Additionally,
+ * path-based <em>metadata</em> that is passed along with the RPC call
+ * and can be set or queried by subsequent instrumentation points.
+ */
+public class RPCPlugin {
+
+ /**
+ * Called on the client before the initial RPC handshake to
+ * setup any per-session metadata for this plugin
+ * @param context the per-sesion rpc context
+ */
+ public void clientStartConnect(RPCContext context) { }
+
+ /**
+ * Called on the server during the RPC handshake
+ * @param context the per-sesion rpc context
+ */
+ public void serverConnecting(RPCContext context) { }
+
+ /**
+ * Called on the client after the initial RPC handshake
+ * @param context the per-sesion rpc context
+ */
+ public void clientFinishConnect(RPCContext context) { }
+
+ /**
+ * This method is invoked at the client before it issues the RPC call.
+ * @param context the per-call rpc context (in/out parameter)
+ */
+ public void clientSendRequest(RPCContext context) { }
+
+ /**
+ * This method is invoked at the RPC server when the request is received,
+ * but before the call itself is executed
+ * @param context the per-call rpc context (in/out parameter)
+ */
+ public void serverReceiveRequest(RPCContext context) { }
+
+ /**
+ * This method is invoked at the server after the call is executed,
+ * but before the response is returned to the client
+ * @param context the per-call rpc context (in/out parameter)
+ */
+ public void serverSendResponse(RPCContext context) { }
+
+ /**
+ * This method is invoked at the client after the call is executed,
+ * and after the client receives the response
+ * @param context the per-call rpc context
+ */
+ public void clientReceiveResponse(RPCContext context) { }
+
+}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Fri Jul 31 22:56:28 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -56,6 +57,8 @@
private Protocol remote;
private boolean established, sendLocalText;
private Transceiver transceiver;
+
+ protected List<RPCPlugin> rpcMetaPlugins;
public Protocol getLocal() { return local; }
public Protocol getRemote() { return remote; }
@@ -65,6 +68,17 @@
throws IOException {
this.local = local;
this.transceiver = transceiver;
+ this.rpcMetaPlugins =
+ Collections.synchronizedList(new ArrayList<RPCPlugin>());
+ }
+
+ /**
+ * Adds a new plugin to manipulate RPC metadata. Plugins
+ * are executed in the order that they are added.
+ * @param plugin a plugin that will manipulate RPC metadata
+ */
+ public void addRPCPlugin(RPCPlugin plugin) {
+ rpcMetaPlugins.add(plugin);
}
/** Writes a request message and reads a response or error message. */
@@ -72,7 +86,7 @@
throws IOException {
Decoder in;
Message m;
- Map<Utf8,ByteBuffer> requestMeta = new HashMap<Utf8,ByteBuffer>();
+ RPCContext context = new RPCContext();
do {
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
@@ -85,7 +99,11 @@
if (m == null)
throw new AvroRuntimeException("Not a local message: "+messageName);
- META_WRITER.write(requestMeta, out);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientSendRequest(context);
+ }
+
+ META_WRITER.write(context.requestCallMeta(), out);
out.writeString(m.getName()); // write message name
writeRequest(m.getRequest(), request, out); // write request payload
@@ -102,12 +120,25 @@
m = getRemote().getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("Not a remote message: "+messageName);
- Map<Utf8,ByteBuffer> responseMeta = META_READER.read(null, in);
+ context.setRequestCallMeta(META_READER.read(null, in));
+
if (!in.readBoolean()) { // no error
- return readResponse(m.getResponse(), in);
+ Object response = readResponse(m.getResponse(), in);
+ context.setResponse(response);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ return response;
+
} else {
- throw readError(m.getErrors(), in);
+ AvroRemoteException error = readError(m.getErrors(), in);
+ context.setError(error);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ throw error;
}
+
}
private static final Map<String,MD5> REMOTE_HASHES =
@@ -136,9 +167,17 @@
handshake.serverHash = remoteHash;
if (sendLocalText)
handshake.clientProtocol = new Utf8(local.toString());
+
+ RPCContext context = new RPCContext();
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientStartConnect(context);
+ }
+ handshake.meta = context.requestSessionMeta();
+
HANDSHAKE_WRITER.write(handshake, out);
}
+ @SuppressWarnings("unchecked")
private void readHandshake(Decoder in) throws IOException {
HandshakeResponse handshake =
(HandshakeResponse)HANDSHAKE_READER.read(null, in);
@@ -159,6 +198,15 @@
default:
throw new AvroRuntimeException("Unexpected match: "+handshake.match);
}
+
+ RPCContext context = new RPCContext();
+ if (handshake.meta != null) {
+ context.setResponseSessionMeta((Map<Utf8, ByteBuffer>) handshake.meta);
+ }
+
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientFinishConnect(context);
+ }
}
private void setRemote(HandshakeResponse handshake) {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Fri Jul 31 22:56:28 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -62,15 +63,27 @@
private Protocol local;
private MD5 localHash;
+ protected List<RPCPlugin> rpcMetaPlugins;
protected Responder(Protocol local) {
this.local = local;
this.localHash = new MD5();
localHash.bytes(local.getMD5());
protocols.put(localHash, local);
+ this.rpcMetaPlugins =
+ Collections.synchronizedList(new ArrayList<RPCPlugin>());
}
public Protocol getLocal() { return local; }
+
+ /**
+ * Adds a new plugin to manipulate per-call metadata. Plugins
+ * are executed in the order that they are added.
+ * @param plugin a plugin that will manipulate RPC metadata
+ */
+ public void addRPCPlugin(RPCPlugin plugin) {
+ rpcMetaPlugins.add(plugin);
+ }
/** Called by a server to deserialize a request, compute and serialize
* a response or error. */
@@ -83,20 +96,24 @@
new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
AvroRemoteException error = null;
- Map<Utf8,ByteBuffer> responseMeta = new HashMap<Utf8,ByteBuffer>();
+ RPCContext context = new RPCContext();
try {
Protocol remote = handshake(transceiver, in, out);
if (remote == null) // handshake failed
return bbo.getBufferList();
// read request using remote protocol specification
- Map<Utf8,ByteBuffer> requestMeta = META_READER.read(null, in);
+ context.setRequestCallMeta(META_READER.read(null, in));
String messageName = in.readString(null).toString();
Message m = remote.getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
Object request = readRequest(m.getRequest(), in);
+
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.serverReceiveRequest(context);
+ }
// create response using local protocol specification
m = getLocal().getMessages().get(messageName);
@@ -105,13 +122,21 @@
Object response = null;
try {
response = respond(m, request);
+ context.setResponse(response);
} catch (AvroRemoteException e) {
error = e;
+ context.setError(error);
} catch (Exception e) {
LOG.warn("application error", e);
error = new AvroRemoteException(new Utf8(e.toString()));
+ context.setError(error);
+ }
+
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.serverSendResponse(context);
}
- META_WRITER.write(responseMeta, out);
+
+ META_WRITER.write(context.responseCallMeta(), out);
out.writeBoolean(error != null);
if (error == null)
writeResponse(m.getResponse(), response, out);
@@ -121,9 +146,10 @@
} catch (AvroRuntimeException e) { // system error
LOG.warn("system error", e);
error = new AvroRemoteException(e);
+ context.setError(error);
bbo = new ByteBufferOutputStream();
out = new BinaryEncoder(bbo);
- META_WRITER.write(responseMeta, out);
+ META_WRITER.write(context.responseCallMeta(), out);
out.writeBoolean(true);
writeError(Protocol.SYSTEM_ERRORS, error, out);
}
@@ -136,6 +162,7 @@
private SpecificDatumReader handshakeReader =
new SpecificDatumReader(HandshakeRequest._SCHEMA);
+ @SuppressWarnings("unchecked")
private Protocol handshake(Transceiver transceiver,
Decoder in, Encoder out)
throws IOException {
@@ -162,6 +189,15 @@
response.serverProtocol = new Utf8(local.toString());
response.serverHash = localHash;
}
+
+ RPCContext context = new RPCContext();
+ context.setRequestSessionMeta((Map<Utf8, ByteBuffer>) request.meta);
+
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.serverConnecting(context);
+ }
+ response.meta = context.responseSessionMeta();
+
handshakeWriter.write(response, out);
return remote;
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java Fri Jul 31 22:56:28 2009
@@ -37,6 +37,11 @@
/** A {@link Requestor} for existing interfaces via Java reflection. */
public class ReflectRequestor extends Requestor implements InvocationHandler {
protected String packageName;
+
+ public ReflectRequestor(Class<?> iface, Transceiver transceiver)
+ throws IOException {
+ this(ReflectData.getProtocol(iface), transceiver);
+ }
protected ReflectRequestor(Protocol protocol, Transceiver transceiver)
throws IOException {
@@ -82,5 +87,12 @@
new Class[] { iface },
new ReflectRequestor(protocol, transciever));
}
+
+ /** Create a proxy instance whose methods invoke RPCs. */
+ public static Object getClient(Class<?> iface, ReflectRequestor rreq)
+ throws IOException {
+ return Proxy.newProxyInstance(iface.getClassLoader(),
+ new Class[] { iface }, rreq);
+ }
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java Fri Jul 31 22:56:28 2009
@@ -31,6 +31,12 @@
/** {@link Requestor} for generated interfaces. */
public class SpecificRequestor extends ReflectRequestor {
+
+ public SpecificRequestor(Class<?> iface, Transceiver transceiver)
+ throws IOException {
+ this(ReflectData.getProtocol(iface), transceiver);
+ }
+
private SpecificRequestor(Protocol protocol, Transceiver transceiver)
throws IOException {
super(protocol, transceiver);
@@ -52,5 +58,12 @@
new Class[] { iface },
new SpecificRequestor(protocol, transciever));
}
+
+ /** Create a proxy instance whose methods invoke RPCs. */
+ public static Object getClient(Class<?> iface, SpecificRequestor requestor)
+ throws IOException {
+ return Proxy.newProxyInstance(iface.getClassLoader(),
+ new Class[] { iface }, requestor);
+ }
}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.util.Utf8;
+
+/**
+ * An implementation of an RPC metadata plugin API
+ * designed for unit testing. This plugin tests
+ * both session and per-call state by passing
+ * a string as per-call metadata, slowly building it
+ * up at each instrumentation point, testing it as
+ * it goes. Finally, after the call or handshake is
+ * complete, the constructed string is tested.
+ */
+public final class RPCMetaTestPlugin extends RPCPlugin {
+
+ protected final Utf8 key;
+
+ public RPCMetaTestPlugin(String keyname) {
+ key = new Utf8(keyname);
+ }
+
+ @Override
+ public void clientStartConnect(RPCContext context) {
+ ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
+ context.requestSessionMeta().put(key, buf);
+ }
+
+ @Override
+ public void serverConnecting(RPCContext context) {
+
+ Assert.assertNotNull(context.requestSessionMeta());
+ Assert.assertNotNull(context.responseSessionMeta());
+
+ if (!context.requestSessionMeta().containsKey(key)) return;
+
+ ByteBuffer buf = context.requestSessionMeta().get(key);
+ Assert.assertNotNull(buf);
+ Assert.assertNotNull(buf.array());
+
+ String partialstr = new String(buf.array());
+ Assert.assertNotNull(partialstr);
+ Assert.assertEquals("partial string mismatch", "ap", partialstr);
+
+ buf = ByteBuffer.wrap((partialstr + "ac").getBytes());
+ Assert.assertTrue(buf.remaining() > 0);
+ context.responseSessionMeta().put(key, buf);
+ }
+
+ @Override
+ public void clientFinishConnect(RPCContext context) {
+ Map<Utf8,ByteBuffer> sessionMeta = context.responseSessionMeta();
+
+ Assert.assertNotNull(sessionMeta);
+
+ if (!sessionMeta.containsKey(key)) return;
+
+ ByteBuffer buf = sessionMeta.get(key);
+ Assert.assertNotNull(buf);
+ Assert.assertNotNull(buf.array());
+
+ String partialstr = new String(buf.array());
+ Assert.assertNotNull(partialstr);
+ Assert.assertEquals("partial string mismatch", "apac", partialstr);
+
+ buf = ByteBuffer.wrap((partialstr + "he").getBytes());
+ Assert.assertTrue(buf.remaining() > 0);
+ sessionMeta.put(key, buf);
+
+ checkRPCMetaMap(sessionMeta);
+ }
+
+ @Override
+ public void clientSendRequest(RPCContext context) {
+ ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
+ context.requestCallMeta().put(key, buf);
+ }
+
+ @Override
+ public void serverReceiveRequest(RPCContext context) {
+ Map<Utf8,ByteBuffer> meta = context.requestCallMeta();
+
+ Assert.assertNotNull(meta);
+
+ if (!meta.containsKey(key)) return;
+
+ ByteBuffer buf = meta.get(key);
+ Assert.assertNotNull(buf);
+ Assert.assertNotNull(buf.array());
+
+ String partialstr = new String(buf.array());
+ Assert.assertNotNull(partialstr);
+ Assert.assertEquals("partial string mismatch", "ap", partialstr);
+
+ buf = ByteBuffer.wrap((partialstr + "a").getBytes());
+ Assert.assertTrue(buf.remaining() > 0);
+ meta.put(key, buf);
+ }
+
+ @Override
+ public void serverSendResponse(RPCContext context) {
+ Assert.assertNotNull(context.requestCallMeta());
+ Assert.assertNotNull(context.responseCallMeta());
+
+ if (!context.requestCallMeta().containsKey(key)) return;
+
+ ByteBuffer buf = context.requestCallMeta().get(key);
+ Assert.assertNotNull(buf);
+ Assert.assertNotNull(buf.array());
+
+ String partialstr = new String(buf.array());
+ Assert.assertNotNull(partialstr);
+ Assert.assertEquals("partial string mismatch", "apa", partialstr);
+
+ buf = ByteBuffer.wrap((partialstr + "c").getBytes());
+ Assert.assertTrue(buf.remaining() > 0);
+ context.responseCallMeta().put(key, buf);
+ }
+
+ @Override
+ public void clientReceiveResponse(RPCContext context) {
+ Assert.assertNotNull(context.responseCallMeta());
+
+ if (!context.responseCallMeta().containsKey(key)) return;
+
+ ByteBuffer buf = context.responseCallMeta().get(key);
+ Assert.assertNotNull(buf);
+ Assert.assertNotNull(buf.array());
+
+ String partialstr = new String(buf.array());
+ Assert.assertNotNull(partialstr);
+ Assert.assertEquals("partial string mismatch", "apac", partialstr);
+
+ buf = ByteBuffer.wrap((partialstr + "he").getBytes());
+ Assert.assertTrue(buf.remaining() > 0);
+ context.responseCallMeta().put(key, buf);
+
+ checkRPCMetaMap(context.responseCallMeta());
+ }
+
+ protected void checkRPCMetaMap(Map<Utf8,ByteBuffer> rpcMeta) {
+ Assert.assertNotNull(rpcMeta);
+ Assert.assertTrue("key not present in map", rpcMeta.containsKey(key));
+
+ ByteBuffer keybuf = rpcMeta.get(key);
+ Assert.assertNotNull(keybuf);
+ Assert.assertTrue("key BB had nothing remaining", keybuf.remaining() > 0);
+
+ String str = new String(keybuf.array());
+ Assert.assertEquals("apache", str);
+ }
+
+}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java Fri Jul 31 22:56:28 2009
@@ -46,8 +46,8 @@
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolGeneric.class);
- private static final File FILE = new File("src/test/schemata/simple.avpr");
- private static final Protocol PROTOCOL;
+ protected static final File FILE = new File("src/test/schemata/simple.avpr");
+ protected static final Protocol PROTOCOL;
static {
try {
PROTOCOL = Protocol.parse(FILE);
@@ -56,7 +56,7 @@
}
}
- private static class TestResponder extends GenericResponder {
+ protected static class TestResponder extends GenericResponder {
public TestResponder() { super(PROTOCOL); }
public Object respond(Message message, Object request)
throws AvroRemoteException {
@@ -91,9 +91,9 @@
}
- private static SocketServer server;
- private static Transceiver client;
- private static Requestor requestor;
+ protected static SocketServer server;
+ protected static Transceiver client;
+ protected static Requestor requestor;
@Before
public void testStartServer() throws Exception {
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.junit.Before;
+
+public class TestProtocolGenericMeta extends TestProtocolGeneric {
+
+ @Before
+ public void testStartServer() throws Exception {
+ Responder responder = new TestResponder();
+ responder.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ responder.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ server = new SocketServer(responder, new InetSocketAddress(0));
+
+ client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+ requestor = new GenericRequestor(PROTOCOL, client);
+ requestor.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ requestor.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ }
+}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.reflect.ReflectRequestor;
+import org.apache.avro.reflect.ReflectResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.InetSocketAddress;
+
+public class TestProtocolReflectMeta extends TestProtocolReflect {
+
+ @Before
+ public void testStartServer() throws Exception {
+ ReflectResponder rresp = new ReflectResponder(Simple.class, new TestImpl());
+ rresp.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ rresp.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ server = new SocketServer(rresp, new InetSocketAddress(0));
+
+ client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+ ReflectRequestor requestor = new ReflectRequestor(Simple.class, client);
+ requestor.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ requestor.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ proxy = (Simple)ReflectRequestor.getClient(Simple.class, (ReflectRequestor)requestor);
+ }
+
+}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Fri Jul 31 22:56:28 2009
@@ -47,7 +47,7 @@
private static final Logger LOG
= LoggerFactory.getLogger(TestProtocolSpecific.class);
- private static final File SERVER_PORTS_DIR
+ protected static final File SERVER_PORTS_DIR
= new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
public static class TestImpl implements Simple {
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+
+public class TestProtocolSpecificMeta extends TestProtocolSpecific {
+
+ @Before
+ public void testStartServer() throws Exception {
+ Responder responder = new SpecificResponder(Simple.class, new TestImpl());
+ responder.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ responder.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ server = new SocketServer(responder, new InetSocketAddress(0));
+
+ client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+ SpecificRequestor req = new SpecificRequestor(Simple.class, client);
+ req.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+ req.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+ proxy = (Simple)SpecificRequestor.getClient(Simple.class, (SpecificRequestor)req);
+ }
+}