You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2009/08/01 00:56:28 UTC

svn commit: r799766 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/ipc/ src/java/org/apache/avro/reflect/ src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/

Author: cutting
Date: Fri Jul 31 22:56:28 2009
New Revision: 799766

URL: http://svn.apache.org/viewvc?rev=799766&view=rev
Log:
AVRO-76.  Add Java RPC plugin framework.  Contributed by George Porter.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
    hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jul 31 22:56:28 2009
@@ -9,6 +9,8 @@
     AVRO-50. Implmenent JSON data codec in Java. (Thiruvalluvan
     M. G. & cutting)
 
+    AVRO-76. Add Java RPC plugin framework.  (George Porter)
+
   IMPROVEMENTS
 
     AVRO-71.  C++: make deserializer more generic.  (Scott Banachowski

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCContext.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * This class represents the context of an RPC call or RPC handshake.
+ * Designed to provide information to RPC plugin writers,
+ * this class encapsulates information about the rpc exchange,
+ * including per-session and per-call metadata.
+ *
+ */
+public class RPCContext {
+  
+  protected Map<Utf8,ByteBuffer> requestSessionMeta, responseSessionMeta;
+  protected Map<Utf8,ByteBuffer> requestCallMeta, responseCallMeta;
+  
+  protected Object response;
+  protected AvroRemoteException error;
+  
+  /**
+   * This is an access method for the session state
+   * provided by the client to the server.
+   * @return a map representing session state from
+   * the client to the server
+   */
+  public Map<Utf8,ByteBuffer> requestSessionMeta() {
+    if (requestSessionMeta == null) {
+      requestSessionMeta = new HashMap<Utf8,ByteBuffer>();
+    }
+    return requestSessionMeta;
+  }
+  
+  void setRequestSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
+    requestSessionMeta = newmeta;
+  }
+  
+  /**
+   * This is an access method for the session state
+   * provided by the server back to the client
+   * @return a map representing session state from
+   * the server to the client
+   */
+  public Map<Utf8,ByteBuffer> responseSessionMeta() {
+    if (responseSessionMeta == null) {
+      responseSessionMeta = new HashMap<Utf8,ByteBuffer>();
+    }
+    return responseSessionMeta;
+  }
+  
+  void setResponseSessionMeta(Map<Utf8,ByteBuffer> newmeta) {
+    responseSessionMeta = newmeta;
+  }
+  
+  /**
+   * This is an access method for the per-call state
+   * provided by the client to the server.
+   * @return a map representing per-call state from
+   * the client to the server
+   */
+  public Map<Utf8,ByteBuffer> requestCallMeta() {
+    if (requestCallMeta == null) {
+      requestCallMeta = new HashMap<Utf8,ByteBuffer>();
+    }
+    return requestCallMeta;
+  }
+  
+  void setRequestCallMeta(Map<Utf8,ByteBuffer> newmeta) {
+    requestCallMeta = newmeta;
+  }
+  
+  /**
+   * This is an access method for the per-call state
+   * provided by the server back to the client.
+   * @return a map representing per-call state from
+   * the server to the client
+   */
+  public Map<Utf8,ByteBuffer> responseCallMeta() {
+    if (responseCallMeta == null) {
+      responseCallMeta = new HashMap<Utf8,ByteBuffer>();
+    }
+    return responseCallMeta;
+  }
+  
+  void setResponseCallMeta(Map<Utf8,ByteBuffer> newmeta) {
+    responseCallMeta = newmeta;
+  }
+  
+  void setResponse(Object response) {
+    this.response = response;
+    this.error = null;
+  }
+  
+  /**
+   * The response object generated at the server,
+   * if it exists.  If an exception was generated,
+   * this will be null.
+   * @return the response created by this RPC, no
+   * null if an exception was generated
+   */
+  public Object response() {
+    return response;
+  }
+  
+  void setError(AvroRemoteException error) {
+    this.response = null;
+    this.error = error;
+  }
+  
+  /**
+   * The exception generated at the server,
+   * or null if no such exception has occured
+   * @return the exception generated at the server, or
+   * null if no such exception
+   */
+  public AvroRemoteException error() {
+    return error;
+  }
+  
+  /**
+   * Indicates whether an exception was generated
+   * at the server
+   * @return true is an exception was generated at
+   * the server, or false if not
+   */
+  public boolean isError() {
+    return error != null;
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/RPCPlugin.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+/**
+ * An instrumentation API for RPC metadata.  Each of these methods
+ * is invoked at key points during the RPC exchange.  Additionally,
+ * path-based <em>metadata</em> that is passed along with the RPC call
+ * and can be set or queried by subsequent instrumentation points.
+ */
+public class RPCPlugin {
+  
+  /**
+   * Called on the client before the initial RPC handshake to
+   * setup any per-session metadata for this plugin
+   * @param context the per-sesion rpc context
+   */
+  public void clientStartConnect(RPCContext context) { }
+  
+  /**
+   * Called on the server during the RPC handshake
+   * @param context the per-sesion rpc context
+   */
+  public void serverConnecting(RPCContext context) { }
+  
+  /**
+   * Called on the client after the initial RPC handshake
+   * @param context the per-sesion rpc context
+   */
+  public void clientFinishConnect(RPCContext context) { }
+  
+  /**
+   * This method is invoked at the client before it issues the RPC call.
+   * @param context the per-call rpc context (in/out parameter)
+   */
+  public void clientSendRequest(RPCContext context) { }
+  
+  /**
+   * This method is invoked at the RPC server when the request is received,
+   * but before the call itself is executed
+   * @param context the per-call rpc context (in/out parameter)
+   */
+  public void serverReceiveRequest(RPCContext context) { }
+  
+  /**
+   * This method is invoked at the server after the call is executed,
+   * but before the response is returned to the client
+   * @param context the per-call rpc context (in/out parameter)
+   */
+  public void serverSendResponse(RPCContext context) { }
+    
+  /**
+   * This method is invoked at the client after the call is executed,
+   * and after the client receives the response
+   * @param context the per-call rpc context
+   */
+  public void clientReceiveResponse(RPCContext context) { }
+  
+}

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Fri Jul 31 22:56:28 2009
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +57,8 @@
   private Protocol remote;
   private boolean established, sendLocalText;
   private Transceiver transceiver;
+  
+  protected List<RPCPlugin> rpcMetaPlugins;
 
   public Protocol getLocal() { return local; }
   public Protocol getRemote() { return remote; }
@@ -65,6 +68,17 @@
     throws IOException {
     this.local = local;
     this.transceiver = transceiver;
+    this.rpcMetaPlugins =
+      Collections.synchronizedList(new ArrayList<RPCPlugin>());
+  }
+  
+  /**
+   * Adds a new plugin to manipulate RPC metadata.  Plugins
+   * are executed in the order that they are added.
+   * @param plugin a plugin that will manipulate RPC metadata
+   */
+  public void addRPCPlugin(RPCPlugin plugin) {
+    rpcMetaPlugins.add(plugin);
   }
 
   /** Writes a request message and reads a response or error message. */
@@ -72,7 +86,7 @@
     throws IOException {
     Decoder in;
     Message m;
-    Map<Utf8,ByteBuffer> requestMeta = new HashMap<Utf8,ByteBuffer>();
+    RPCContext context = new RPCContext();
     do {
       ByteBufferOutputStream bbo = new ByteBufferOutputStream();
       Encoder out = new BinaryEncoder(bbo);
@@ -85,7 +99,11 @@
       if (m == null)
         throw new AvroRuntimeException("Not a local message: "+messageName);
       
-      META_WRITER.write(requestMeta, out);
+      for (RPCPlugin plugin : rpcMetaPlugins) {
+        plugin.clientSendRequest(context);
+      }
+      
+      META_WRITER.write(context.requestCallMeta(), out);
       out.writeString(m.getName());       // write message name
       writeRequest(m.getRequest(), request, out); // write request payload
       
@@ -102,12 +120,25 @@
     m = getRemote().getMessages().get(messageName);
     if (m == null)
       throw new AvroRuntimeException("Not a remote message: "+messageName);
-    Map<Utf8,ByteBuffer> responseMeta = META_READER.read(null, in);
+    context.setRequestCallMeta(META_READER.read(null, in));
+    
     if (!in.readBoolean()) {                      // no error
-      return readResponse(m.getResponse(), in);
+      Object response = readResponse(m.getResponse(), in);
+      context.setResponse(response);
+      for (RPCPlugin plugin : rpcMetaPlugins) {
+        plugin.clientReceiveResponse(context);
+      }
+      return response;
+      
     } else {
-      throw readError(m.getErrors(), in);
+      AvroRemoteException error = readError(m.getErrors(), in);
+      context.setError(error);
+      for (RPCPlugin plugin : rpcMetaPlugins) {
+        plugin.clientReceiveResponse(context);
+      }
+      throw error;
     }
+    
   }
 
   private static final Map<String,MD5> REMOTE_HASHES =
@@ -136,9 +167,17 @@
     handshake.serverHash = remoteHash;
     if (sendLocalText)
       handshake.clientProtocol = new Utf8(local.toString());
+    
+    RPCContext context = new RPCContext();
+    for (RPCPlugin plugin : rpcMetaPlugins) {
+      plugin.clientStartConnect(context);
+    }
+    handshake.meta = context.requestSessionMeta();
+    
     HANDSHAKE_WRITER.write(handshake, out);
   }
 
+  @SuppressWarnings("unchecked")
   private void readHandshake(Decoder in) throws IOException {
     HandshakeResponse handshake =
       (HandshakeResponse)HANDSHAKE_READER.read(null, in);
@@ -159,6 +198,15 @@
     default:
       throw new AvroRuntimeException("Unexpected match: "+handshake.match);
     }
+    
+    RPCContext context = new RPCContext();
+    if (handshake.meta != null) {
+      context.setResponseSessionMeta((Map<Utf8, ByteBuffer>) handshake.meta);
+    }
+      
+    for (RPCPlugin plugin : rpcMetaPlugins) {
+      plugin.clientFinishConnect(context);
+    }
   }
 
   private void setRemote(HandshakeResponse handshake) {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Fri Jul 31 22:56:28 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -62,15 +63,27 @@
 
   private Protocol local;
   private MD5 localHash;
+  protected List<RPCPlugin> rpcMetaPlugins;
 
   protected Responder(Protocol local) {
     this.local = local;
     this.localHash = new MD5();
     localHash.bytes(local.getMD5());
     protocols.put(localHash, local);
+    this.rpcMetaPlugins =
+      Collections.synchronizedList(new ArrayList<RPCPlugin>());
   }
 
   public Protocol getLocal() { return local; }
+  
+  /**
+   * Adds a new plugin to manipulate per-call metadata.  Plugins
+   * are executed in the order that they are added.
+   * @param plugin a plugin that will manipulate RPC metadata
+   */
+  public void addRPCPlugin(RPCPlugin plugin) {
+    rpcMetaPlugins.add(plugin);
+  }
 
   /** Called by a server to deserialize a request, compute and serialize
    * a response or error. */
@@ -83,20 +96,24 @@
       new ByteBufferOutputStream();
     Encoder out = new BinaryEncoder(bbo);
     AvroRemoteException error = null;
-    Map<Utf8,ByteBuffer> responseMeta = new HashMap<Utf8,ByteBuffer>();
+    RPCContext context = new RPCContext();
     try {
       Protocol remote = handshake(transceiver, in, out);
       if (remote == null)                        // handshake failed
         return bbo.getBufferList();
 
       // read request using remote protocol specification
-      Map<Utf8,ByteBuffer> requestMeta = META_READER.read(null, in);
+      context.setRequestCallMeta(META_READER.read(null, in));
       String messageName = in.readString(null).toString();
       Message m = remote.getMessages().get(messageName);
       if (m == null)
         throw new AvroRuntimeException("No such remote message: "+messageName);
       
       Object request = readRequest(m.getRequest(), in);
+      
+      for (RPCPlugin plugin : rpcMetaPlugins) {
+        plugin.serverReceiveRequest(context);
+      }
 
       // create response using local protocol specification
       m = getLocal().getMessages().get(messageName);
@@ -105,13 +122,21 @@
       Object response = null;
       try {
         response = respond(m, request);
+        context.setResponse(response);
       } catch (AvroRemoteException e) {
         error = e;
+        context.setError(error);
       } catch (Exception e) {
         LOG.warn("application error", e);
         error = new AvroRemoteException(new Utf8(e.toString()));
+        context.setError(error);
+      }
+      
+      for (RPCPlugin plugin : rpcMetaPlugins) {
+        plugin.serverSendResponse(context);
       }
-      META_WRITER.write(responseMeta, out);
+      
+      META_WRITER.write(context.responseCallMeta(), out);
       out.writeBoolean(error != null);
       if (error == null)
         writeResponse(m.getResponse(), response, out);
@@ -121,9 +146,10 @@
     } catch (AvroRuntimeException e) {            // system error
       LOG.warn("system error", e);
       error = new AvroRemoteException(e);
+      context.setError(error);
       bbo = new ByteBufferOutputStream();
       out = new BinaryEncoder(bbo);
-      META_WRITER.write(responseMeta, out);
+      META_WRITER.write(context.responseCallMeta(), out);
       out.writeBoolean(true);
       writeError(Protocol.SYSTEM_ERRORS, error, out);
     }
@@ -136,6 +162,7 @@
   private SpecificDatumReader handshakeReader =
     new SpecificDatumReader(HandshakeRequest._SCHEMA);
 
+  @SuppressWarnings("unchecked")
   private Protocol handshake(Transceiver transceiver,
                              Decoder in, Encoder out)
     throws IOException {
@@ -162,6 +189,15 @@
       response.serverProtocol = new Utf8(local.toString());
       response.serverHash = localHash;
     }
+    
+    RPCContext context = new RPCContext();
+    context.setRequestSessionMeta((Map<Utf8, ByteBuffer>) request.meta);
+    
+    for (RPCPlugin plugin : rpcMetaPlugins) {
+      plugin.serverConnecting(context);
+    }
+    response.meta = context.responseSessionMeta();
+    
     handshakeWriter.write(response, out);
     return remote;
   }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java Fri Jul 31 22:56:28 2009
@@ -37,6 +37,11 @@
 /** A {@link Requestor} for existing interfaces via Java reflection. */
 public class ReflectRequestor extends Requestor implements InvocationHandler {
   protected String packageName;
+  
+  public ReflectRequestor(Class<?> iface, Transceiver transceiver)
+    throws IOException {
+    this(ReflectData.getProtocol(iface), transceiver);
+  }
 
   protected ReflectRequestor(Protocol protocol, Transceiver transceiver)
     throws IOException {
@@ -82,5 +87,12 @@
                                   new Class[] { iface },
                                   new ReflectRequestor(protocol, transciever));
   }
+  
+  /** Create a proxy instance whose methods invoke RPCs. */
+  public static Object getClient(Class<?> iface, ReflectRequestor rreq)
+    throws IOException {
+    return Proxy.newProxyInstance(iface.getClassLoader(),
+                                  new Class[] { iface }, rreq);
+  }
 }
 

Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java Fri Jul 31 22:56:28 2009
@@ -31,6 +31,12 @@
 
 /** {@link Requestor} for generated interfaces. */
 public class SpecificRequestor extends ReflectRequestor {
+  
+  public SpecificRequestor(Class<?> iface, Transceiver transceiver)
+    throws IOException {
+    this(ReflectData.getProtocol(iface), transceiver);
+  }
+  
   private SpecificRequestor(Protocol protocol, Transceiver transceiver)
     throws IOException {
     super(protocol, transceiver);
@@ -52,5 +58,12 @@
                                   new Class[] { iface },
                                   new SpecificRequestor(protocol, transciever));
   }
