You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ji...@apache.org on 2011/05/26 20:28:50 UTC

svn commit: r1128022 - in /hadoop/mapreduce/branches/MR-279: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ src/contrib/raid/src/java/org...

Author: jitendra
Date: Thu May 26 18:28:50 2011
New Revision: 1128022

URL: http://svn.apache.org/viewvc?rev=1128022&view=rev
Log:
Merged r1064923 for MAPREDUCE-2263 from trunk.

Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu May 26 18:28:50 2011
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TypeConverter;
@@ -411,4 +412,10 @@ public class TaskAttemptListenerImpl ext
     taskHeartbeatHandler.unregister(attemptID);
   }
 
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu May 26 18:28:50 2011
@@ -30,6 +30,7 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -538,4 +539,11 @@ public class YARNRunner implements Clien
   public AccessControlList getQueueAdmins(String arg0) throws IOException {
     return new AccessControlList("*");
   }
+  
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Thu May 26 18:28:50 2011
@@ -194,6 +194,13 @@ public class RaidNode implements RaidPro
     }
   }
 
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
   /**
    * Wait for service to finish.
    * (Normally, it runs forever.)

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java Thu May 26 18:28:50 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
@@ -57,6 +58,13 @@ public class IsolationRunner {
       return TaskUmbilicalProtocol.versionID;
     }
     
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSigature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
     public void done(TaskAttemptID taskid) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java Thu May 26 18:28:50 2011
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -322,6 +323,13 @@ public class JobTracker implements MRCon
     }
   }
   
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
   /**
    * A thread to timeout tasks that have been assigned to task trackers,
    * but that haven't reported back yet.

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu May 26 18:28:50 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
@@ -92,6 +93,13 @@ public class LocalJobRunner implements C
     return ClientProtocol.versionID;
   }
 
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
   private class Job extends Thread implements TaskUmbilicalProtocol {
     // The job directory on the system: JobClient places job configurations here.
     // This is analogous to JobTracker's system directory.
@@ -125,6 +133,13 @@ public class LocalJobRunner implements C
       return TaskUmbilicalProtocol.versionID;
     }
     
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSigature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
     public Job(JobID jobid, String jobSubmitDir) throws IOException {
       this.systemJobDir = new Path(jobSubmitDir);
       this.systemJobFile = new Path(systemJobDir, "job.xml");

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May 26 18:28:50 2011
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -376,6 +377,13 @@ public class TaskTracker 
   private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
     new LinkedBlockingQueue<TaskTrackerAction>();
     
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSigature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
   /**
    * A daemon-thread that pulls tips off the list of things to cleanup.
    */

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java Thu May 26 18:28:50 2011
@@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.ipc.WritableRpcEngine;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -38,27 +40,28 @@ import com.google.protobuf.ServiceExcept
 @InterfaceStability.Evolving
 public class ProtoOverHadoopRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
-
-  private static int VERSION = 0;
   
   private static final RpcEngine ENGINE = new WritableRpcEngine();
 
   /** Tunnel a Proto RPC request and response through Hadoop's RPC. */
   private static interface TunnelProtocol extends VersionedProtocol {
+    /** WritableRpcEngine requires a versionID */
+    public static final long versionID = 1L;
+
     /** All Proto methods and responses go through this. */
     ProtoSpecificResponseWritable call(ProtoSpecificRequestWritable request) throws IOException;
   }
 
 
   @Override
-  public Object getProxy(Class<?> protocol, long clientVersion,
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
 
-
-    return Proxy.newProxyInstance(protocol.getClassLoader(),
-        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout));
+    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
+        .getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
+        addr, ticket, conf, factory, rpcTimeout)), false);
   }
 
   @Override
@@ -77,8 +80,9 @@ public class ProtoOverHadoopRpcEngine im
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
         int rpcTimeout) throws IOException {
-      this.tunnel = (TunnelProtocol) ENGINE.getProxy(TunnelProtocol.class,
-          VERSION, addr, ticket, conf, factory, rpcTimeout);
+      this.tunnel = ENGINE.getProxy(TunnelProtocol.class,
+          TunnelProtocol.versionID, addr, ticket, conf, factory, rpcTimeout)
+          .getProxy();
     }
 
     @Override
@@ -140,7 +144,14 @@ public class ProtoOverHadoopRpcEngine im
 
     public long getProtocolVersion(String protocol, long version)
         throws IOException {
-      return VERSION;
+      return TunnelProtocol.versionID;
+    }
+    
+    @Override
+    public ProtocolSignature getProtocolSignature(
+        String protocol, long version, int clientMethodsHashCode)
+      throws IOException {
+      return new ProtocolSignature(TunnelProtocol.versionID, null);
     }
 
     public ProtoSpecificResponseWritable call(final ProtoSpecificRequestWritable request)