You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2010/09/07 22:49:24 UTC
svn commit: r993529 - in /hadoop/common/trunk: ./
src/java/org/apache/hadoop/ipc/ src/test/core/org/apache/hadoop/ipc/
Author: sharad
Date: Tue Sep 7 20:49:23 2010
New Revision: 993529
URL: http://svn.apache.org/viewvc?rev=993529&view=rev
Log:
HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code. Contributed by Sharad Agarwal.
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/build.xml
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Tue Sep 7 20:49:23 2010
@@ -235,6 +235,9 @@ Trunk (unreleased changes)
HADOOP-6938. ConnectionId.getRemotePrincipal() should check if security
is enabled. (Kan Zhang via hairong)
+ HADOOP-6930. AvroRpcEngine doesn't work with generated Avro code.
+ (sharad)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/build.xml?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/build.xml (original)
+++ hadoop/common/trunk/build.xml Tue Sep 7 20:49:23 2010
@@ -473,6 +473,17 @@
</schema>
</target>
+  <!-- Runs Avro's ProtocolTask over every .avpr file under ${test.src.dir}, writing output to ${test.generated.dir}. -->
+  <target name="generate-avro-protocols" depends="init, ivy-retrieve-test">
+    <!-- Registered as "protocol" rather than "schema": the preceding target already uses a
+         <schema> element, and redefining that task name here would override its definition. -->
+    <taskdef name="protocol" classname="org.apache.avro.specific.ProtocolTask">
+      <classpath refid="test.classpath"/>
+    </taskdef>
+    <protocol destdir="${test.generated.dir}">
+      <fileset dir="${test.src.dir}">
+        <include name="**/*.avpr" />
+      </fileset>
+    </protocol>
+  </target>
+
<!-- ================================================================== -->
<!-- Compile test code -->
<!-- ================================================================== -->
@@ -480,7 +491,7 @@
<target name="-classes-compilation"
depends="compile-core-classes, compile-core-test"/>
- <target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records">
+ <target name="compile-core-test" depends="compile-core-classes, ivy-retrieve-test, generate-test-records, generate-avro-records, generate-avro-protocols">
<mkdir dir="${test.core.build.classes}"/>
<javac
encoding="${build.encoding}"
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Tue Sep 7 20:49:23 2010
@@ -18,34 +18,40 @@
package org.apache.hadoop.ipc;
-import java.io.*;
-import java.util.*;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
-import org.apache.commons.logging.*;
+import javax.net.SocketFactory;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.reflect.ReflectRequestor;
+import org.apache.avro.reflect.ReflectResponder;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.net.NetUtils;
-
-import org.apache.avro.*;
-import org.apache.avro.ipc.*;
-import org.apache.avro.reflect.*;
/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection. This
* does not give cross-language wire compatibility, since the Hadoop RPC wire
* format is non-standard, but it does permit use of Avro's protocol versioning
* features for inter-Java RPCs. */
-class AvroRpcEngine implements RpcEngine {
+@InterfaceStability.Evolving
+public class AvroRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
private static int VERSION = 0;
@@ -150,15 +156,15 @@ class AvroRpcEngine implements RpcEngine
}
}
- private static class Invoker implements InvocationHandler, Closeable {
+ private class Invoker implements InvocationHandler, Closeable {
private final ClientTransceiver tx;
- private final ReflectRequestor requestor;
+ private final SpecificRequestor requestor;
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
- this.requestor = new ReflectRequestor(protocol, tx);
+ this.requestor = createRequestor(protocol, tx);
}
@Override public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
@@ -169,12 +175,20 @@ class AvroRpcEngine implements RpcEngine
}
}
- /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
- private static class TunnelResponder extends ReflectResponder
- implements TunnelProtocol {
+  /**
+   * Creates the requestor that the client-side invoker uses for a protocol.
+   * This implementation returns a {@link ReflectRequestor}.
+   *
+   * @param protocol the protocol interface
+   * @param transceiver the transceiver the requestor sends requests through
+   * @return the requestor for {@code protocol}
+   * @throws IOException if the requestor cannot be created
+   */
+  protected SpecificRequestor createRequestor(Class<?> protocol,
+      Transceiver transceiver) throws IOException {
+    return new ReflectRequestor(protocol, transceiver);
+  }
+
+  /**
+   * Creates the responder that the server-side tunnel delegates requests to.
+   * This implementation returns a {@link ReflectResponder}.
+   *
+   * @param iface the protocol interface
+   * @param impl the object implementing {@code iface}
+   * @return the responder for {@code impl}
+   */
+  protected Responder createResponder(Class<?> iface, Object impl) {
+    return new ReflectResponder(iface, impl);
+  }
+
+ /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
+ private class TunnelResponder implements TunnelProtocol {
+ private Responder responder;
public TunnelResponder(Class<?> iface, Object impl) {
- super(iface, impl);
+ responder = createResponder(iface, impl);
}
public long getProtocolVersion(String protocol, long version)
@@ -184,7 +198,7 @@ class AvroRpcEngine implements RpcEngine
public BufferListWritable call(final BufferListWritable request)
throws IOException {
- return new BufferListWritable(respond(request.buffers));
+ return new BufferListWritable(responder.respond(request.buffers));
}
}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java?rev=993529&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroSpecificRpcEngine.java Tue Sep 7 20:49:23 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Transceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An {@link AvroRpcEngine} which uses Avro's "specific" APIs. Protocols
+ * generated via Avro IDL need to use this engine.
+ */
+@InterfaceStability.Evolving
+public class AvroSpecificRpcEngine extends AvroRpcEngine {
+  /** Returns a {@link SpecificRequestor} for the protocol over the given transceiver. */
+  @Override protected SpecificRequestor createRequestor(Class<?> protocol,
+      Transceiver transceiver) throws IOException {
+    return new SpecificRequestor(protocol, transceiver);
+  }
+  /** Returns a {@link SpecificResponder} that dispatches requests to {@code impl}. */
+  @Override protected Responder createResponder(Class<?> iface, Object impl) {
+    return new SpecificResponder(iface, impl);
+  }
+
+}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Tue Sep 7 20:49:23 2010
@@ -78,8 +78,13 @@ public class RPC {
private static final String ENGINE_PROP = "rpc.engine";
- // set a protocol to use a non-default RpcEngine
- static void setProtocolEngine(Configuration conf,
+ /**
+ * Set a protocol to use a non-default RpcEngine.
+ * @param conf configuration to use
+ * @param protocol the protocol interface
+ * @param engine the RpcEngine impl
+ */
+ public static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
}
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Tue Sep 7 20:49:23 2010
@@ -18,18 +18,21 @@
package org.apache.hadoop.ipc;
-import java.lang.reflect.Method;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+
import javax.net.SocketFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.conf.Configuration;
/** An RPC implementation. */
-interface RpcEngine {
+@InterfaceStability.Evolving
+public interface RpcEngine {
/** Construct a client-side proxy object. */
Object getProxy(Class<?> protocol,
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Tue Sep 7 20:49:23 2010
@@ -44,7 +44,8 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/** An RpcEngine implementation for Writable data. */
-class WritableRpcEngine implements RpcEngine {
+@InterfaceStability.Evolving
+public class WritableRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
/** A method invocation, including the method name and its parameters.*/
Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr?rev=993529&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/AvroSpecificTestProtocol.avpr Tue Sep 7 20:49:23 2010
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "protocol" : "AvroSpecificTestProtocol",
+ "namespace" : "org.apache.hadoop.ipc",
+
+ "messages" : {
+ "echo" : {
+ "request" : [ {
+ "name" : "message",
+ "type" : "string"
+ } ],
+ "response" : "string"
+ },
+
+ "add" : {
+ "request" : [ {
+ "name" : "arg1",
+ "type" : "int"
+ }, {
+ "name" : "arg2",
+ "type" : "int",
+ "default" : 0
+ } ],
+ "response" : "int"
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java?rev=993529&r1=993528&r2=993529&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java Tue Sep 7 20:49:23 2010
@@ -18,19 +18,17 @@
package org.apache.hadoop.ipc;
-import java.io.IOException;
import java.net.InetSocketAddress;
import junit.framework.TestCase;
-import org.apache.commons.logging.*;
-
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.net.NetUtils;
-import org.apache.avro.ipc.AvroRemoteException;
-
/** Unit tests for AvroRpc. */
public class TestAvroRpc extends TestCase {
private static final String ADDRESS = "0.0.0.0";
@@ -94,4 +92,47 @@ public class TestAvroRpc extends TestCas
server.stop();
}
}
+
+  /**
+   * Starts a server for {@link AvroSpecificTestProtocol} configured with
+   * {@link AvroSpecificRpcEngine}, then checks that echo and add calls made
+   * through a client proxy return the expected values.
+   */
+  public void testAvroSpecificRpc() throws Exception {
+    Configuration conf = new Configuration();
+    RPC.setProtocolEngine(conf, AvroSpecificTestProtocol.class,
+        AvroSpecificRpcEngine.class);
+    Server server = RPC.getServer(AvroSpecificTestProtocol.class,
+        new AvroSpecificTestProtocolImpl(),
+        ADDRESS, 0, conf);
+    AvroSpecificTestProtocol proxy = null;
+    try {
+      server.start();
+
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      proxy =
+          (AvroSpecificTestProtocol)RPC.getProxy(AvroSpecificTestProtocol.class,
+              0, addr, conf);
+
+      Utf8 echo = proxy.echo(new Utf8("hello world"));
+      assertEquals("hello world", echo.toString());
+
+      int intResult = proxy.add(1, 2);
+      assertEquals(3, intResult);
+
+    } finally {
+      try {
+        // Release the client-side proxy first; the inner finally guarantees
+        // the server is still stopped if releasing the proxy fails.
+        if (proxy != null) {
+          RPC.stopProxy(proxy);
+        }
+      } finally {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Implementation of {@link AvroSpecificTestProtocol} that is served by
+   * {@code testAvroSpecificRpc}.
+   */
+  public static class AvroSpecificTestProtocolImpl
+      implements AvroSpecificTestProtocol {
+
+    /** Returns the message it was given, unchanged. */
+    @Override
+    public Utf8 echo(Utf8 msg) throws AvroRemoteException {
+      return msg;
+    }
+
+    /** Returns the sum of the two arguments. */
+    @Override
+    public int add(int arg1, int arg2) throws AvroRemoteException {
+      final int sum = arg1 + arg2;
+      return sum;
+    }
+
+  }
+
}