+  
+  /** Create a proxy instance whose methods invoke RPCs. */
+  public static Object getClient(Class<?> iface, SpecificRequestor requestor)
+    throws IOException {
+    return Proxy.newProxyInstance(iface.getClassLoader(),
+                                  new Class[] { iface }, requestor);
+  }
 }
 

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RPCMetaTestPlugin.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.RPCContext;
+import org.apache.avro.ipc.RPCPlugin;
+import org.apache.avro.util.Utf8;
+
+/**
+ * An implementation of an RPC metadata plugin API
+ * designed for unit testing.  This plugin tests
+ * both session and per-call state by passing
+ * a string as per-call metadata, slowly building it
+ * up at each instrumentation point, testing it as
+ * it goes.  Finally, after the call or handshake is
+ * complete, the constructed string is tested.
+ */
+public final class RPCMetaTestPlugin extends RPCPlugin {
+  
+  protected final Utf8 key;
+  
+  public RPCMetaTestPlugin(String keyname) {
+    key = new Utf8(keyname);
+  }
+  
+  @Override
+  public void clientStartConnect(RPCContext context) {
+    ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
+    context.requestSessionMeta().put(key, buf);
+  }
+  
+  @Override
+  public void serverConnecting(RPCContext context) {
+    
+    Assert.assertNotNull(context.requestSessionMeta());
+    Assert.assertNotNull(context.responseSessionMeta());
+    
+    if (!context.requestSessionMeta().containsKey(key)) return;
+    
+    ByteBuffer buf = context.requestSessionMeta().get(key);
+    Assert.assertNotNull(buf);
+    Assert.assertNotNull(buf.array());
+    
+    String partialstr = new String(buf.array());
+    Assert.assertNotNull(partialstr);
+    Assert.assertEquals("partial string mismatch", "ap", partialstr);
+    
+    buf = ByteBuffer.wrap((partialstr + "ac").getBytes());
+    Assert.assertTrue(buf.remaining() > 0);
+    context.responseSessionMeta().put(key, buf);
+  }
+  
+  @Override
+  public void clientFinishConnect(RPCContext context) {
+    Map<Utf8,ByteBuffer> sessionMeta = context.responseSessionMeta();
+    
+    Assert.assertNotNull(sessionMeta);
+    
+    if (!sessionMeta.containsKey(key)) return;
+    
+    ByteBuffer buf = sessionMeta.get(key);
+    Assert.assertNotNull(buf);
+    Assert.assertNotNull(buf.array());
+    
+    String partialstr = new String(buf.array());
+    Assert.assertNotNull(partialstr);
+    Assert.assertEquals("partial string mismatch", "apac", partialstr);
+    
+    buf = ByteBuffer.wrap((partialstr + "he").getBytes());
+    Assert.assertTrue(buf.remaining() > 0);
+    sessionMeta.put(key, buf);
+    
+    checkRPCMetaMap(sessionMeta);
+  }
+  
+  @Override
+  public void clientSendRequest(RPCContext context) { 
+    ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
+    context.requestCallMeta().put(key, buf);
+  }
+  
+  @Override
+  public void serverReceiveRequest(RPCContext context) {
+    Map<Utf8,ByteBuffer> meta = context.requestCallMeta();
+    
+    Assert.assertNotNull(meta);
+    
+    if (!meta.containsKey(key)) return;
+    
+    ByteBuffer buf = meta.get(key);
+    Assert.assertNotNull(buf);
+    Assert.assertNotNull(buf.array());
+    
+    String partialstr = new String(buf.array());
+    Assert.assertNotNull(partialstr);
+    Assert.assertEquals("partial string mismatch", "ap", partialstr);
+    
+    buf = ByteBuffer.wrap((partialstr + "a").getBytes());
+    Assert.assertTrue(buf.remaining() > 0);
+    meta.put(key, buf);
+  }
+  
+  @Override
+  public void serverSendResponse(RPCContext context) {
+    Assert.assertNotNull(context.requestCallMeta());
+    Assert.assertNotNull(context.responseCallMeta());
+    
+    if (!context.requestCallMeta().containsKey(key)) return;
+    
+    ByteBuffer buf = context.requestCallMeta().get(key);
+    Assert.assertNotNull(buf);
+    Assert.assertNotNull(buf.array());
+    
+    String partialstr = new String(buf.array());
+    Assert.assertNotNull(partialstr);
+    Assert.assertEquals("partial string mismatch", "apa", partialstr);
+    
+    buf = ByteBuffer.wrap((partialstr + "c").getBytes());
+    Assert.assertTrue(buf.remaining() > 0);
+    context.responseCallMeta().put(key, buf);
+  }
+  
+  @Override
+  public void clientReceiveResponse(RPCContext context) {
+    Assert.assertNotNull(context.responseCallMeta());
+    
+    if (!context.responseCallMeta().containsKey(key)) return;
+    
+    ByteBuffer buf = context.responseCallMeta().get(key);
+    Assert.assertNotNull(buf);
+    Assert.assertNotNull(buf.array());
+    
+    String partialstr = new String(buf.array());
+    Assert.assertNotNull(partialstr);
+    Assert.assertEquals("partial string mismatch", "apac", partialstr);
+    
+    buf = ByteBuffer.wrap((partialstr + "he").getBytes());
+    Assert.assertTrue(buf.remaining() > 0);
+    context.responseCallMeta().put(key, buf);
+    
+    checkRPCMetaMap(context.responseCallMeta());
+  }
+  
+  protected void checkRPCMetaMap(Map<Utf8,ByteBuffer> rpcMeta) {
+    Assert.assertNotNull(rpcMeta);
+    Assert.assertTrue("key not present in map", rpcMeta.containsKey(key));
+    
+    ByteBuffer keybuf = rpcMeta.get(key);
+    Assert.assertNotNull(keybuf);
+    Assert.assertTrue("key BB had nothing remaining", keybuf.remaining() > 0);
+    
+    String str = new String(keybuf.array());
+    Assert.assertEquals("apache", str);
+  }
+  
+}

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGeneric.java Fri Jul 31 22:56:28 2009
@@ -46,8 +46,8 @@
   private static final Logger LOG
     = LoggerFactory.getLogger(TestProtocolGeneric.class);
 
