You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2010/09/07 22:49:24 UTC

svn commit: r993529 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/

Author: sharad
Date: Tue Sep  7 20:49:23 2010
New Revision: 993529

URL: http://svn.apache.org/viewvc?rev=993529&view=rev
Log:
HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code. Contributed by Sharad Agarwal.

Added:
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/build.xml
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Sep  7 20:49:23 2010
@@ -235,6 +235,9 @@ Trunk (unreleased changes)
     HADOOP-6938. ConnectionId.getRemotePrincipal() should check if security
     is enabled. (Kan Zhang via hairong)
 
+    HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code. 
+    (sharad)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/build.xml?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/build.xml (original)
+++ hadoop/common/trunk/build.xml Tue Sep  7 20:49:23 2010
@@ -473,6 +473,17 @@
     </schema>
   </target>
 
+  <target name="generate-avro-protocols" depends="init, ivy-retrieve-test">
+    <taskdef name="schema" classname="org.apache.avro.specific.ProtocolTask">
+      <classpath refid="test.classpath"/>
+    </taskdef>
+    <schema destdir="${test.generated.dir}">
+      <fileset dir="${test.src.dir}">
+        <include name="**/*.avpr" />
+     </fileset>
+    </schema>
+  </target>
+
   <!-- ================================================================== -->
   <!-- Compile test code                                                  --> 
   <!-- ================================================================== -->
@@ -480,7 +491,7 @@
   <target name="-classes-compilation"
     depends="compile-core-classes, compile-core-test"/> 
 
-  <target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records">
+  <target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records, generate-avro-protocols">
     <mkdir dir="${test.core.build.classes}"/>
     <javac 
      encoding="${build.encoding}" 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Tue Sep  7 20:49:23 2010
@@ -18,34 +18,40 @@
 
 package org.apache.hadoop.ipc;
 
-import java.io.*;
-import java.util.*;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
-import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.commons.logging.*;
+import javax.net.SocketFactory;
 
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.reflect.ReflectRequestor;
+import org.apache.avro.reflect.ReflectResponder;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.net.NetUtils;
-
-import org.apache.avro.*;
-import org.apache.avro.ipc.*;
-import org.apache.avro.reflect.*;
 
 /** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection.  This
  * does not give cross-language wire compatibility, since the Hadoop RPC wire
  * format is non-standard, but it does permit use of Avro's protocol versioning
  * features for inter-Java RPCs. */
-class AvroRpcEngine implements RpcEngine {
+@InterfaceStability.Evolving
+public class AvroRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
 
   private static int VERSION = 0;
@@ -150,15 +156,15 @@ class AvroRpcEngine implements RpcEngine
     }
   }
 
