You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/01 00:23:34 UTC

svn commit: r1087462 [13/20] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/...

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factories/impl/pb/YarnRemoteExceptionFactoryPBImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,34 @@
+package org.apache.hadoop.yarn.factories.impl.pb;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+
+public class YarnRemoteExceptionFactoryPBImpl implements
+    YarnRemoteExceptionFactory {
+
+  private static final YarnRemoteExceptionFactory self = new YarnRemoteExceptionFactoryPBImpl();
+
+  private YarnRemoteExceptionFactoryPBImpl() {
+  }
+
+  public static YarnRemoteExceptionFactory get() {
+    return self;
+  }
+
+  @Override
+  public YarnRemoteException createYarnRemoteException(String message) {
+    return new YarnRemoteExceptionPBImpl(message);
+  }
+
+  @Override
+  public YarnRemoteException createYarnRemoteException(String message,
+      Throwable t) {
+    return new YarnRemoteExceptionPBImpl(message, t);
+  }
+
+  @Override
+  public YarnRemoteException createYarnRemoteException(Throwable t) {
+    return new YarnRemoteExceptionPBImpl(t);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RecordFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,56 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+
+public class RecordFactoryProvider {
+
+  public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+  public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+  
+  public static String RECORD_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.record.factory.class";
+  
+  private RecordFactoryProvider() {
+  }
+  
+  public static RecordFactory getRecordFactory(Configuration conf) {
+    if (conf == null) {
+      //Assuming the default configuration has the correct factories set.
+      //Users can specify a particular factory by providing a configuration.
+      conf = new Configuration();
+    }
+    String recordFactoryClassName = conf.get(RECORD_FACTORY_CLASS_KEY);
+    if (recordFactoryClassName == null) {
+      String serializer = conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT);
+      if (serializer.equals(RPC_SERIALIZER_DEFAULT)) {
+        return RecordFactoryPBImpl.get();
+      } else {
+        throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RECORD_FACTORY_CLASS_KEY + "] to specify Record factory");
+      }
+    } else {
+      return (RecordFactory) getFactoryClassInstance(recordFactoryClassName);
+    }
+  }
+  
+  private static Object getFactoryClassInstance(String factoryClassName) {
+    try {
+      Class clazz = Class.forName(factoryClassName);
+      Method method = clazz.getMethod("get", null);
+      method.setAccessible(true);
+      return method.invoke(null, null);
+    } catch (ClassNotFoundException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException(e);
+    } catch (InvocationTargetException e) {
+      throw new YarnException(e);
+    } catch (IllegalAccessException e) {
+      throw new YarnException(e);
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.InvalidParameterException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+import org.apache.hadoop.yarn.factories.RpcServerFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+
+/**
+ * A public static get() method must be present in the Client/Server Factory implementation.
+ */
+public class RpcFactoryProvider {
+  
+  //TODO Move these keys to CommonConfigurationKeys
+  public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+  public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+
+  public static String RPC_CLIENT_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.client.factory.class";
+  public static String RPC_SERVER_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.server.factory.class";
+  
+  private RpcFactoryProvider() {
+    
+  }
+  
+  
+  public static RpcServerFactory getServerFactory(Configuration conf) {
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    String serverFactoryClassName = conf.get(RPC_SERVER_FACTORY_CLASS_KEY);
+    if (serverFactoryClassName == null) {
+      if (conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT).equals(RPC_SERIALIZER_DEFAULT)) {
+        return RpcServerFactoryPBImpl.get();
+      } else {
+        throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RPC_CLIENT_FACTORY_CLASS_KEY + "][" + RPC_SERVER_FACTORY_CLASS_KEY + "] to specify factories");
+      }
+    } else {
+      return (RpcServerFactory) getFactoryClassInstance(serverFactoryClassName);
+    }
+  }
+  
+  public static RpcClientFactory getClientFactory(Configuration conf) {
+    String clientFactoryClassName = conf.get(RPC_CLIENT_FACTORY_CLASS_KEY);
+    if (clientFactoryClassName == null) {
+      if (conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT).equals(RPC_SERIALIZER_DEFAULT)) {
+        return RpcClientFactoryPBImpl.get();
+      } else {
+        throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + RPC_CLIENT_FACTORY_CLASS_KEY + "][" + RPC_SERVER_FACTORY_CLASS_KEY + "] to specify factories");
+      }
+    } else {
+      return(RpcClientFactory) getFactoryClassInstance(clientFactoryClassName);
+    }
+  }
+
+  private static Object getFactoryClassInstance(String factoryClassName) {
+    try {
+      Class clazz = Class.forName(factoryClassName);
+      Method method = clazz.getMethod("get", null);
+      method.setAccessible(true);
+      return method.invoke(null, null);
+    } catch (ClassNotFoundException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException(e);
+    } catch (InvocationTargetException e) {
+      throw new YarnException(e);
+    } catch (IllegalAccessException e) {
+      throw new YarnException(e);
+    }
+  }
+  
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/YarnRemoteExceptionFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn.factory.providers;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.YarnRemoteExceptionFactoryPBImpl;
+
+public class YarnRemoteExceptionFactoryProvider {
+
+  public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
+  public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";
+  
+  public static String EXCEPTION_FACTORY_CLASS_KEY = "org.apache.yarn.ipc.exception.factory.class";
+  
+  private YarnRemoteExceptionFactoryProvider() {
+  }
+  
+  public static YarnRemoteExceptionFactory getYarnRemoteExceptionFactory(Configuration conf) {
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    String recordFactoryClassName = conf.get(EXCEPTION_FACTORY_CLASS_KEY);
+    if (recordFactoryClassName == null) {
+      String serializer = conf.get(RPC_SERIALIZER_KEY, RPC_SERIALIZER_DEFAULT);
+      if (serializer.equals(RPC_SERIALIZER_DEFAULT)) {
+        return YarnRemoteExceptionFactoryPBImpl.get();
+      } else {
+        throw new YarnException("Unknown serializer: [" + conf.get(RPC_SERIALIZER_KEY) + "]. Use keys: [" + EXCEPTION_FACTORY_CLASS_KEY + "] to specify Exception factory");
+      }
+    } else {
+      return (YarnRemoteExceptionFactory) getFactoryClassInstance(recordFactoryClassName);
+    }
+  }
+  
+  private static Object getFactoryClassInstance(String factoryClassName) {
+    try {
+      Class clazz = Class.forName(factoryClassName);
+      Method method = clazz.getMethod("get", null);
+      method.setAccessible(true);
+      return method.invoke(null, null);
+    } catch (ClassNotFoundException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException(e);
+    } catch (InvocationTargetException e) {
+      throw new YarnException(e);
+    } catch (IllegalAccessException e) {
+      throw new YarnException(e);
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnProtoRPC.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,73 @@
+package org.apache.hadoop.yarn.ipc;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
+
+/**
+ * This uses Hadoop RPC. Uses a tunnel ProtoSpecificRpcEngine over 
+ * Hadoop connection.
+ * This does not give cross-language wire compatibility, since the Hadoop 
+ * RPC wire format is non-standard, but it does permit use of Protocol Buffers
+ *  protocol versioning features for inter-Java RPCs.
+ */
+public class HadoopYarnProtoRPC extends YarnRPC {
+
+  private static final Log LOG = LogFactory.getLog(HadoopYarnRPC.class);
+
+  @Override
+  public Object getProxy(Class protocol, InetSocketAddress addr,
+      Configuration conf) {
+    Configuration myConf = new Configuration(conf);
+    LOG.info("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
+    LOG.debug("Configured SecurityInfo class name is "
+        + myConf.get(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME));
+    
+    return RpcFactoryProvider.getClientFactory(myConf).getClient(protocol, 1, addr, myConf);
+  }
+
+  @Override
+  public Server getServer(Class protocol, Object instance,
+      InetSocketAddress addr, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager) {
+    LOG.info("Creating a HadoopYarnProtoRpc server for protocol " + protocol);
+    LOG.info("Configured SecurityInfo class name is "
+        + conf.get(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME));
+    
+    final RPC.Server hadoopServer;
+    hadoopServer = RpcFactoryProvider.getServerFactory(conf).getServer(protocol, instance, addr, conf, secretManager);
+
+    Server server = new Server() {
+      @Override
+      public void close() {
+        hadoopServer.stop();
+      }
+
+      @Override
+      public int getPort() {
+        return hadoopServer.getListenerAddress().getPort();
+      }
+
+      @Override
+      public void join() throws InterruptedException {
+        hadoopServer.join();
+      }
+
+      @Override
+      public void start() {
+        hadoopServer.start();
+      }
+    };
+    return server;
+
+  }
+
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,296 @@
+package org.apache.hadoop.yarn.ipc;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcEngine;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
+import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest;
+import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse;
+
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+
+
+@InterfaceStability.Evolving
+public class ProtoOverHadoopRpcEngine implements RpcEngine {
+  private static final Log LOG = LogFactory.getLog(RPC.class);
+
+  private static int VERSION = 0;
+  
+  private static final RpcEngine ENGINE = new WritableRpcEngine();
+
+  /** Tunnel a Proto RPC request and response through Hadoop's RPC. */
+  private static interface TunnelProtocol extends VersionedProtocol {
+    /** All Proto methods and responses go through this. */
+    ProtoSpecificResponseWritable call(ProtoSpecificRequestWritable request) throws IOException;
+  }
+
+
+  @Override
+  public Object getProxy(Class<?> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+
+
+    return Proxy.newProxyInstance(protocol.getClassLoader(),
+        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
+            factory, rpcTimeout));
+  }
+
+  @Override
+  public void stopProxy(Object proxy) {
+    try {
+      ((Invoker) Proxy.getInvocationHandler(proxy)).close();
+    } catch (IOException e) {
+      LOG.warn("Error while stopping " + proxy, e);
+    }
+  }
+
+  private class Invoker implements InvocationHandler, Closeable {
+    private TunnelProtocol tunnel;
+    private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+
+    public Invoker(Class<?> protocol, InetSocketAddress addr,
+        UserGroupInformation ticket, Configuration conf, SocketFactory factory,
+        int rpcTimeout) throws IOException {
+      this.tunnel = (TunnelProtocol) ENGINE.getProxy(TunnelProtocol.class,
+          VERSION, addr, ticket, conf, factory, rpcTimeout);
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      ProtoSpecificRpcRequest rpcRequest = constructRpcRequest(method, args);
+      ProtoSpecificResponseWritable val = null;
+      try {
+        val = tunnel.call(new ProtoSpecificRequestWritable(rpcRequest));
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      
+      ProtoSpecificRpcResponse response = val.message;
+
+      if (response.hasIsError() && response.getIsError() == true) {
+        YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl(response.getException());
+        exception.fillInStackTrace();
+        ServiceException se = new ServiceException(exception);
+        throw se;
+      }
+      
+      Message prototype = null;
+      try {
+        prototype = getReturnProtoType(method);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+//        YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl("Could not get prototype PB return type for method: [" + method.getName() + "]", e);
+      }
+      Message actualReturnMessage = prototype.newBuilderForType().mergeFrom(response.getResponseProto()).build();
+      return actualReturnMessage;
+    }
+
+    public void close() throws IOException {
+      ENGINE.stopProxy(tunnel);
+    }
+    
+    private Message getReturnProtoType(Method method) throws Exception {
+      if (returnTypes.containsKey(method.getName())) {
+        return returnTypes.get(method.getName());
+      } else {
+        Class<?> returnType = method.getReturnType();
+
+        Method newInstMethod = returnType.getMethod("getDefaultInstance", null);
+        newInstMethod.setAccessible(true);
+        Message prototype = (Message) newInstMethod.invoke(null, null);
+        returnTypes.put(method.getName(), prototype);
+        return prototype;
+      }
+    }
+  }
+
+  private class TunnelResponder implements TunnelProtocol {
+    BlockingService service;
+    
+    public TunnelResponder(Class<?> iface, Object impl) {
+      this.service = (BlockingService)impl;
+    }
+
+    public long getProtocolVersion(String protocol, long version)
+        throws IOException {
+      return VERSION;
+    }
+
+    public ProtoSpecificResponseWritable call(final ProtoSpecificRequestWritable request)
+        throws IOException {
+      ProtoSpecificRpcRequest rpcRequest = request.message;
+      String methodName = rpcRequest.getMethodName();
+      MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+      
+      Message prototype = service.getRequestPrototype(methodDescriptor);
+      Message param = prototype.newBuilderForType().mergeFrom(rpcRequest.getRequestProto()).build();
+      
+      Message result;
+      try {
+        result = service.callBlockingMethod(methodDescriptor, null, param);
+      } catch (ServiceException e) {
+        return handleException(e);
+      } catch (Exception e) {
+        return handleException(e);
+      }
+      
+      ProtoSpecificRpcResponse response = constructProtoSpecificRpcSuccessResponse(result);
+      return new ProtoSpecificResponseWritable(response);
+    }
+    
+    private ProtoSpecificResponseWritable handleException (Throwable e) {
+      ProtoSpecificRpcResponse.Builder builder = ProtoSpecificRpcResponse.newBuilder();
+      builder.setIsError(true);
+      if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
+        builder.setException(((YarnRemoteExceptionPBImpl)e.getCause()).getProto());
+      } else {
+        builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
+      }
+      ProtoSpecificRpcResponse response = builder.build();
+      return new ProtoSpecificResponseWritable(response);
+    }
+  }
+
+  @Override
+  public Object[] call(Method method, Object[][] params,
+      InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  
+  @Override
+  public RPC.Server getServer(Class<?> protocol, Object instance,
+      String bindAddress, int port, int numHandlers, boolean verbose,
+      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
+      throws IOException {
+
+    return ENGINE
+        .getServer(TunnelProtocol.class,
+            new TunnelResponder(protocol, instance), bindAddress, port,
+            numHandlers, verbose, conf, secretManager);
+  }
+
+  
+  private Class<?>[] getRequestParameterTypes(Message[] messages) {
+    Class<?> [] paramTypes = new Class<?>[messages.length];
+    for (int i = 0 ; i < messages.length ; i++) {
+      paramTypes[i] = messages[i].getClass();
+    }
+    return paramTypes;
+  }
+
+  private ProtoSpecificRpcRequest constructRpcRequest(Method method,
+      Object[] params) throws ServiceException {
+    ProtoSpecificRpcRequest rpcRequest;
+    ProtoSpecificRpcRequest.Builder builder;
+
+    builder = ProtoSpecificRpcRequest.newBuilder();
+    builder.setMethodName(method.getName());
+
+    if (params.length != 2) { //RpcController + Message
+      throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + params.length);
+    }
+    if (params[1] == null) {
+      throw new ServiceException("null param while calling Method: [" + method.getName() +"]");
+    }
+
+    Message param = (Message) params[1];
+    builder.setRequestProto(param.toByteString());
+
+    rpcRequest = builder.build();
+    return rpcRequest;
+  }
+
+  private ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(Message message) {
+    ProtoSpecificRpcResponse res = ProtoSpecificRpcResponse.newBuilder().setResponseProto(message.toByteString()).build();
+    return res;
+  }
+
+  
+  /**
+   * Writable Wrapper for Protocol Buffer Responses
+   */
+  private static class ProtoSpecificResponseWritable implements Writable {
+    ProtoSpecificRpcResponse message;
+
+    public ProtoSpecificResponseWritable() {
+    }
+    
+    ProtoSpecificResponseWritable(ProtoSpecificRpcResponse message) {
+      this.message = message;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+//      System.err.println("XXX: writing length: " + message.toByteArray().length);
+      out.writeInt(message.toByteArray().length);
+      out.write(message.toByteArray());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = in.readInt();
+//      System.err.println("YYY: Reading length: " + length);
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      message = ProtoSpecificRpcResponse.parseFrom(bytes);
+    }
+  }
+  
+  /**
+   * Writable Wrapper for Protocol Buffer Requests
+   */
+  private static class ProtoSpecificRequestWritable implements Writable {
+    ProtoSpecificRpcRequest message;
+
+    public ProtoSpecificRequestWritable() {
+    }
+    
+    ProtoSpecificRequestWritable(ProtoSpecificRpcRequest message) {
+      this.message = message;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(message.toByteArray().length);
+      out.write(message.toByteArray());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      message = ProtoSpecificRpcRequest.parseFrom(bytes);
+    }
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/RPCUtil.java Thu Mar 31 22:23:22 2011
@@ -18,50 +18,38 @@
 
 package org.apache.hadoop.yarn.ipc;
 
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.YarnRemoteExceptionFactory;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
 
 public class RPCUtil {
 
+
+  /**
+   * Relying on the default factory configuration to be set correctly
+   * for the default configuration.
+   */
+  private static Configuration conf = new Configuration();
+  private static YarnRemoteExceptionFactory exceptionFactory = YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(conf);
+  
   /**
-   * Returns the YarnRemoteException which is serializable by 
-   * Avro.
+   * Returns the YarnRemoteException which is serializable. 
    */
   public static YarnRemoteException getRemoteException(Throwable t) {
-    YarnRemoteException e = new YarnRemoteException();
-    if (t != null) {
-      e.message = t.getMessage();
-      StringBuilder buf = new StringBuilder();
-      StackTraceElement[] trace = t.getStackTrace();
-      if (trace != null) {
-        for (StackTraceElement element : trace) {
-          buf.append(element.toString() + "\n    at ");
-        }
-        e.trace = buf.toString();
-      }
-      Throwable cause = t.getCause();
-      if (cause != null) {
-        e.cause = getRemoteException(cause);
-      }
-    }
-    return e;
+    return exceptionFactory.createYarnRemoteException(t);
   }
 
   /**
-   * Returns the YarnRemoteException which is serializable by 
-   * Avro.
+   * Returns the YarnRemoteException which is serializable.
    */
   public static YarnRemoteException getRemoteException(String message) {
-    YarnRemoteException e = new YarnRemoteException();
-    if (message != null) {
-      e.message = message;
-    }
-    return e;
+    return exceptionFactory.createYarnRemoteException(message);
   }
 
   public static String toString(YarnRemoteException e) {
-    return (e.message == null ? "" : e.message) + 
-        (e.trace == null ? "" : "\n StackTrace: " + e.trace) +
-        (e.cause == null ? "" : "\n Caused by: " + toString(e.cause));
-        
+    return (e.getMessage() == null ? "" : e.getMessage()) + 
+      (e.getRemoteTrace() == null ? "" : "\n StackTrace: " + e.getRemoteTrace()) + 
+      (e.getCause() == null ? "" : "\n Caused by: " + toString(e.getCause()));
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java Thu Mar 31 22:23:22 2011
@@ -36,7 +36,7 @@ public abstract class YarnRPC {
 
   //use the default as Hadoop RPC
   public static final String DEFAULT_RPC 
-      = "org.apache.hadoop.yarn.ipc.HadoopYarnRPC";
+      = "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC";
 
   public abstract Object getProxy(Class protocol, InetSocketAddress addr,
       Configuration conf);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java Thu Mar 31 22:23:22 2011
@@ -25,7 +25,7 @@ import java.io.IOException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 // TODO: Make it avro-ish. TokenIdentifier really isn't serialized
 // as writable but simply uses readFields method in SaslRpcServer
@@ -39,8 +39,8 @@ public class ApplicationTokenIdentifier 
   // TODO: Add more information in the tokenID such that it is not
   // transferrable, more secure etc.
 
-  public ApplicationTokenIdentifier(ApplicationID id) {
-    this.appId = new Text(Integer.toString(id.id));
+  public ApplicationTokenIdentifier(ApplicationId id) {
+    this.appId = new Text(Integer.toString(id.getId()));
   }
 
   public ApplicationTokenIdentifier() {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java Thu Mar 31 22:23:22 2011
@@ -27,9 +27,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 public class ContainerTokenIdentifier extends TokenIdentifier {
 
@@ -38,12 +39,12 @@ public class ContainerTokenIdentifier ex
 
   public static final Text KIND = new Text("ContainerToken");
 
-  private ContainerID containerID;
+  private ContainerId containerId;
   private String nmHostName;
   private Resource resource;
 
-  public ContainerTokenIdentifier(ContainerID containerID, String hostName, Resource r) {
-    this.containerID = containerID;
+  public ContainerTokenIdentifier(ContainerId containerID, String hostName, Resource r) {
+    this.containerId = containerID;
     this.nmHostName = hostName;
     this.resource = r;
   }
@@ -51,8 +52,8 @@ public class ContainerTokenIdentifier ex
   public ContainerTokenIdentifier() {
   }
 
-  public ContainerID getContainerID() {
-    return containerID;
+  public ContainerId getContainerID() {
+    return containerId;
   }
 
   public String getNmHostName() {
@@ -66,21 +67,21 @@ public class ContainerTokenIdentifier ex
   @Override
   public void write(DataOutput out) throws IOException {
     LOG.info("Writing ContainerTokenIdentifier to RPC layer");
-    out.writeInt(this.containerID.appID.id);
-    out.writeInt(this.containerID.id);
+    out.writeInt(this.containerId.getAppId().getId());
+    out.writeInt(this.containerId.getId());
     out.writeUTF(this.nmHostName);
-    out.writeInt(this.resource.memory); // TODO: more resources.
+    out.writeInt(this.resource.getMemory()); // TODO: more resources.
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.containerID = new ContainerID();
-    this.containerID.appID = new ApplicationID();
-    this.containerID.appID.id = in.readInt();
-    this.containerID.id = in.readInt();
+    this.containerId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ContainerId.class);
+    this.containerId.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+    this.containerId.getAppId().setId(in.readInt());
+    this.containerId.setId(in.readInt());
     this.nmHostName = in.readUTF();
-    this.resource = new Resource();
-    this.resource.memory = in.readInt(); // TODO: more resources.
+    this.resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
+    this.resource.setMemory(in.readInt()); // TODO: more resources.
   }
 
   @Override
@@ -90,7 +91,7 @@ public class ContainerTokenIdentifier ex
 
   @Override
   public UserGroupInformation getUser() {
-    return UserGroupInformation.createRemoteUser(this.containerID.toString());
+    return UserGroupInformation.createRemoteUser(this.containerId.toString());
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java Thu Mar 31 22:23:22 2011
@@ -21,7 +21,8 @@ package org.apache.hadoop.yarn.util;
 import java.util.Iterator;
 
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import static org.apache.hadoop.yarn.util.StringHelper.*;
 
@@ -32,25 +33,25 @@ public class Apps {
   public static final String APP = "app";
   public static final String ID = "ID";
 
-  public static String toString(ApplicationID id) {
-    return _join("app", id.clusterTimeStamp, id.id);
+  public static String toString(ApplicationId id) {
+    return _join("app", id.getClusterTimestamp(), id.getId());
   }
 
-  public static ApplicationID toAppID(String aid) {
+  public static ApplicationId toAppID(String aid) {
     Iterator<String> it = _split(aid).iterator();
     return toAppID(APP, aid, it);
   }
 
-  public static ApplicationID toAppID(String prefix, String s, Iterator<String> it) {
+  public static ApplicationId toAppID(String prefix, String s, Iterator<String> it) {
     if (!it.hasNext() || !it.next().equals(prefix)) {
       throwParseException(sjoin(prefix, ID), s);
     }
     shouldHaveNext(prefix, s, it);
-    ApplicationID appID = new ApplicationID();
-    appID.clusterTimeStamp = Long.parseLong(it.next());
+    ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(Long.parseLong(it.next()));
     shouldHaveNext(prefix, s, it);
-    appID.id = Integer.parseInt(it.next());
-    return appID;
+    appId.setId(Integer.parseInt(it.next()));
+    return appId;
   }
 
   public static void shouldHaveNext(String prefix, String s, Iterator<String> it) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Thu Mar 31 22:23:22 2011
@@ -20,9 +20,10 @@ package org.apache.hadoop.yarn.util;
 
 import java.net.URI;
 
-import org.apache.hadoop.yarn.LocalResource;
-import org.apache.hadoop.yarn.LocalResourceType;
-import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 /**
  * Builder utilities to construct various objects.
@@ -33,12 +34,12 @@ public class BuilderUtils {
   public static LocalResource newLocalResource(URI uri, 
       LocalResourceType type, LocalResourceVisibility visibility, 
       long size, long timestamp) {
-    LocalResource resource = new LocalResource();
-    resource.resource = AvroUtil.getYarnUrlFromURI(uri);
-    resource.type = type;
-    resource.state = visibility;
-    resource.size = size;
-    resource.timestamp = timestamp;
+    LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
+    resource.setType(type);
+    resource.setVisibility(visibility);
+    resource.setSize(size);
+    resource.setTimestamp(timestamp);
     return resource;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ContainerBuilderHelper.java Thu Mar 31 22:23:22 2011
@@ -20,7 +20,8 @@ package org.apache.hadoop.yarn.util;
 
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.yarn.YarnContainerTags;
+import org.apache.hadoop.yarn.api.records.YarnContainerTags;
+
 
 // TODO: Remove this and related stuff?
 public class ContainerBuilderHelper {

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,127 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.NumberFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * This class contains a set of utilities which help converting data structures
+ * from/to 'serializableFormat' to/from hadoop/nativejava data structures.
+ *
+ */
+public class ConverterUtils {
+
+  /**
+   * return a hadoop path from a given url
+   * 
+   * @param url
+   *          url to convert
+   * @return
+   * @throws URISyntaxException
+   */
+  public static Path getPathFromYarnURL(URL url) throws URISyntaxException {
+    String scheme = url.getScheme() == null ? "" : url.getScheme();
+    String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort()
+        : "";
+    return new Path(
+        (new URI(scheme, authority, url.getFile(), null, null))
+            .normalize());
+  }
+  
+  /**
+   * change from CharSequence to string for map key and value
+   * @param env
+   * @return
+   */
+  public static Map<String, String> convertToString(
+      Map<CharSequence, CharSequence> env) {
+    
+    Map<String, String> stringMap = new HashMap<String, String>();
+    for (Entry<CharSequence, CharSequence> entry: env.entrySet()) {
+      stringMap.put(entry.getKey().toString(), entry.getValue().toString());
+    }
+    return stringMap;
+   }
+
+  public static URL getYarnUrlFromPath(Path path) {
+    return getYarnUrlFromURI(path.toUri());
+  }
+  
+  public static URL getYarnUrlFromURI(URI uri) {
+    URL url = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(URL.class);
+    if (uri.getHost() != null) {
+      url.setHost(uri.getHost());
+    }
+    url.setPort(uri.getPort());
+    url.setScheme(uri.getScheme());
+    url.setFile(uri.getPath());
+    return url;
+  }
+
+  // TODO: Why thread local?
+  private static final ThreadLocal<NumberFormat> appIdFormat =
+    new ThreadLocal<NumberFormat>() {
+      @Override
+      public NumberFormat initialValue() {
+        NumberFormat fmt = NumberFormat.getInstance();
+        fmt.setGroupingUsed(false);
+        fmt.setMinimumIntegerDigits(4);
+        return fmt;
+      }
+    };
+
+  // TODO: Why thread local?
+  private static final ThreadLocal<NumberFormat> containerIdFormat =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(6);
+          return fmt;
+        }
+      };
+
+  public static String toString(ApplicationId appId) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("application_").append(appId.getClusterTimestamp()).append("_");
+    sb.append(appIdFormat.get().format(appId.getId()));
+    return sb.toString();
+  }
+
+  public static String toString(ContainerId cId) {
+    StringBuilder sb = new StringBuilder();
+    ApplicationId appId = cId.getAppId();
+    sb.append("container_").append(appId.getClusterTimestamp()).append("_");
+    sb.append(appIdFormat.get().format(appId.getId())).append("_");
+    sb.append(containerIdFormat.get().format(cId.getId()));
+    return sb.toString();
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/SecurityUtil.java Thu Mar 31 22:23:22 2011
@@ -22,24 +22,21 @@ import java.io.IOException;
 
 import javax.crypto.SecretKey;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
-import org.apache.hadoop.yarn.ApplicationID;
 
 public class SecurityUtil {
 
@@ -48,7 +45,7 @@ public class SecurityUtil {
   // imitation of operations done by client to upload FileSystem
   // DelegationTokens
   public static void uploadFileSystemDelegationTokens(Configuration conf,
-      ApplicationID appId, Path[] resourcePaths, Credentials credentials)
+      ApplicationId appId, Path[] resourcePaths, Credentials credentials)
       throws IOException {
 
     getFileSystemTokens(conf, resourcePaths, credentials);
@@ -56,7 +53,7 @@ public class SecurityUtil {
     FileSystem defaultFS = FileSystem.get(conf);
     // TODO: fix
     credentials.writeTokenStorageFile(
-        new Path("yarn", Integer.toString(appId.id),
+        new Path("yarn", Integer.toString(appId.getId()),
             YarnConfiguration.FS_TOKENS_FILE_NAME).makeQualified(
             FileSystem.getDefaultUri(conf), defaultFS.getWorkingDirectory()),
         conf);
@@ -86,7 +83,7 @@ public class SecurityUtil {
 
   // TODO: ApplicationMaster needs one token for each NodeManager. This should
   // be created by ResourceManager and sent to ApplicationMaster via RPC.
-  public static void loadContainerManagerTokens(ApplicationID appId,
+  public static void loadContainerManagerTokens(ApplicationId appId,
       Configuration conf, String nmServiceAddress) throws IOException {
 
     Path masterKeyFile =

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/proto/yarnprototunnelrpc.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,17 @@
+option java_package = "org.apache.hadoop.yarn.ipc";
+option java_outer_classname = "RpcProtos";
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+
+message ProtoSpecificRpcRequest {
+    required string method_name = 1;
+    optional bytes request_proto = 2;
+}
+
+message ProtoSpecificRpcResponse {
+    optional bytes response_proto = 1;
+
+	optional bool is_error = 2;
+	optional YarnRemoteExceptionProto exception = 3;
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java Thu Mar 31 22:23:22 2011
@@ -24,6 +24,12 @@ import com.google.common.collect.Lists;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.yarn.Application;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
 /**
  * Utilities to generate fake test apps
  */
@@ -67,14 +73,14 @@ public class MockApps {
   }
 
   public static Application newApp(int i) {
-    final ApplicationID id = newAppID(i);
+    final ApplicationId id = newAppID(i);
     final ApplicationStatus status = newAppStatus();
     final ApplicationState state = newAppState();
     final String user = newUserName();
     final String name = newAppName();
     final String queue = newQueue();
     return new Application() {
-      @Override public ApplicationID id() { return id; }
+      @Override public ApplicationId id() { return id; }
       @Override public CharSequence user() { return user; }
       @Override public CharSequence name() { return name; }
       @Override public ApplicationStatus status() { return status; }
@@ -95,17 +101,17 @@ public class MockApps {
     };
   }
 
-  public static ApplicationID newAppID(int i) {
-    ApplicationID id = new ApplicationID();
-    id.clusterTimeStamp = TS;
-    id.id = i;
+  public static ApplicationId newAppID(int i) {
+    ApplicationId id = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+    id.setClusterTimestamp(TS);
+    id.setId(i);
     return id;
   }
 
   public static ApplicationStatus newAppStatus() {
-    ApplicationStatus status = new ApplicationStatus();
-    status.progress = (float) Math.random();
-    status.lastSeen = System.currentTimeMillis();
+    ApplicationStatus status = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationStatus.class);
+    status.setProgress((float)Math.random());
+    status.setLastSeen(System.currentTimeMillis());
     return status;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java Thu Mar 31 22:23:22 2011
@@ -24,39 +24,56 @@ import java.util.HashMap;
 
 import junit.framework.Assert;
 
-import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvider;
 import org.apache.hadoop.yarn.ipc.AvroYarnRPC;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.HadoopYarnRPC;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ContainerManager;
-import org.apache.hadoop.yarn.ContainerState;
-import org.apache.hadoop.yarn.ContainerStatus;
-import org.apache.hadoop.yarn.Resource;
-import org.apache.hadoop.yarn.YarnRemoteException;
 import org.junit.Test;
 
 public class TestRPC {
 
   private static final String EXCEPTION_MSG = "test error";
   private static final String EXCEPTION_CAUSE = "exception cause";
+  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
-  @Test
-  public void testAvroRPC() throws Exception {
-    test(AvroYarnRPC.class.getName());
-  }
+//  @Test
+//  public void testAvroRPC() throws Exception {
+//    test(AvroYarnRPC.class.getName());
+//  }
+//
+//  @Test
+//  public void testHadoopNativeRPC() throws Exception {
+//    test(HadoopYarnRPC.class.getName());
+//  }
 
   @Test
-  public void testHadoopNativeRPC() throws Exception {
-    test(HadoopYarnRPC.class.getName());
+  public void testHadoopProtoRPC() throws Exception {
+    test(HadoopYarnProtoRPC.class.getName());
   }
-
+  
   private void test(String rpcClass) throws Exception {
     Configuration conf = new Configuration();
     conf.set(YarnRPC.RPC_CLASSNAME, rpcClass);
@@ -69,33 +86,44 @@ public class TestRPC {
     ContainerManager proxy = (ContainerManager) 
         rpc.getProxy(ContainerManager.class, 
             NetUtils.createSocketAddr("localhost:" + server.getPort()), conf);
-    ContainerLaunchContext containerLaunchContext = new ContainerLaunchContext();
-    containerLaunchContext.user = "dummy-user";
-    containerLaunchContext.id = new ContainerID();
-    containerLaunchContext.id.appID = new ApplicationID();
-    containerLaunchContext.id.appID.id = 0;
-    containerLaunchContext.id.id = 100;  
-    containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
-    containerLaunchContext.resource = new Resource();
-    containerLaunchContext.command = new ArrayList<CharSequence>();
-    proxy.startContainer(containerLaunchContext);
-    ContainerStatus status = proxy.getContainerStatus(containerLaunchContext.id);
+    ContainerLaunchContext containerLaunchContext = recordFactory.newRecordInstance(ContainerLaunchContext.class);
+    containerLaunchContext.setUser("dummy-user");
+    containerLaunchContext.setContainerId(recordFactory.newRecordInstance(ContainerId.class));
+    containerLaunchContext.getContainerId().setAppId(recordFactory.newRecordInstance(ApplicationId.class));
+    containerLaunchContext.getContainerId().getAppId().setId(0);
+    containerLaunchContext.getContainerId().setId(100);
+    containerLaunchContext.setResource(recordFactory.newRecordInstance(Resource.class));
+//    containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
+//    containerLaunchContext.command = new ArrayList<CharSequence>();
+    
+    StartContainerRequest scRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+    scRequest.setContainerLaunchContext(containerLaunchContext);
+    proxy.startContainer(scRequest);
+    
+    GetContainerStatusRequest gcsRequest = recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+    gcsRequest.setContainerId(containerLaunchContext.getContainerId());
+    GetContainerStatusResponse response =  proxy.getContainerStatus(gcsRequest);
+    ContainerStatus status = response.getStatus();
     
     //test remote exception
     boolean exception = false;
     try {
-      proxy.stopContainer(containerLaunchContext.id);
+      StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+      stopRequest.setContainerId(containerLaunchContext.getContainerId());
+      proxy.stopContainer(stopRequest);
     } catch (YarnRemoteException e) {
       exception = true;
-      Assert.assertTrue(EXCEPTION_MSG.equals(e.message.toString()));
-      Assert.assertTrue(EXCEPTION_CAUSE.equals(e.cause.message.toString()));
+      System.err.println(e.getMessage());
+      System.err.println(e.getCause().getMessage());
+      Assert.assertTrue(EXCEPTION_MSG.equals(e.getMessage()));
+      Assert.assertTrue(EXCEPTION_CAUSE.equals(e.getCause().getMessage()));
       System.out.println("Test Exception is " + RPCUtil.toString(e));
     }
     Assert.assertTrue(exception);
     
     server.close();
     Assert.assertNotNull(status);
-    Assert.assertEquals(ContainerState.RUNNING, status.state.RUNNING);
+    Assert.assertEquals(ContainerState.RUNNING, status.getState().RUNNING);
   }
 
   public class DummyContainerManager implements ContainerManager {
@@ -103,34 +131,35 @@ public class TestRPC {
     private ContainerStatus status = null;
 
     @Override
-    public Void cleanupContainer(ContainerID containerId) 
-        throws AvroRemoteException {
-      return null;
+    public CleanupContainerResponse cleanupContainer(CleanupContainerRequest request) throws YarnRemoteException {
+      CleanupContainerResponse response = recordFactory.newRecordInstance(CleanupContainerResponse.class);
+      return response;
     }
-
+    
+    
     @Override
-    public ContainerStatus getContainerStatus(ContainerID containerId)
-        throws AvroRemoteException {
-      return status;
+    public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
+      GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
+      response.setStatus(status);
+      return response;
     }
 
     @Override
-    public Void startContainer(ContainerLaunchContext container)
-        throws AvroRemoteException {
-      status = new ContainerStatus();
-      status.state = ContainerState.RUNNING;
-      status.containerID = container.id;
-      status.exitStatus = 0;
-      return null;
+    public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
+      ContainerLaunchContext container = request.getContainerLaunchContext();
+      StartContainerResponse response = recordFactory.newRecordInstance(StartContainerResponse.class);
+      status = recordFactory.newRecordInstance(ContainerStatus.class);
+      status.setState(ContainerState.RUNNING);
+      status.setContainerId(container.getContainerId());
+      status.setExitStatus(0);
+      return response;
     }
 
     @Override
-    public Void stopContainer(ContainerID containerId)
-        throws AvroRemoteException {
+    public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException {
       Exception e = new Exception(EXCEPTION_MSG, 
           new Exception(EXCEPTION_CAUSE));
-      throw RPCUtil.getRemoteException(e);
+      throw YarnRemoteExceptionFactoryProvider.getYarnRemoteExceptionFactory(null).createYarnRemoteException(e);
     }
-    
   }
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPCFactories.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,107 @@
+package org.apache.hadoop.yarn;
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.junit.Test;
+
+public class TestRPCFactories {
+  
+  
+  
+  @Test
+  public void test() {
+    testPbServerFactory();
+    
+    testPbClientFactory();
+  }
+  
+  
+  
+  private void testPbServerFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    Configuration conf = new Configuration();
+    AMRMProtocol instance = new AMRMProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(AMRMProtocol.class, instance, addr, conf, null);
+      server.start();
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  
+  private void testPbClientFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    System.err.println(addr.getHostName() + addr.getPort());
+    Configuration conf = new Configuration();
+    AMRMProtocol instance = new AMRMProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(AMRMProtocol.class, instance, addr, conf, null);
+      server.start();
+      System.err.println(server.getListenerAddress());
+      System.err.println(NetUtils.getConnectAddress(server));
+
+      AMRMProtocol amrmClient = null;
+      try {
+        amrmClient = (AMRMProtocol) RpcClientFactoryPBImpl.get().getClient(AMRMProtocol.class, 1, NetUtils.getConnectAddress(server), conf);
+      } catch (YarnException e) {
+        e.printStackTrace();
+        Assert.fail("Failed to create client");
+      }
+      
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }     
+  }
+
+  public class AMRMProtocolTestImpl implements AMRMProtocol {
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,36 @@
+package org.apache.hadoop.yarn;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
+import org.junit.Test;
+
+public class TestRecordFactory {
+  
+  @Test
+  public void testPbRecordFactory() {
+    RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
+    
+    try {
+      AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
+      Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to crete record");
+    }
+    
+    try {
+      AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
+      Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to crete record");
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/test/java/org/apache/hadoop/yarn/TestRpcFactoryProvider.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.factories.RpcClientFactory;
+import org.apache.hadoop.yarn.factories.RpcServerFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.apache.hadoop.yarn.factory.providers.RpcFactoryProvider;
+import org.junit.Test;
+
+public class TestRpcFactoryProvider {
+
+  @Test
+  public void testFactoryProvider() {
+    Configuration conf = new Configuration();
+    RpcClientFactory clientFactory = null;
+    RpcServerFactory serverFactory = null;
+    
+    
+    clientFactory = RpcFactoryProvider.getClientFactory(conf);
+    serverFactory = RpcFactoryProvider.getServerFactory(conf);
+    Assert.assertEquals(RpcClientFactoryPBImpl.class, clientFactory.getClass());
+    Assert.assertEquals(RpcServerFactoryPBImpl.class, serverFactory.getClass());
+    
+    conf.set(RpcFactoryProvider.RPC_SERIALIZER_KEY, "writable");
+    try {
+      clientFactory = RpcFactoryProvider.getClientFactory(conf);
+      Assert.fail("Expected an exception - unknown serializer");
+    } catch (YarnException e) {
+    }
+    try {
+      serverFactory = RpcFactoryProvider.getServerFactory(conf);
+      Assert.fail("Expected an exception - unknown serializer");
+    } catch (YarnException e) {
+    }
+    
+    conf = new Configuration();
+    conf.set(RpcFactoryProvider.RPC_CLIENT_FACTORY_CLASS_KEY, "NonExistantClass");
+    conf.set(RpcFactoryProvider.RPC_SERVER_FACTORY_CLASS_KEY, RpcServerFactoryPBImpl.class.getName());
+    
+    try {
+      clientFactory = RpcFactoryProvider.getClientFactory(conf);
+      Assert.fail("Expected an exception - unknown class");
+    } catch (YarnException e) {
+    }
+    try {
+      serverFactory = RpcFactoryProvider.getServerFactory(conf);
+    } catch (YarnException e) {
+      Assert.fail("Error while loading factory using reflection: [" + RpcServerFactoryPBImpl.class.getName() + "]");
+    }
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml Thu Mar 31 22:23:22 2011
@@ -1,4 +1,5 @@
-<?xml version="1.0"?><project>
+<?xml version="1.0"?>
+<project>
   <parent>
     <artifactId>yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
@@ -27,7 +28,7 @@
       <artifactId>zookeeper</artifactId>
       <version>3.3.1</version>
       <scope>compile</scope>
-       <exclusions>
+      <exclusions>
         <exclusion>
           <groupId>com.sun.jdmk</groupId>
           <artifactId>jmxtools</artifactId>
@@ -38,11 +39,16 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.4.0a</version>
+    </dependency>
   </dependencies>
 
   <build>
     <plugins>
-      <plugin>
+      <!--plugin>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-maven-plugin</artifactId>
         <version>1.4.0-SNAPSHOT</version>
@@ -54,7 +60,76 @@
             </goals>
           </execution>
         </executions>
+      </plugin-->
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create_generate_src_dirctory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <tasks>
+                <mkdir dir="target/generated-sources/proto" />
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
       </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-I../../yarn-api/src/main/proto/</argument>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/yarn_server_common_protos.proto</argument>
+                <argument>src/main/proto/yarn_server_common_service_protos.proto</argument>
+                <argument>src/main/proto/ResourceTracker.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+
+
+
     </plugins>
   </build>
 </project>

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceTracker.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,14 @@
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+
+public interface ResourceTracker {
+  
+  public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException;
+  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException;
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceTrackerPBClientImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,70 @@
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine;
+import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class ResourceTrackerPBClientImpl implements ResourceTracker {
+
+private ResourceTrackerService.BlockingInterface proxy;
+  
+  public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, ResourceTrackerService.BlockingInterface.class, ProtoOverHadoopRpcEngine.class);
+    proxy = (ResourceTrackerService.BlockingInterface)RPC.getProxy(
+        ResourceTrackerService.BlockingInterface.class, clientVersion, addr, conf);
+  }
+  
+  @Override
+  public RegisterNodeManagerResponse registerNodeManager(
+      RegisterNodeManagerRequest request) throws YarnRemoteException {
+    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
+    try {
+      return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
+    } catch (ServiceException e) {
+      if (e.getCause() instanceof YarnRemoteException) {
+        throw (YarnRemoteException)e.getCause();
+      } else if (e.getCause() instanceof UndeclaredThrowableException) {
+        throw (UndeclaredThrowableException)e.getCause();
+      } else {
+        throw new UndeclaredThrowableException(e);
+      }
+    }
+  }
+
+  @Override
+  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+      throws YarnRemoteException {
+    NodeHeartbeatRequestProto requestProto = ((NodeHeartbeatRequestPBImpl)request).getProto();
+    try {
+      return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto));
+    } catch (ServiceException e) {
+      if (e.getCause() instanceof YarnRemoteException) {
+        throw (YarnRemoteException)e.getCause();
+      } else if (e.getCause() instanceof UndeclaredThrowableException) {
+        throw (UndeclaredThrowableException)e.getCause();
+      } else {
+        throw new UndeclaredThrowableException(e);
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceTrackerPBServiceImpl.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,53 @@
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.proto.ResourceTracker.ResourceTrackerService.BlockingInterface;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class ResourceTrackerPBServiceImpl implements BlockingInterface {
+
+  private ResourceTracker real;
+  
+  public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
+    this.real = impl;
+  }
+  
+  @Override
+  public RegisterNodeManagerResponseProto registerNodeManager(
+      RpcController controller, RegisterNodeManagerRequestProto proto)
+      throws ServiceException {
+    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
+    try {
+      RegisterNodeManagerResponse response = real.registerNodeManager(request);
+      return ((RegisterNodeManagerResponsePBImpl)response).getProto();
+    } catch (YarnRemoteException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public NodeHeartbeatResponseProto nodeHeartbeat(RpcController controller,
+      NodeHeartbeatRequestProto proto) throws ServiceException {
+    NodeHeartbeatRequestPBImpl request = new NodeHeartbeatRequestPBImpl(proto);
+    try {
+      NodeHeartbeatResponse response = real.nodeHeartbeat(request);
+      return ((NodeHeartbeatResponsePBImpl)response).getProto();
+    } catch (YarnRemoteException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,10 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+
+
+public interface NodeHeartbeatRequest {
+  public abstract NodeStatus getNodeStatus();
+  
+  public abstract void setNodeStatus(NodeStatus status);
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+
+public interface NodeHeartbeatResponse {
+  public abstract HeartbeatResponse getHeartbeatResponse();
+  
+  public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse);
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public interface RegisterNodeManagerRequest {
+  public abstract String getNode();
+  public abstract Resource getResource();
+  
+  public abstract void setNode(String node);
+  public abstract void setResource(Resource resource);
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,10 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+
+public interface RegisterNodeManagerResponse {
+  public abstract RegistrationResponse getRegistrationResponse();
+  
+  public abstract void setRegistrationResponse(RegistrationResponse registrationResponse);
+
+}