-  private static final File FILE = new File("src/test/schemata/simple.avpr");
-  private static final Protocol PROTOCOL;
+  protected static final File FILE = new File("src/test/schemata/simple.avpr");
+  protected static final Protocol PROTOCOL;
   static {
     try {
       PROTOCOL = Protocol.parse(FILE);
@@ -56,7 +56,7 @@
     }
   }
 
-  private static class TestResponder extends GenericResponder {
+  protected static class TestResponder extends GenericResponder {
     public TestResponder() { super(PROTOCOL); }
     public Object respond(Message message, Object request)
       throws AvroRemoteException {
@@ -91,9 +91,9 @@
 
   }
 
-  private static SocketServer server;
-  private static Transceiver client;
-  private static Requestor requestor;
+  protected static SocketServer server;
+  protected static Transceiver client;
+  protected static Requestor requestor;
 
   @Before
   public void testStartServer() throws Exception {

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolGenericMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.junit.Before;
+
+public class TestProtocolGenericMeta extends TestProtocolGeneric {
+  
+  @Before
+  public void testStartServer() throws Exception {
+    Responder responder = new TestResponder();
+    responder.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    responder.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+    server = new SocketServer(responder, new InetSocketAddress(0));
+    
+    client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+    requestor = new GenericRequestor(PROTOCOL, client);
+    requestor.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    requestor.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+  }
+}

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolReflectMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.reflect.ReflectRequestor;
+import org.apache.avro.reflect.ReflectResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.InetSocketAddress;
+
+public class TestProtocolReflectMeta extends TestProtocolReflect {
+
+  @Before
+  public void testStartServer() throws Exception {
+    ReflectResponder rresp = new ReflectResponder(Simple.class, new TestImpl());
+    rresp.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    rresp.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+    server = new SocketServer(rresp, new InetSocketAddress(0));
+    
+    client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+    ReflectRequestor requestor = new ReflectRequestor(Simple.class, client);
+    requestor.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    requestor.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+    proxy = (Simple)ReflectRequestor.getClient(Simple.class, (ReflectRequestor)requestor);
+  }
+
+}

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=799766&r1=799765&r2=799766&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecific.java Fri Jul 31 22:56:28 2009
@@ -47,7 +47,7 @@
   private static final Logger LOG
     = LoggerFactory.getLogger(TestProtocolSpecific.class);
 
-  private static final File SERVER_PORTS_DIR
+  protected static final File SERVER_PORTS_DIR
   = new File(System.getProperty("test.dir", "/tmp")+"/server-ports/");
 
   public static class TestImpl implements Simple {

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java?rev=799766&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolSpecificMeta.java Fri Jul 31 22:56:28 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SocketTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+
+public class TestProtocolSpecificMeta extends TestProtocolSpecific {
+  
+  @Before
+  public void testStartServer() throws Exception {
+    Responder responder = new SpecificResponder(Simple.class, new TestImpl());
+    responder.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    responder.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+    server = new SocketServer(responder, new InetSocketAddress(0));
+    
+    client = new SocketTransceiver(new InetSocketAddress(server.getPort()));
+    SpecificRequestor req = new SpecificRequestor(Simple.class, client);
+    req.addRPCPlugin(new RPCMetaTestPlugin("key1"));
+    req.addRPCPlugin(new RPCMetaTestPlugin("key2"));
+    proxy = (Simple)SpecificRequestor.getClient(Simple.class, (SpecificRequestor)req);
+  }
+}