-  private static class Invoker implements InvocationHandler, Closeable {
+  private class Invoker implements InvocationHandler, Closeable {
     private final ClientTransceiver tx;
-    private final ReflectRequestor requestor;
+    private final SpecificRequestor requestor;
     public Invoker(Class<?> protocol, InetSocketAddress addr,
                    UserGroupInformation ticket, Configuration conf,
                    SocketFactory factory,
                    int rpcTimeout) throws IOException {
       this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
-      this.requestor = new ReflectRequestor(protocol, tx);
+      this.requestor = createRequestor(protocol, tx);
     }
     @Override public Object invoke(Object proxy, Method method, Object[] args) 
       throws Throwable {
@@ -169,12 +175,20 @@ class AvroRpcEngine implements RpcEngine
     }
   }
 
-  /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
-  private static class TunnelResponder extends ReflectResponder
-    implements TunnelProtocol {
+  protected SpecificRequestor createRequestor(Class<?> protocol, 
+      Transceiver transeiver) throws IOException {
+    return new ReflectRequestor(protocol, transeiver);
+  }
 
+  protected Responder createResponder(Class<?> iface, Object impl) {
+    return new ReflectResponder(iface, impl);
+  }
+
+  /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
+  private class TunnelResponder implements TunnelProtocol {
+    private Responder responder;
     public TunnelResponder(Class<?> iface, Object impl) {
-      super(iface, impl);
+      responder = createResponder(iface, impl);
     }
 
     public long getProtocolVersion(String protocol, long version)
@@ -184,7 +198,7 @@ class AvroRpcEngine implements RpcEngine
 
     public BufferListWritable call(final BufferListWritable request)
       throws IOException {
-      return new BufferListWritable(respond(request.buffers));
+      return new BufferListWritable(responder.respond(request.buffers));
     }
   }
 

Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java?rev=993529&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java Tue Sep  7 20:49:23 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * AvroRpcEngine which uses Avro's "specific" APIs. Protocols generated
+ * via Avro IDL need to use this engine.
+ */
+@InterfaceStability.Evolving
+public class AvroSpecificRpcEngine extends AvroRpcEngine {
+
+  protected SpecificRequestor createRequestor(Class<?> protocol, 
+      Transceiver transeiver) throws IOException {
+    return new SpecificRequestor(protocol, transeiver);
+  }
+
+  protected Responder createResponder(Class<?> iface, Object impl) {
+    return new SpecificResponder(iface, impl);
+  }
+
+}

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Tue Sep  7 20:49:23 2010
@@ -78,8 +78,13 @@ public class RPC {
 
   private static final String ENGINE_PROP = "rpc.engine";
 
-  // set a protocol to use a non-default RpcEngine
-  static void setProtocolEngine(Configuration conf,
+  /**
+   * Set a protocol to use a non-default RpcEngine.
+   * @param conf configuration to use
+   * @param protocol the protocol interface
+   * @param engine the RpcEngine impl
+   */
+  public static void setProtocolEngine(Configuration conf,
                                 Class protocol, Class engine) {
     conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
   }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Tue Sep  7 20:49:23 2010
@@ -18,18 +18,21 @@
 
 package org.apache.hadoop.ipc;
 
-import java.lang.reflect.Method;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+
 import javax.net.SocketFactory;
 
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.conf.Configuration;
 
 /** An RPC implementation. */
-interface RpcEngine {
+@InterfaceStability.Evolving
+public interface RpcEngine {
 
   /** Construct a client-side proxy object. */
   Object getProxy(Class<?> protocol,

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue Sep  7 20:49:23 2010
@@ -44,7 +44,8 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
 /** An RpcEngine implementation for Writable data. */
-class WritableRpcEngine implements RpcEngine {
+@InterfaceStability.Evolving
+public class WritableRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
 
   /** A method invocation, including the method name and its parameters.*/

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr?rev=993529&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr Tue Sep  7 20:49:23 2010
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "protocol" : "AvroSpecificTestProtocol",
+  "namespace" : "org.apache.hadoop.ipc",
+
+  "messages" : {
+    "echo" : {
+      "request" : [ {
+        "name" : "message",
+        "type" : "string"
+      } ],
+      "response" : "string"
+    },
+    
+    "add" : {
+      "request" : [ {
+        "name" : "arg1",
+        "type" : "int"
+      }, {
+        "name" : "arg2",
+        "type" : "int",
+        "default" : 0
+      } ],
+      "response" : "int"
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java Tue Sep  7 20:49:23 2010
@@ -18,19 +18,17 @@
 
 package org.apache.hadoop.ipc;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.*;
-
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.net.NetUtils;
 
-import org.apache.avro.ipc.AvroRemoteException;
-
 /** Unit tests for AvroRpc. */
 public class TestAvroRpc extends TestCase {
   private static final String ADDRESS = "0.0.0.0";
@@ -94,4 +92,47 @@ public class TestAvroRpc extends TestCas
       server.stop();
     }
   }
+
+  public void testAvroSpecificRpc() throws Exception {
+    Configuration conf = new Configuration();
+    RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class, 
+        AvroSpecificRpcEngine.class);
+    Server server = RPC.getServer(AvroSpecificTestProtocol.class,
+                                  new AvroSpecificTestProtocolImpl(), 
+                                  ADDRESS, 0, conf);
+    AvroSpecificTestProtocol proxy = null;
+    try {
+      server.start();
+
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      proxy =
+        (AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class, 
+            0, addr, conf);
+      
+      Utf8 echo = proxy.echo(new Utf8("hello world"));
+      assertEquals("hello world", echo.toString());
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+    } finally {
+      server.stop();
+    }
+  }
+  
+  public static class AvroSpecificTestProtocolImpl implements 
+      AvroSpecificTestProtocol {
+
+    @Override
+    public int add(int arg1, int arg2) throws AvroRemoteException {
+      return arg1 + arg2;
+    }
+
+    @Override
+    public Utf8 echo(Utf8 msg) throws AvroRemoteException {
+      return msg;
+    }
+    
+  }
+  
 }