You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ji...@apache.org on 2011/05/26 20:28:50 UTC
svn commit: r1128022 - in /hadoop/mapreduce/branches/MR-279:
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/
src/contrib/raid/src/java/org...
Author: jitendra
Date: Thu May 26 18:28:50 2011
New Revision: 1128022
URL: http://svn.apache.org/viewvc?rev=1128022&view=rev
Log:
Merged r1064923 for MAPREDUCE-2263 from trunk.
Modified:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu May 26 18:28:50 2011
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -411,4 +412,10 @@ public class TaskAttemptListenerImpl ext
taskHeartbeatHandler.unregister(attemptID);
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
+ clientMethodsHash);
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu May 26 18:28:50 2011
@@ -30,6 +30,7 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -538,4 +539,11 @@ public class YARNRunner implements Clien
public AccessControlList getQueueAdmins(String arg0) throws IOException {
return new AccessControlList("*");
}
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
+ clientMethodsHash);
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Thu May 26 18:28:50 2011
@@ -194,6 +194,13 @@ public class RaidNode implements RaidPro
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* Wait for service to finish.
* (Normally, it runs forever.)
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/IsolationRunner.java Thu May 26 18:28:50 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapred.JvmTask;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
@@ -57,6 +58,13 @@ public class IsolationRunner {
return TaskUmbilicalProtocol.versionID;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
public void done(TaskAttemptID taskid) throws IOException {
LOG.info("Task " + taskid + " reporting done.");
}
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java Thu May 26 18:28:50 2011
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
@@ -322,6 +323,13 @@ public class JobTracker implements MRCon
}
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* A thread to timeout tasks that have been assigned to task trackers,
* but that haven't reported back yet.
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu May 26 18:28:50 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
@@ -92,6 +93,13 @@ public class LocalJobRunner implements C
return ClientProtocol.versionID;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
private class Job extends Thread implements TaskUmbilicalProtocol {
// The job directory on the system: JobClient places job configurations here.
// This is analogous to JobTracker's system directory.
@@ -125,6 +133,13 @@ public class LocalJobRunner implements C
return TaskUmbilicalProtocol.versionID;
}
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
public Job(JobID jobid, String jobSubmitDir) throws IOException {
this.systemJobDir = new Path(jobSubmitDir);
this.systemJobFile = new Path(systemJobDir, "job.xml");
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May 26 18:28:50 2011
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
@@ -376,6 +377,13 @@ public class TaskTracker
private BlockingQueue<TaskTrackerAction> tasksToCleanup =
new LinkedBlockingQueue<TaskTrackerAction>();
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSigature(
+ this, protocol, clientVersion, clientMethodsHash);
+ }
+
/**
* A daemon-thread that pulls tips off the list of things to cleanup.
*/
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java?rev=1128022&r1=1128021&r2=1128022&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java Thu May 26 18:28:50 2011
@@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
@@ -38,27 +40,28 @@ import com.google.protobuf.ServiceExcept
@InterfaceStability.Evolving
public class ProtoOverHadoopRpcEngine implements RpcEngine {
private static final Log LOG = LogFactory.getLog(RPC.class);
-
- private static int VERSION = 0;
private static final RpcEngine ENGINE = new WritableRpcEngine();
/** Tunnel a Proto RPC request and response through Hadoop's RPC. */
private static interface TunnelProtocol extends VersionedProtocol {
+ /** WritableRpcEngine requires a versionID */
+ public static final long versionID = 1L;
+
/** All Proto methods and responses go through this. */
ProtoSpecificResponseWritable call(ProtoSpecificRequestWritable request) throws IOException;
}
@Override
- public Object getProxy(Class<?> protocol, long clientVersion,
+ @SuppressWarnings("unchecked")
+ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
-
- return Proxy.newProxyInstance(protocol.getClassLoader(),
- new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
- factory, rpcTimeout));
+ return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(protocol
+ .getClassLoader(), new Class[] { protocol }, new Invoker(protocol,
+ addr, ticket, conf, factory, rpcTimeout)), false);
}
@Override
@@ -77,8 +80,9 @@ public class ProtoOverHadoopRpcEngine im
public Invoker(Class<?> protocol, InetSocketAddress addr,
UserGroupInformation ticket, Configuration conf, SocketFactory factory,
int rpcTimeout) throws IOException {
- this.tunnel = (TunnelProtocol) ENGINE.getProxy(TunnelProtocol.class,
- VERSION, addr, ticket, conf, factory, rpcTimeout);
+ this.tunnel = ENGINE.getProxy(TunnelProtocol.class,
+ TunnelProtocol.versionID, addr, ticket, conf, factory, rpcTimeout)
+ .getProxy();
}
@Override
@@ -140,7 +144,14 @@ public class ProtoOverHadoopRpcEngine im
public long getProtocolVersion(String protocol, long version)
throws IOException {
- return VERSION;
+ return TunnelProtocol.versionID;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(
+ String protocol, long version, int clientMethodsHashCode)
+ throws IOException {
+ return new ProtocolSignature(TunnelProtocol.versionID, null);
}
public ProtoSpecificResponseWritable call(final ProtoSpecificRequestWritable request)