You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/07/12 19:10:59 UTC

svn commit: r1145679 - in /hadoop/common/trunk/mapreduce: ./ src/java/META-INF/ src/java/META-INF/services/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/protocol/

Author: acmurthy
Date: Tue Jul 12 17:10:59 2011
New Revision: 1145679

URL: http://svn.apache.org/viewvc?rev=1145679&view=rev
Log:
MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a ServiceProvider for the actual implementation. Contributed by Tom White.

Added:
    hadoop/common/trunk/mapreduce/src/java/META-INF/
    hadoop/common/trunk/mapreduce/src/java/META-INF/services/
    hadoop/common/trunk/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java
Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/build.xml
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1145679&r1=1145678&r2=1145679&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Tue Jul 12 17:10:59 2011
@@ -37,6 +37,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2400. Remove Cluster's dependency on JobTracker via a 
+    ServiceProvider for the actual implementation. (tomwhite via acmurthy) 
+ 
     MAPREDUCE-2596. [Gridmix] Summarize Gridmix runs. (amarrk)
 
     MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to 

Modified: hadoop/common/trunk/mapreduce/build.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/build.xml?rev=1145679&r1=1145678&r2=1145679&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/build.xml (original)
+++ hadoop/common/trunk/mapreduce/build.xml Tue Jul 12 17:10:59 2011
@@ -416,6 +416,7 @@
     
     <copy todir="${build.classes}">
       <fileset dir="${mapred.src.dir}" includes="**/*.properties"/>
+      <fileset dir="${mapred.src.dir}" includes="**/META-INF/services/*"/>
       <fileset dir="${mapred.src.dir}" includes="mapred-default.xml"/>
       <fileset dir="${mapred.src.dir}" includes="mapred-queues-default.xml"/>
     </copy>

Added: hadoop/common/trunk/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1145679&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (added)
+++ hadoop/common/trunk/mapreduce/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Tue Jul 12 17:10:59 2011
@@ -0,0 +1,15 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
+org.apache.hadoop.mapred.LocalClientProtocolProvider

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java?rev=1145679&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java Tue Jul 12 17:10:59 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A {@link ClientProtocolProvider} for the "classic" framework: it hands out
+ * RPC proxies that talk to a remote JobTracker.
+ */
+@InterfaceAudience.Private
+public class JobTrackerClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * Builds a client from the configuration alone.
+   *
+   * @param conf configuration supplying the framework name and the
+   *          JobTracker address
+   * @return an RPC proxy for the configured JobTracker, or <code>null</code>
+   *         when the framework name is set to something other than "classic"
+   *         or the JobTracker address is "local" (the default when unset)
+   * @throws IOException if the proxy cannot be created
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    String frameworkName = conf.get(MRConfig.FRAMEWORK_NAME);
+    if (!(frameworkName == null || "classic".equals(frameworkName))) {
+      // a different framework was explicitly requested
+      return null;
+    }
+
+    String trackerAddress = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+    return "local".equals(trackerAddress)
+        ? null
+        : createRPCProxy(JobTracker.getAddress(conf), conf);
+  }
+
+  /**
+   * Builds a client for an explicitly given address; the framework name in
+   * the configuration is not consulted.
+   *
+   * @param addr address to connect the proxy to
+   * @param conf configuration passed on to the RPC layer
+   * @return an RPC proxy for <code>addr</code>
+   * @throws IOException if the proxy cannot be created
+   */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+    return createRPCProxy(addr, conf);
+  }
+
+  /**
+   * Opens a {@link ClientProtocol} proxy to <code>addr</code> as the current
+   * user, with the socket factory that the configuration yields for the
+   * protocol.
+   */
+  private ClientProtocol createRPCProxy(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+        ClientProtocol.versionID, addr, currentUser, conf,
+        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+
+  /**
+   * Stops the RPC proxy behind the given client.
+   *
+   * @param clientProtocol the client whose proxy is stopped
+   * @throws IOException declared by the provider contract
+   */
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    RPC.stopProxy(clientProtocol);
+  }
+
+}

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java?rev=1145679&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java Tue Jul 12 17:10:59 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+
+/**
+ * A {@link ClientProtocolProvider} that hands out a {@link LocalJobRunner}
+ * rather than an RPC proxy to a JobTracker.
+ */
+@InterfaceAudience.Private
+public class LocalClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * Builds a local client when the configuration asks for local
+   * execution.
+   *
+   * @param conf configuration supplying the framework name and the
+   *          JobTracker address; "mapreduce.job.maps" is set to 1 on it
+   *          before the runner is created
+   * @return a new {@link LocalJobRunner}, or <code>null</code> when the
+   *         framework name is set to something other than "local" or the
+   *         JobTracker address is not "local" (the default when unset)
+   * @throws IOException if the runner cannot be created
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    String frameworkName = conf.get(MRConfig.FRAMEWORK_NAME);
+    if (!(frameworkName == null || "local".equals(frameworkName))) {
+      // a different framework was explicitly requested
+      return null;
+    }
+
+    String trackerAddress = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+    if (!"local".equals(trackerAddress)) {
+      return null;
+    }
+
+    conf.setInt("mapreduce.job.maps", 1);
+    return new LocalJobRunner(conf);
+  }
+
+  /**
+   * Address-based creation is not offered by this provider.
+   *
+   * @return always <code>null</code>, since LocalJobRunner doesn't use a
+   *         socket
+   */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
+    return null;
+  }
+
+  /**
+   * Does nothing: no clean up is required for the local client.
+   */
+  @Override
+  public void close(ClientProtocol clientProtocol) {
+    // intentionally empty
+  }
+
+}

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1145679&r1=1145678&r2=1145679&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/Cluster.java Tue Jul 12 17:10:59 2011
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.ServiceLoader;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -30,14 +31,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
-import org.apache.hadoop.mapred.LocalJobRunner;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.net.NetUtils;
@@ -56,6 +56,7 @@ public class Cluster {
   @InterfaceStability.Evolving
   public static enum JobTrackerStatus {INITIALIZING, RUNNING};
   
+  private ClientProtocolProvider clientProtocolProvider;
   private ClientProtocol client;
   private UserGroupInformation ugi;
   private Configuration conf;
@@ -71,35 +72,45 @@ public class Cluster {
   public Cluster(Configuration conf) throws IOException {
     this.conf = conf;
     this.ugi = UserGroupInformation.getCurrentUser();
-    client = createClient(conf);
+    // Ask each registered provider in turn; the first one that returns a
+    // client for this configuration wins.
+    for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+      ClientProtocol clientProtocol = provider.create(conf);
+      if (clientProtocol != null) {
+        clientProtocolProvider = provider;
+        client = clientProtocol;
+        break;
+      }
+    }
+    if (client == null) {
+      // Fail here rather than with a NullPointerException on first use.
+      throw new IOException("Cannot initialize Cluster. Please check your"
+          + " configuration for " + MRConfig.FRAMEWORK_NAME + " and "
+          + JTConfig.JT_IPC_ADDRESS + ".");
+    }
   }
 
   public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
       throws IOException {
     this.conf = conf;
     this.ugi = UserGroupInformation.getCurrentUser();
-    client = createRPCProxy(jobTrackAddr, conf);
-  }
-
-  private ClientProtocol createRPCProxy(InetSocketAddress addr,
-      Configuration conf) throws IOException {
-    return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-      ClientProtocol.versionID, addr, ugi, conf,
-      NetUtils.getSocketFactory(conf, ClientProtocol.class));
-  }
-
-  private ClientProtocol createClient(Configuration conf) throws IOException {
-    ClientProtocol client;
-    String tracker = conf.get("mapreduce.jobtracker.address", "local");
-    if ("local".equals(tracker)) {
-      conf.setInt("mapreduce.job.maps", 1);
-      client = new LocalJobRunner(conf);
-    } else {
-      client = createRPCProxy(JobTracker.getAddress(conf), conf);
+    // Same lookup, but providers are asked for a client bound to the given
+    // address.
+    for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
+      ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
+      if (clientProtocol != null) {
+        clientProtocolProvider = provider;
+        client = clientProtocol;
+        break;
+      }
     }
-    return client;
+    if (client == null) {
+      // Fail here rather than with a NullPointerException on first use.
+      throw new IOException("Cannot initialize Cluster: no"
+          + " ClientProtocolProvider returned a client for " + jobTrackAddr);
+    }
   }
