You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/01 00:23:34 UTC
svn commit: r1087462 [13/20] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,34 @@
+package org.apache.hadoop.yarn.factories.impl.pb;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+
+public class YarnRemoteExceptionFactoryPBImpl implements
+ YarnRemoteExceptionFactory {
+
+ private static final YarnRemoteExceptionFactory self = new YarnRemoteExceptionFactoryPBImpl();
+
+ private YarnRemoteExceptionFactoryPBImpl() {
+ }
+
+ public static YarnRemoteExceptionFactory get() {
+ return self;
+ }
+
+ @Override
+ public YarnRemoteException createYarnRemoteException(String message) {
+ return new YarnRemoteExceptionPBImpl(message);
+ }
+
+ @Override
+ public YarnRemoteException createYarnRemoteException(String message,
+ Throwable t) {
+ return new YarnRemoteExceptionPBImpl(message, t);
+ }
+
+ @Override
+ public YarnRemoteException createYarnRemoteException(Throwable t) {
+ return new YarnRemoteExceptionPBImpl(t);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,56 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+
+public class RecordFactoryProvider {
+
+ public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+ public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+
+ public static String RECORD_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.record.factory.class";
+
+ private RecordFactoryProvider() {
+ }
+
+ public static RecordFactory getRecordFactory(Configuration conf) {
+ if (conf == null) {
+ //Assuming the default configuration has the correct factories set.
+ //Users can specify a particular factory by providing a configuration.
+ conf = new Configuration();
+ }
+ String recordFactoryClassName = conf.get(RECORD_FACTORY_CLASS_KEY);
+ if (recordFactoryClassName == null) {
+ String serializer = conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT);
+ if (serializer.equals(RPC_SERIALIZER_DEFAULT)) {
+ return RecordFactoryPBImpl.get();
+ } else {
+ throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RECORD_FACTORY_CLASS_KEY + "] to specify Record factory");
+ }
+ } else {
+ return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
+ }
+ }
+
+ private static Object getFactoryClassInstance(String factoryClassName) {
+ try {
+ Class clazz = Class.forName(factoryClassName);
+ Method method = clazz.getMethod("get", null);
+ method.setAccessible(true);
+ return method.invoke(null, null);
+ } catch (ClassNotFoundException e) {
+ throw new YarnException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnException(e);
+ } catch (InvocationTargetException e) {
+ throw new YarnException(e);
+ } catch (IllegalAccessException e) {
+ throw new YarnException(e);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.InvalidParameterException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+import org.apache.hadoop.yarn.factories.RpcServerFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+
+/**
+ * A public static get() method must be present in the Client/Server Factory implementation.
+ */
+public class RpcFactoryProvider {
+
+ //TODO Move these keys to CommonConfigurationKeys
+ public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+ public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+
+ public static String RPC_CLIENT_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.client.factory.class";
+ public static String RPC_SERVER_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.server.factory.class";
+
+ private RpcFactoryProvider() {
+
+ }
+
+
+ public static RpcServerFactory getServerFactory(Configuration conf) {
+ if (conf == null) {
+ conf = new Configuration();
+ }
+ String serverFactoryClassName = conf.get(RPC_SERVER_FACTORY_CLASS_KEY);
+ if (serverFactoryClassName == null) {
+ if (conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT).equals(RPC_SERIALIZER_DEFAULT)) {
+ return RpcServerFactoryPBImpl.get();
+ } else {
+ throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RPC_CLIENT_FACTORY_CLASS_KEY + "][" + RPC_SERVER_FACTORY_CLASS_KEY + "] to specify factories");
+ }
+ } else {
+ return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName);
+ }
+ }
+
+ public static RpcClientFactory getClientFactory(Configuration conf) {
+ String clientFactoryClassName = conf.get(RPC_CLIENT_FACTORY_CLASS_KEY);
+ if (clientFactoryClassName == null) {
+ if (conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT).equals(RPC_SERIALIZER_DEFAULT)) {
+ return RpcClientFactoryPBImpl.get();
+ } else {
+ throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RPC_CLIENT_FACTORY_CLASS_KEY + "][" + RPC_SERVER_FACTORY_CLASS_KEY + "] to specify factories");
+ }
+ } else {
+ return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
+ }
+ }
+
+ private static Object getFactoryClassInstance(String factoryClassName) {
+ try {
+ Class clazz = Class.forName(factoryClassName);
+ Method method = clazz.getMethod("get", null);
+ method.setAccessible(true);
+ return method.invoke(null, null);
+ } catch (ClassNotFoundException e) {
+ throw new YarnException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnException(e);
+ } catch (InvocationTargetException e) {
+ throw new YarnException(e);
+ } catch (IllegalAccessException e) {
+ throw new YarnException(e);
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.YarnRemoteExceptionFactoryPBImpl;
+
+public class YarnRemoteExceptionFactoryProvider {
+
+ public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+ public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+
+ public static String EXCEPTION_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.exception.factory.class";
+
+ private YarnRemoteExceptionFactoryProvider() {
+ }
+
+ public static YarnRemoteExceptionFactory getYarnRemoteExceptionFactory(Configuration conf) {
+ if (conf == null) {
+ conf = new Configuration();
+ }
+ String recordFactoryClassName = conf.get(EXCEPTION_FACTORY_CLASS_KEY);
+ if (recordFactoryClassName == null) {
+ String serializer = conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT);
+ if (serializer.equals(RPC_SERIALIZER_DEFAULT)) {
+ return YarnRemoteExceptionFactoryPBImpl.get();
+ } else {
+ throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + EXCEPTION_FACTORY_CLASS_KEY + "] to specify Exception factory");
+ }
+ } else {
+ return (YarnRemoteExceptionFactory) getFactoryClassInstance(recordFactoryClassName);
+ }
+ }
+
+ private static Object getFactoryClassInstance(String factoryClassName) {
+ try {
+ Class clazz = Class.forName(factoryClassName);
+ Method method = clazz.getMethod("get", null);
+ method.setAccessible(true);
+ return method.invoke(null, null);
+ } catch (ClassNotFoundException e) {
+ throw new YarnException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnException(e);
+ } catch (InvocationTargetException e) {
+ throw new YarnException(e);
+ } catch (IllegalAccessException e) {
+ throw new YarnException(e);
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,73 @@
+package org.apache.hadoop.yarn.ipc;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
+
+/**
+ * This uses Hadoop RPC. Uses a tunnel ProtoSpecificRpcEngine over
+ * Hadoop connection.
+ * This does not give cross-language wire compatibility, since the Hadoop
+ * RPC wire format is non-standard, but it does permit use of Protocol Buffers
+ * protocol versioning features for inter-Java RPCs.
+ */
+public class HadoopYarnProtoRPC extends YarnRPC {
+
+ private static final Log LOG = LogFactory.getLog(HadoopYarnRPC.class);
+
+ @Override
+ public Object getProxy(Class protocol, InetSocketAddress addr,
+ Configuration conf) {
+ Configuration myConf = new Configuration(conf);
+ LOG.info("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
+ LOG.debug("Configured SecurityInfo class name is "
+ + myConf.get(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME));
+
+ return RpcFactoryProvider.getClientFactory(myConf).getClient(protocol, 1, addr, myConf);
+ }
+
+ @Override
+ public Server getServer(Class protocol, Object instance,
+ InetSocketAddress addr, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager) {
+ LOG.info("Creating a HadoopYarnProtoRpc server for protocol " + protocol);
+ LOG.info("Configured SecurityInfo class name is "
+ + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME));
+
+ final RPC.Server hadoopServer;
+ hadoopServer = RpcFactoryProvider.getServerFactory(conf).getServer(protocol, instance, addr, conf, secretManager);
+
+ Server server = new Server() {
+ @Override
+ public void close() {
+ hadoopServer.stop();
+ }
+
+ @Override
+ public int getPort() {
+ return hadoopServer.getListenerAddress().getPort();
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ hadoopServer.join();
+ }
+
+ @Override
+ public void start() {
+ hadoopServer.start();
+ }
+ };
+ return server;
+
+ }
+
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,296 @@
+package org.apache.hadoop.yarn.ipc;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcEngine;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
+import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
+import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+
+
+@InterfaceStability.Evolving
+public class ProtoOverHadoopRpcEngine implements RpcEngine {
+ private static final Log LOG = LogFactory.getLog(RPC.class);
+
+ private static int VERSION = 0;
+
+ private static final RpcEngine ENGINE = new WritableRpcEngine();
+
+ /** Tunnel a Proto RPC request and response through Hadoop's RPC. */
+ private static interface TunnelProtocol extends VersionedProtocol {
+ /** All Proto methods and responses go through this. */
+ ProtoSpecificResponseWritable call(ProtoSpecificRequestWritable request) throws IOException;
+ }
+
+
+ @Override
+ public Object getProxy(Class<?> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+
+
+ return Proxy.newProxyInstance(protocol.getClassLoader(),
+ new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+ factory, rpcTimeout));
+ }
+
+ @Override
+ public void stopProxy(Object proxy) {
+ try {
+ ((Invoker) Proxy.getInvocationHandler(proxy)).close();
+ } catch (IOException e) {
+ LOG.warn("Error while stopping " + proxy, e);
+ }
+ }
+
+ private class Invoker implements InvocationHandler, Closeable {
+ private TunnelProtocol tunnel;
+ private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+
+ public Invoker(Class<?> protocol, InetSocketAddress addr,
+ UserGroupInformation ticket, Configuration conf, SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ this.tunnel = (TunnelProtocol) ENGINE.getProxy(TunnelProtocol.class,
+ VERSION, addr, ticket, conf, factory, rpcTimeout);
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ ProtoSpecificRpcRequest rpcRequest = constructRpcRequest(method, args);
+ ProtoSpecificResponseWritable val = null;
+ try {
+ val = tunnel.call(new ProtoSpecificRequestWritable(rpcRequest));
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+
+ ProtoSpecificRpcResponse response = val.message;
+
+ if (response.hasIsError() && response.getIsError() == true) {
+ YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl(response.getException());
+ exception.fillInStackTrace();
+ ServiceException se = new ServiceException(exception);
+ throw se;
+ }
+
+ Message prototype = null;
+ try {
+ prototype = getReturnProtoType(method);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+// YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl("Could not get prototype PB return type for method: [" + method.getName() + "]", e);
+ }
+ Message actualReturnMessage = prototype.newBuilderForType().mergeFrom(response.getResponseProto()).build();
+ return actualReturnMessage;
+ }
+
+ public void close() throws IOException {
+ ENGINE.stopProxy(tunnel);
+ }
+
+ private Message getReturnProtoType(Method method) throws Exception {
+ if (returnTypes.containsKey(method.getName())) {
+ return returnTypes.get(method.getName());
+ } else {
+ Class<?> returnType = method.getReturnType();
+
+ Method newInstMethod = returnType.getMethod("getDefaultInstance", null);
+ newInstMethod.setAccessible(true);
+ Message prototype = (Message) newInstMethod.invoke(null, null);
+ returnTypes.put(method.getName(), prototype);
+ return prototype;
+ }
+ }
+ }
+
+ private class TunnelResponder implements TunnelProtocol {
+ BlockingService service;
+
+ public TunnelResponder(Class<?> iface, Object impl) {
+ this.service = (BlockingService)impl;
+ }
+
+ public long getProtocolVersion(String protocol, long version)
+ throws IOException {
+ return VERSION;
+ }
+
+ public ProtoSpecificResponseWritable call(final ProtoSpecificRequestWritable request)
+ throws IOException {
+ ProtoSpecificRpcRequest rpcRequest = request.message;
+ String methodName = rpcRequest.getMethodName();
+ MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+ Message prototype = service.getRequestPrototype(methodDescriptor);
+ Message param = prototype.newBuilderForType().mergeFrom(rpcRequest.getRequestProto()).build();
+
+ Message result;
+ try {
+ result = service.callBlockingMethod(methodDescriptor, null, param);
+ } catch (ServiceException e) {
+ return handleException(e);
+ } catch (Exception e) {
+ return handleException(e);
+ }
+
+ ProtoSpecificRpcResponse response = constructProtoSpecificRpcSuccessResponse(result);
+ return new ProtoSpecificResponseWritable(response);
+ }
+
+ private ProtoSpecificResponseWritable handleException (Throwable e) {
+ ProtoSpecificRpcResponse.Builder builder = ProtoSpecificRpcResponse.newBuilder();
+ builder.setIsError(true);
+ if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
+ builder.setException(((YarnRemoteExceptionPBImpl)e.getCause()).getProto());
+ } else {
+ builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
+ }
+ ProtoSpecificRpcResponse response = builder.build();
+ return new ProtoSpecificResponseWritable(response);
+ }
+ }
+
+ @Override
+ public Object[] call(Method method, Object[][] params,
+ InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public RPC.Server getServer(Class<?> protocol, Object instance,
+ String bindAddress, int port, int numHandlers, boolean verbose,
+ Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+
+ return ENGINE
+ .getServer(TunnelProtocol.class,
+ new TunnelResponder(protocol, instance), bindAddress, port,
+ numHandlers, verbose, conf, secretManager);
+ }
+
+
+ private Class<?>[] getRequestParameterTypes(Message[] messages) {
+ Class<?> [] paramTypes = new Class<?>[messages.length];
+ for (int i = 0 ; i < messages.length ; i++) {
+ paramTypes[i] = messages[i].getClass();
+ }
+ return paramTypes;
+ }
+
+ private ProtoSpecificRpcRequest constructRpcRequest(Method method,
+ Object[] params) throws ServiceException {
+ ProtoSpecificRpcRequest rpcRequest;
+ ProtoSpecificRpcRequest.Builder builder;
+
+ builder = ProtoSpecificRpcRequest.newBuilder();
+ builder.setMethodName(method.getName());
+
+ if (params.length != 2) { //RpcController + Message
+ throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + params.length);
+ }
+ if (params[1] == null) {
+ throw new ServiceException("null param while calling Method: [" + method.getName() +"]");
+ }
+
+ Message param = (Message) params[1];
+ builder.setRequestProto(param.toByteString());
+
+ rpcRequest = builder.build();
+ return rpcRequest;
+ }
+
+ private ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(Message message) {
+ ProtoSpecificRpcResponse res = ProtoSpecificRpcResponse.newBuilder().setResponseProto(message.toByteString()).build();
+ return res;
+ }
+
+
+ /**
+ * Writable Wrapper for Protocol Buffer Responses
+ */
+ private static class ProtoSpecificResponseWritable implements Writable {
+ ProtoSpecificRpcResponse message;
+
+ public ProtoSpecificResponseWritable() {
+ }
+
+ ProtoSpecificResponseWritable(ProtoSpecificRpcResponse message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+// System.err.println("XXX: writing length: " + message.toByteArray().length);
+ out.writeInt(message.toByteArray().length);
+ out.write(message.toByteArray());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+// System.err.println("YYY: Reading length: " + length);
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ message = ProtoSpecificRpcResponse.parseFrom(bytes);
+ }
+ }
+
+ /**
+ * Writable Wrapper for Protocol Buffer Requests
+ */
+ private static class ProtoSpecificRequestWritable implements Writable {
+ ProtoSpecificRpcRequest message;
+
+ public ProtoSpecificRequestWritable() {
+ }
+
+ ProtoSpecificRequestWritable(ProtoSpecificRpcRequest message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(message.toByteArray().length);
+ out.write(message.toByteArray());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ message = ProtoSpecificRpcRequest.parseFrom(bytes);
+ }
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java Thu Mar 31 22:23:22 2011
@@ -18,50 +18,38 @@
package org.apache.hadoop.yarn.ipc;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
public class RPCUtil {
+
+ /**
+ * Relying on the default factory configuration to be set correctly
+ * for the default configuration.
+ */
+ private static Configuration conf = new Configuration();
+ private static YarnRemoteExceptionFactory exceptionFactory = YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(conf);
+
/**
- * Returns the YarnRemoteException which is serializable by
- * Avro.
+ * Returns the YarnRemoteException which is serializable.
*/
public static YarnRemoteException getRemoteException(Throwable t) {
- YarnRemoteException e = new YarnRemoteException();
- if (t != null) {
- e.message = t.getMessage();
- StringBuilder buf = new StringBuilder();
- StackTraceElement[] trace = t.getStackTrace();
- if (trace != null) {
- for (StackTraceElement element : trace) {
- buf.append(element.toString() + "\n at ");
- }
- e.trace = buf.toString();
- }
- Throwable cause = t.getCause();
- if (cause != null) {
- e.cause = getRemoteException(cause);
- }
- }
- return e;
+ return exceptionFactory.createYarnRemoteException(t);
}
/**
- * Returns the YarnRemoteException which is serializable by
- * Avro.
+ * Returns the YarnRemoteException which is serializable.
*/
public static YarnRemoteException getRemoteException(String message) {
- YarnRemoteException e = new YarnRemoteException();
- if (message != null) {
- e.message = message;
- }
- return e;
+ return exceptionFactory.createYarnRemoteException(message);
}
public static String toString(YarnRemoteException e) {
- return (e.message == null ? "" : e.message) +
- (e.trace == null ? "" : "\n StackTrace: " + e.trace) +
- (e.cause == null ? "" : "\n Caused by: " + toString(e.cause));
-
+ return (e.getMessage() == null ? "" : e.getMessage()) +
+ (e.getRemoteTrace() == null ? "" : "\n StackTrace: " + e.getRemoteTrace()) +
+ (e.getCause() == null ? "" : "\n Caused by: " + toString(e.getCause()));
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java Thu Mar 31 22:23:22 2011
@@ -36,7 +36,7 @@ public abstract class YarnRPC {
//use the default as Hadoop RPC
public static final String DEFAULT_RPC
- = "org.apache.hadoop.yarn.ipc.HadoopYarnRPC";
+ = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
public abstract Object getProxy(Class protocol, InetSocketAddress addr,
Configuration conf);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java Thu Mar 31 22:23:22 2011
@@ -25,7 +25,7 @@ import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
// TODO: Make it avro-ish. TokenIdentifier really isn't serialized
// as writable but simply uses readFields method in SaslRpcServer
@@ -39,8 +39,8 @@ public class ApplicationTokenIdentifier
// TODO: Add more information in the tokenID such that it is not
// transferrable, more secure etc.
- public ApplicationTokenIdentifier(ApplicationID id) {
- this.appId = new Text(Integer.toString(id.id));
+ public ApplicationTokenIdentifier(ApplicationId id) {
+ this.appId = new Text(Integer.toString(id.getId()));
}
public ApplicationTokenIdentifier() {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java Thu Mar 31 22:23:22 2011
@@ -27,9 +27,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class ContainerTokenIdentifier extends TokenIdentifier {
@@ -38,12 +39,12 @@ public class ContainerTokenIdentifier ex
public static final Text KIND = new Text("ContainerToken");
- private ContainerID containerID;
+ private ContainerId containerId;
private String nmHostName;
private Resource resource;
- public ContainerTokenIdentifier(ContainerID containerID, String hostName, Resource r) {
- this.containerID = containerID;
+ public ContainerTokenIdentifier(ContainerId containerID, String hostName, Resource r) {
+ this.containerId = containerID;
this.nmHostName = hostName;
this.resource = r;
}
@@ -51,8 +52,8 @@ public class ContainerTokenIdentifier ex
public ContainerTokenIdentifier() {
}
- public ContainerID getContainerID() {
- return containerID;
+ public ContainerId getContainerID() {
+ return containerId;
}
public String getNmHostName() {
@@ -66,21 +67,21 @@ public class ContainerTokenIdentifier ex
@Override
public void write(DataOutput out) throws IOException {
LOG.info("Writing ContainerTokenIdentifier to RPC layer");
- out.writeInt(this.containerID.appID.id);
- out.writeInt(this.containerID.id);
+ out.writeInt(this.containerId.getAppId().getId());
+ out.writeInt(this.containerId.getId());
out.writeUTF(this.nmHostName);
- out.writeInt(this.resource.memory); // TODO: more resources.
+ out.writeInt(this.resource.getMemory()); // TODO: more resources.
}
@Override
public void readFields(DataInput in) throws IOException {
- this.containerID = new ContainerID();
- this.containerID.appID = new ApplicationID();
- this.containerID.appID.id = in.readInt();
- this.containerID.id = in.readInt();
+ this.containerId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+ this.containerId.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+ this.containerId.getAppId().setId(in.readInt());
+ this.containerId.setId(in.readInt());
this.nmHostName = in.readUTF();
- this.resource = new Resource();
- this.resource.memory = in.readInt(); // TODO: more resources.
+ this.resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+ this.resource.setMemory(in.readInt()); // TODO: more resources.
}
@Override
@@ -90,7 +91,7 @@ public class ContainerTokenIdentifier ex
@Override
public UserGroupInformation getUser() {
- return UserGroupInformation.createRemoteUser(this.containerID.toString());
+ return UserGroupInformation.createRemoteUser(this.containerId.toString());
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java Thu Mar 31 22:23:22 2011
@@ -21,7 +21,8 @@ package org.apache.hadoop.yarn.util;
import java.util.Iterator;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import static org.apache.hadoop.yarn.util.StringHelper.*;
@@ -32,25 +33,25 @@ public class Apps {
public static final String APP = "app";
public static final String ID = "ID";
- public static String toString(ApplicationID id) {
- return _join("app", id.clusterTimeStamp, id.id);
+ public static String toString(ApplicationId id) {
+ return _join("app", id.getClusterTimestamp(), id.getId());
}
- public static ApplicationID toAppID(String aid) {
+ public static ApplicationId toAppID(String aid) {
Iterator<String> it = _split(aid).iterator();
return toAppID(APP, aid, it);
}
- public static ApplicationID toAppID(String prefix, String s, Iterator<String> it) {
+ public static ApplicationId toAppID(String prefix, String s, Iterator<String> it) {
if (!it.hasNext() || !it.next().equals(prefix)) {
throwParseException(sjoin(prefix, ID), s);
}
shouldHaveNext(prefix, s, it);
- ApplicationID appID = new ApplicationID();
- appID.clusterTimeStamp = Long.parseLong(it.next());
+ ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(Long.parseLong(it.next()));
shouldHaveNext(prefix, s, it);
- appID.id = Integer.parseInt(it.next());
- return appID;
+ appId.setId(Integer.parseInt(it.next()));
+ return appId;
}
public static void shouldHaveNext(String prefix, String s, Iterator<String> it) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Thu Mar 31 22:23:22 2011
@@ -20,9 +20,10 @@ package org.apache.hadoop.yarn.util;
import java.net.URI;
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
/**
* Builder utilities to construct various objects.
@@ -33,12 +34,12 @@ public class BuilderUtils {
public static LocalResource newLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility,
long size, long timestamp) {
- LocalResource resource = new LocalResource();
- resource.resource = AvroUtil.getYarnUrlFromURI(uri);
- resource.type = type;
- resource.state = visibility;
- resource.size = size;
- resource.timestamp = timestamp;
+ LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
+ resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
+ resource.setType(type);
+ resource.setVisibility(visibility);
+ resource.setSize(size);
+ resource.setTimestamp(timestamp);
return resource;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java Thu Mar 31 22:23:22 2011
@@ -20,7 +20,8 @@ package org.apache.hadoop.yarn.util;
import java.util.regex.Pattern;
-import org.apache.hadoop.yarn.YarnContainerTags;
+import org.apache.hadoop.yarn.api.records.YarnContainerTags;
+
// TODO: Remove this and related stuff?
public class ContainerBuilderHelper {
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,127 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.NumberFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * This class contains a set of utilities which help converting data structures
+ * from/to 'serializableFormat' to/from hadoop/nativejava data structures.
+ *
+ */
+public class ConverterUtils {
+
+ /**
+ * return a hadoop path from a given url
+ *
+ * @param url
+ * url to convert
+ * @return
+ * @throws URISyntaxException
+ */
+ public static Path getPathFromYarnURL(URL url) throws URISyntaxException {
+ String scheme = url.getScheme() == null ? "" : url.getScheme();
+ String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort()
+ : "";
+ return new Path(
+ (new URI(scheme, authority, url.getFile(), null, null))
+ .normalize());
+ }
+
+ /**
+ * change from CharSequence to string for map key and value
+ * @param env
+ * @return
+ */
+ public static Map<String, String> convertToString(
+ Map<CharSequence, CharSequence> env) {
+
+ Map<String, String> stringMap = new HashMap<String, String>();
+ for (Entry<CharSequence, CharSequence> entry: env.entrySet()) {
+ stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return stringMap;
+ }
+
+ public static URL getYarnUrlFromPath(Path path) {
+ return getYarnUrlFromURI(path.toUri());
+ }
+
+ public static URL getYarnUrlFromURI(URI uri) {
+ URL url = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(URL.class);
+ if (uri.getHost() != null) {
+ url.setHost(uri.getHost());
+ }
+ url.setPort(uri.getPort());
+ url.setScheme(uri.getScheme());
+ url.setFile(uri.getPath());
+ return url;
+ }
+
+ // TODO: Why thread local?
+ private static final ThreadLocal<NumberFormat> appIdFormat =
+ new ThreadLocal<NumberFormat>() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(4);
+ return fmt;
+ }
+ };
+
+ // TODO: Why thread local?
+ private static final ThreadLocal<NumberFormat> containerIdFormat =
+ new ThreadLocal<NumberFormat>() {
+ @Override
+ public NumberFormat initialValue() {
+ NumberFormat fmt = NumberFormat.getInstance();
+ fmt.setGroupingUsed(false);
+ fmt.setMinimumIntegerDigits(6);
+ return fmt;
+ }
+ };
+
+ public static String toString(ApplicationId appId) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("application_").append(appId.getClusterTimestamp()).append("_");
+ sb.append(appIdFormat.get().format(appId.getId()));
+ return sb.toString();
+ }
+
+ public static String toString(ContainerId cId) {
+ StringBuilder sb = new StringBuilder();
+ ApplicationId appId = cId.getAppId();
+ sb.append("container_").append(appId.getClusterTimestamp()).append("_");
+ sb.append(appIdFormat.get().format(appId.getId())).append("_");
+ sb.append(containerIdFormat.get().format(cId.getId()));
+ return sb.toString();
+ }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java Thu Mar 31 22:23:22 2011
@@ -22,24 +22,21 @@ import java.io.IOException;
import javax.crypto.SecretKey;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.ApplicationID;
public class SecurityUtil {
@@ -48,7 +45,7 @@ public class SecurityUtil {
// imitation of operations done by client to upload FileSystem
// DelegationTokens
public static void uploadFileSystemDelegationTokens(Configuration conf,
- ApplicationID appId, Path[] resourcePaths, Credentials credentials)
+ ApplicationId appId, Path[] resourcePaths, Credentials credentials)
throws IOException {
getFileSystemTokens(conf, resourcePaths, credentials);
@@ -56,7 +53,7 @@ public class SecurityUtil {
FileSystem defaultFS = FileSystem.get(conf);
// TODO: fix
credentials.writeTokenStorageFile(
- new Path("yarn", Integer.toString(appId.id),
+ new Path("yarn", Integer.toString(appId.getId()),
YarnConfiguration.FS_TOKENS_FILE_NAME).makeQualified(
FileSystem.getDefaultUri(conf), defaultFS.getWorkingDirectory()),
conf);
@@ -86,7 +83,7 @@ public class SecurityUtil {
// TODO: ApplicationMaster needs one token for each NodeManager. This should
// be created by ResourceManager and sent to ApplicationMaster via RPC.
- public static void loadContainerManagerTokens(ApplicationID appId,
+ public static void loadContainerManagerTokens(ApplicationId appId,
Configuration conf, String nmServiceAddress) throws IOException {
Path masterKeyFile =
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,17 @@
+option java_package = "org.apache.hadoop.yarn.ipc";
+option java_outer_classname = "RpcProtos";
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+
+message ProtoSpecificRpcRequest {
+ required string method_name = 1;
+ optional bytes request_proto = 2;
+}
+
+message ProtoSpecificRpcResponse {
+ optional bytes response_proto = 1;
+
+ optional bool is_error = 2;
+ optional YarnRemoteExceptionProto exception = 3;
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java Thu Mar 31 22:23:22 2011
@@ -24,6 +24,12 @@ import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
/**
* Utilities to generate fake test apps
*/
@@ -67,14 +73,14 @@ public class MockApps {
}
public static Application newApp(int i) {
- final ApplicationID id = newAppID(i);
+ final ApplicationId id = newAppID(i);
final ApplicationStatus status = newAppStatus();
final ApplicationState state = newAppState();
final String user = newUserName();
final String name = newAppName();
final String queue = newQueue();
return new Application() {
- @Override public ApplicationID id() { return id; }
+ @Override public ApplicationId id() { return id; }
@Override public CharSequence user() { return user; }
@Override public CharSequence name() { return name; }
@Override public ApplicationStatus status() { return status; }
@@ -95,17 +101,17 @@ public class MockApps {
};
}
- public static ApplicationID newAppID(int i) {
- ApplicationID id = new ApplicationID();
- id.clusterTimeStamp = TS;
- id.id = i;
+ public static ApplicationId newAppID(int i) {
+ ApplicationId id = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+ id.setClusterTimestamp(TS);
+ id.setId(i);
return id;
}
public static ApplicationStatus newAppStatus() {
- ApplicationStatus status = new ApplicationStatus();
- status.progress = (float) Math.random();
- status.lastSeen = System.currentTimeMillis();
+ ApplicationStatus status = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationStatus.class);
+ status.setProgress((float)Math.random());
+ status.setLastSeen(System.currentTimeMillis());
return status;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Thu Mar 31 22:23:22 2011
@@ -24,39 +24,56 @@ import java.util.HashMap;
import junit.framework.Assert;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
import org.apache.hadoop.yarn.ipc.AvroYarnRPC;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.HadoopYarnRPC;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.ContainerStatus;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.YarnRemoteException;
import org.junit.Test;
public class TestRPC {
private static final String EXCEPTION_MSG = "test error";
private static final String EXCEPTION_CAUSE = "exception cause";
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- @Test
- public void testAvroRPC() throws Exception {
- test(AvroYarnRPC.class.getName());
- }
+// @Test
+// public void testAvroRPC() throws Exception {
+// test(AvroYarnRPC.class.getName());
+// }
+//
+// @Test
+// public void testHadoopNativeRPC() throws Exception {
+// test(HadoopYarnRPC.class.getName());
+// }
@Test
- public void testHadoopNativeRPC() throws Exception {
- test(HadoopYarnRPC.class.getName());
+ public void testHadoopProtoRPC() throws Exception {
+ test(HadoopYarnProtoRPC.class.getName());
}
-
+
private void test(String rpcClass) throws Exception {
Configuration conf = new Configuration();
conf.set(YarnRPC.RPC_CLASSNAME, rpcClass);
@@ -69,33 +86,44 @@ public class TestRPC {
ContainerManager proxy = (ContainerManager)
rpc.getProxy(ContainerManager.class,
NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
- ContainerLaunchContext containerLaunchContext = new ContainerLaunchContext();
- containerLaunchContext.user = "dummy-user";
- containerLaunchContext.id = new ContainerID();
- containerLaunchContext.id.appID = new ApplicationID();
- containerLaunchContext.id.appID.id = 0;
- containerLaunchContext.id.id = 100;
- containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
- containerLaunchContext.resource = new Resource();
- containerLaunchContext.command = new ArrayList<CharSequence>();
- proxy.startContainer(containerLaunchContext);
- ContainerStatus status = proxy.getContainerStatus(containerLaunchContext.id);
+ ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ containerLaunchContext.setUser("dummy-user");
+ containerLaunchContext.setContainerId(recordFactory.newRecordInstance(ContainerId.class));
+ containerLaunchContext.getContainerId().setAppId(recordFactory.newRecordInstance(ApplicationId.class));
+ containerLaunchContext.getContainerId().getAppId().setId(0);
+ containerLaunchContext.getContainerId().setId(100);
+ containerLaunchContext.setResource(recordFactory.newRecordInstance(Resource.class));
+// containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
+// containerLaunchContext.command = new ArrayList<CharSequence>();
+
+ StartContainerRequest scRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ scRequest.setContainerLaunchContext(containerLaunchContext);
+ proxy.startContainer(scRequest);
+
+ GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+ gcsRequest.setContainerId(containerLaunchContext.getContainerId());
+ GetContainerStatusResponse response = proxy.getContainerStatus(gcsRequest);
+ ContainerStatus status = response.getStatus();
//test remote exception
boolean exception = false;
try {
- proxy.stopContainer(containerLaunchContext.id);
+ StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ stopRequest.setContainerId(containerLaunchContext.getContainerId());
+ proxy.stopContainer(stopRequest);
} catch (YarnRemoteException e) {
exception = true;
- Assert.assertTrue(EXCEPTION_MSG.equals(e.message.toString()));
- Assert.assertTrue(EXCEPTION_CAUSE.equals(e.cause.message.toString()));
+ System.err.println(e.getMessage());
+ System.err.println(e.getCause().getMessage());
+ Assert.assertTrue(EXCEPTION_MSG.equals(e.getMessage()));
+ Assert.assertTrue(EXCEPTION_CAUSE.equals(e.getCause().getMessage()));
System.out.println("Test Exception is " + RPCUtil.toString(e));
}
Assert.assertTrue(exception);
server.close();
Assert.assertNotNull(status);
- Assert.assertEquals(ContainerState.RUNNING, status.state.RUNNING);
+ Assert.assertEquals(ContainerState.RUNNING, status.getState().RUNNING);
}
public class DummyContainerManager implements ContainerManager {
@@ -103,34 +131,35 @@ public class TestRPC {
private ContainerStatus status = null;
@Override
- public Void cleanupContainer(ContainerID containerId)
- throws AvroRemoteException {
- return null;
+ public CleanupContainerResponse cleanupContainer(CleanupContainerRequest request) throws YarnRemoteException {
+ CleanupContainerResponse response = recordFactory.newRecordInstance(CleanupContainerResponse.class);
+ return response;
}
-
+
+
@Override
- public ContainerStatus getContainerStatus(ContainerID containerId)
- throws AvroRemoteException {
- return status;
+ public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
+ GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
+ response.setStatus(status);
+ return response;
}
@Override
- public Void startContainer(ContainerLaunchContext container)
- throws AvroRemoteException {
- status = new ContainerStatus();
- status.state = ContainerState.RUNNING;
- status.containerID = container.id;
- status.exitStatus = 0;
- return null;
+ public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
+ ContainerLaunchContext container = request.getContainerLaunchContext();
+ StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class);
+ status = recordFactory.newRecordInstance(ContainerStatus.class);
+ status.setState(ContainerState.RUNNING);
+ status.setContainerId(container.getContainerId());
+ status.setExitStatus(0);
+ return response;
}
@Override
- public Void stopContainer(ContainerID containerId)
- throws AvroRemoteException {
+ public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException {
Exception e = new Exception(EXCEPTION_MSG,
new Exception(EXCEPTION_CAUSE));
- throw RPCUtil.getRemoteException(e);
+ throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(null).createYarnRemoteException(e);
}
-
}
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,107 @@
+package org.apache.hadoop.yarn;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.junit.Test;
+
+public class TestRPCFactories {
+
+
+
+ @Test
+ public void test() {
+ testPbServerFactory();
+
+ testPbClientFactory();
+ }
+
+
+
+ private void testPbServerFactory() {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ Configuration conf = new Configuration();
+ AMRMProtocol instance = new AMRMProtocolTestImpl();
+ Server server = null;
+ try {
+ server = RpcServerFactoryPBImpl.get().getServer(AMRMProtocol.class, instance, addr, conf, null);
+ server.start();
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create server");
+ } finally {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+
+
+ private void testPbClientFactory() {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ System.err.println(addr.getHostName() + addr.getPort());
+ Configuration conf = new Configuration();
+ AMRMProtocol instance = new AMRMProtocolTestImpl();
+ Server server = null;
+ try {
+ server = RpcServerFactoryPBImpl.get().getServer(AMRMProtocol.class, instance, addr, conf, null);
+ server.start();
+ System.err.println(server.getListenerAddress());
+ System.err.println(NetUtils.getConnectAddress(server));
+
+ AMRMProtocol amrmClient = null;
+ try {
+ amrmClient = (AMRMProtocol) RpcClientFactoryPBImpl.get().getClient(AMRMProtocol.class, 1, NetUtils.getConnectAddress(server), conf);
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create client");
+ }
+
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to create server");
+ } finally {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+
+ public class AMRMProtocolTestImpl implements AMRMProtocol {
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,36 @@
+package org.apache.hadoop.yarn;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
+import org.junit.Test;
+
+public class TestRecordFactory {
+
+ @Test
+ public void testPbRecordFactory() {
+ RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
+
+ try {
+ AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
+ Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to crete record");
+ }
+
+ try {
+ AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
+ Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
+ } catch (YarnException e) {
+ e.printStackTrace();
+ Assert.fail("Failed to crete record");
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+import org.apache.hadoop.yarn.factories.RpcServerFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
+import org.junit.Test;
+
+public class TestRpcFactoryProvider {
+
+ @Test
+ public void testFactoryProvider() {
+ Configuration conf = new Configuration();
+ RpcClientFactory clientFactory = null;
+ RpcServerFactory serverFactory = null;
+
+
+ clientFactory = RpcFactoryProvider.getClientFactory(conf);
+ serverFactory = RpcFactoryProvider.getServerFactory(conf);
+ Assert.assertEquals(RpcClientFactoryPBImpl.class, clientFactory.getClass());
+ Assert.assertEquals(RpcServerFactoryPBImpl.class, serverFactory.getClass());
+
+ conf.set(RpcFactoryProvider.RPC_SERIALIZER_KEY, "writable");
+ try {
+ clientFactory = RpcFactoryProvider.getClientFactory(conf);
+ Assert.fail("Expected an exception - unknown serializer");
+ } catch (YarnException e) {
+ }
+ try {
+ serverFactory = RpcFactoryProvider.getServerFactory(conf);
+ Assert.fail("Expected an exception - unknown serializer");
+ } catch (YarnException e) {
+ }
+
+ conf = new Configuration();
+ conf.set(RpcFactoryProvider.RPC_CLIENT_FACTORY_CLASS_KEY, "NonExistantClass");
+ conf.set(RpcFactoryProvider.RPC_SERVER_FACTORY_CLASS_KEY, RpcServerFactoryPBImpl.class.getName());
+
+ try {
+ clientFactory = RpcFactoryProvider.getClientFactory(conf);
+ Assert.fail("Expected an exception - unknown class");
+ } catch (YarnException e) {
+ }
+ try {
+ serverFactory = RpcFactoryProvider.getServerFactory(conf);
+ } catch (YarnException e) {
+ Assert.fail("Error while loading factory using reflection: [" + RpcServerFactoryPBImpl.class.getName() + "]");
+ }
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml Thu Mar 31 22:23:22 2011
@@ -1,4 +1,5 @@
-<?xml version="1.0"?><project>
+<?xml version="1.0"?>
+<project>
<parent>
<artifactId>yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
@@ -27,7 +28,7 @@
<artifactId>zookeeper</artifactId>
<version>3.3.1</version>
<scope>compile</scope>
- <exclusions>
+ <exclusions>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
@@ -38,11 +39,16 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.4.0a</version>
+ </dependency>
</dependencies>
<build>
<plugins>
- <plugin>
+ <!--plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.4.0-SNAPSHOT</version>
@@ -54,7 +60,76 @@
</goals>
</execution>
</executions>
+ </plugin-->
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create_generate_src_dirctory</id>
+ <phase>initialize</phase>
+ <configuration>
+ <tasks>
+ <mkdir dir="target/generated-sources/proto" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <executable>protoc</executable>
+ <arguments>
+ <argument>-I../../yarn-api/src/main/proto/</argument>
+ <argument>-Isrc/main/proto/</argument>
+ <argument>--java_out=target/generated-sources/proto</argument>
+ <argument>src/main/proto/yarn_server_common_protos.proto</argument>
+ <argument>src/main/proto/yarn_server_common_service_protos.proto</argument>
+ <argument>src/main/proto/ResourceTracker.proto</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/proto</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
+
+
</plugins>
</build>
</project>
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,14 @@
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+
+public interface ResourceTracker {
+
+ public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException;
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException;
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,70 @@
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
+import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class ResourceTrackerPBClientImpl implements ResourceTracker {
+
+private ResourceTrackerService.BlockingInterface proxy;
+
+ public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, ResourceTrackerService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class);
+ proxy = (ResourceTrackerService.BlockingInterface)RPC.getProxy(
+ ResourceTrackerService.BlockingInterface.class, clientVersion, addr, conf);
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+ RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
+ try {
+ return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
+ } catch (ServiceException e) {
+ if (e.getCause() instanceof YarnRemoteException) {
+ throw (YarnRemoteException)e.getCause();
+ } else if (e.getCause() instanceof UndeclaredThrowableException) {
+ throw (UndeclaredThrowableException)e.getCause();
+ } else {
+ throw new UndeclaredThrowableException(e);
+ }
+ }
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ NodeHeartbeatRequestProto requestProto = ((NodeHeartbeatRequestPBImpl)request).getProto();
+ try {
+ return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto));
+ } catch (ServiceException e) {
+ if (e.getCause() instanceof YarnRemoteException) {
+ throw (YarnRemoteException)e.getCause();
+ } else if (e.getCause() instanceof UndeclaredThrowableException) {
+ throw (UndeclaredThrowableException)e.getCause();
+ } else {
+ throw new UndeclaredThrowableException(e);
+ }
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,53 @@
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService.BlockingInterface;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class ResourceTrackerPBServiceImpl implements BlockingInterface {
+
+ private ResourceTracker real;
+
+ public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
+ this.real = impl;
+ }
+
+ @Override
+ public RegisterNodeManagerResponseProto registerNodeManager(
+ RpcController controller, RegisterNodeManagerRequestProto proto)
+ throws ServiceException {
+ RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
+ try {
+ RegisterNodeManagerResponse response = real.registerNodeManager(request);
+ return ((RegisterNodeManagerResponsePBImpl)response).getProto();
+ } catch (YarnRemoteException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public NodeHeartbeatResponseProto nodeHeartbeat(RpcController controller,
+ NodeHeartbeatRequestProto proto) throws ServiceException {
+ NodeHeartbeatRequestPBImpl request = new NodeHeartbeatRequestPBImpl(proto);
+ try {
+ NodeHeartbeatResponse response = real.nodeHeartbeat(request);
+ return ((NodeHeartbeatResponsePBImpl)response).getProto();
+ } catch (YarnRemoteException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,10 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+
+
+public interface NodeHeartbeatRequest {
+ public abstract NodeStatus getNodeStatus();
+
+ public abstract void setNodeStatus(NodeStatus status);
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+
+public interface NodeHeartbeatResponse {
+ public abstract HeartbeatResponse getHeartbeatResponse();
+
+ public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse);
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface RegisterNodeManagerRequest {
+ public abstract String getNode();
+ public abstract Resource getResource();
+
+ public abstract void setNode(String node);
+ public abstract void setResource(Resource resource);
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,10 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+
+public interface RegisterNodeManagerResponse {
+ public abstract RegistrationResponse getRegistrationResponse();
+
+ public abstract void setRegistrationResponse(RegistrationResponse registrationResponse);
+
+}