-  
+
   ClientProtocol getClient() {
     return client;
   }
@@ -112,9 +123,8 @@ public class Cluster {
    * Close the <code>Cluster</code>.
    */
   public synchronized void close() throws IOException {
-    if (!(client instanceof LocalJobRunner)) {
-      RPC.stopProxy(client);
-    }
+    // the provider that created the client decides how to release it
+    clientProtocolProvider.close(client);
   }
 
   private Job[] getJobs(JobStatus[] stats) throws IOException {
@@ -353,7 +363,8 @@ public class Cluster {
       getDelegationToken(Text renewer) throws IOException, InterruptedException{
     Token<DelegationTokenIdentifier> result =
       client.getDelegationToken(renewer);
-    InetSocketAddress addr = JobTracker.getAddress(conf);
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        conf.get(JTConfig.JT_IPC_ADDRESS, "localhost:8012"));
     StringBuilder service = new StringBuilder();
     service.append(NetUtils.normalizeHostName(addr.getAddress().
                                               getHostAddress()));

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1145679&r1=1145678&r2=1145679&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/MRConfig.java Tue Jul 12 17:10:59 2011
@@ -57,4 +57,6 @@ public interface MRConfig {
     "mapreduce.cluster.delegation.token.max-lifetime";
   public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 
     7*24*60*60*1000; // 7 days
+  
+  public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
 }

Added: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java?rev=1145679&view=auto
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java (added)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocolProvider.java Tue Jul 12 17:10:59 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.protocol;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory for {@link ClientProtocol} instances. Implementations are listed
+ * under META-INF/services and looked up through java.util.ServiceLoader.
+ */
+@InterfaceAudience.Private
+public abstract class ClientProtocolProvider {
+
+  /**
+   * Creates a client based on the configuration alone.
+   *
+   * @param conf the configuration to build the client from
+   * @return a client, or <code>null</code> if this provider does not
+   *         apply to the given configuration
+   * @throws IOException if the client cannot be created
+   */
+  public abstract ClientProtocol create(Configuration conf) throws IOException;
+
+  /**
+   * Creates a client for an explicit address.
+   *
+   * @param addr the address to connect to
+   * @param conf the configuration to build the client with
+   * @return a client, or <code>null</code> if this provider does not
+   *         create address-based clients
+   * @throws IOException if the client cannot be created
+   */
+  public abstract ClientProtocol create(InetSocketAddress addr,
+      Configuration conf) throws IOException;
+
+  /**
+   * Releases whatever the provider needs to release for a client.
+   *
+   * @param clientProtocol the client to close
+   * @throws IOException if closing fails
+   */
+  public abstract void close(ClientProtocol clientProtocol) throws IOException;
+
+}