You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cn...@apache.org on 2014/01/09 00:10:06 UTC

svn commit: r1556665 [2/2] - in /hadoop/common/branches/HDFS-4685/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/proto/server/ hadoop-yarn/hadoop-yarn-client/ hadoop-y...

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1556665&r1=1556664&r2=1556665&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Wed Jan  8 23:10:02 2014
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -98,6 +99,7 @@ public class MiniYARNCluster extends Com
 
   private boolean useFixedPorts;
   private boolean useRpc = false;
+  private int failoverTimeout;
 
   private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
       new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
@@ -189,12 +191,15 @@ public class MiniYARNCluster extends Com
         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
     useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
         YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+    failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
     if (useRpc && !useFixedPorts) {
       throw new YarnRuntimeException("Invalid configuration!" +
           " Minicluster can use rpc only when configured to use fixed ports");
     }
 
+    conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
     if (resourceManagers.length > 1) {
       conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
       if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
@@ -218,6 +223,13 @@ public class MiniYARNCluster extends Com
           // Don't try to login using keytab in the testcases.
         }
       };
+      if (!useFixedPorts) {
+        if (HAUtil.isHAEnabled(conf)) {
+          setHARMConfiguration(i, conf);
+        } else {
+          setNonHARMConfiguration(conf);
+        }
+      }
       addService(new ResourceManagerWrapper(i));
     }
     for(int index = 0; index < nodeManagers.length; index++) {
@@ -230,18 +242,103 @@ public class MiniYARNCluster extends Com
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
 
+  private void setNonHARMConfiguration(Configuration conf) {
+    String hostname = MiniYARNCluster.getHostname();
+    conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
+    conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
+    conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
+    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
+    WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+  }
+
+  private void setHARMConfiguration(final int index, Configuration conf) {
+    String hostname = MiniYARNCluster.getHostname();
+    for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
+      conf.set(HAUtil.addSuffix(confKey, rmIds[index]), hostname + ":0");
+    }
+  }
+
+  private synchronized void initResourceManager(int index, Configuration conf) {
+    if (HAUtil.isHAEnabled(conf)) {
+      conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+    }
+    resourceManagers[index].init(conf);
+    resourceManagers[index].getRMContext().getDispatcher().register(
+        RMAppAttemptEventType.class,
+        new EventHandler<RMAppAttemptEvent>() {
+          public void handle(RMAppAttemptEvent event) {
+            if (event instanceof RMAppAttemptRegistrationEvent) {
+              appMasters.put(event.getApplicationAttemptId(),
+                  event.getTimestamp());
+            } else if (event instanceof RMAppAttemptUnregistrationEvent) {
+              appMasters.remove(event.getApplicationAttemptId());
+            }
+          }
+        });
+  }
+
+  private synchronized void startResourceManager(final int index) {
+    try {
+      Thread rmThread = new Thread() {
+        public void run() {
+          resourceManagers[index].start();
+        }
+      };
+      rmThread.setName("RM-" + index);
+      rmThread.start();
+      int waitCount = 0;
+      while (resourceManagers[index].getServiceState() == STATE.INITED
+          && waitCount++ < 60) {
+        LOG.info("Waiting for RM to start...");
+        Thread.sleep(1500);
+      }
+      if (resourceManagers[index].getServiceState() != STATE.STARTED) {
+        // RM could have failed.
+        throw new IOException(
+            "ResourceManager failed to start. Final state is "
+                + resourceManagers[index].getServiceState());
+      }
+    } catch (Throwable t) {
+      throw new YarnRuntimeException(t);
+    }
+    LOG.info("MiniYARN ResourceManager address: " +
+        getConfig().get(YarnConfiguration.RM_ADDRESS));
+    LOG.info("MiniYARN ResourceManager web address: " +
+        WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public synchronized void stopResourceManager(int index) {
+    if (resourceManagers[index] != null) {
+      resourceManagers[index].stop();
+      resourceManagers[index] = null;
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public synchronized void restartResourceManager(int index)
+      throws InterruptedException {
+    if (resourceManagers[index] != null) {
+      resourceManagers[index].stop();
+      resourceManagers[index] = null;
+    }
+    Configuration conf = getConfig();
+    resourceManagers[index] = new ResourceManager();
+    initResourceManager(index, getConfig());
+    startResourceManager(index);
+  }
+
   public File getTestWorkDir() {
     return testWorkDir;
   }
 
   /**
-   * In a HA cluster, go through all the RMs and find the Active RM. If none
-   * of them are active, wait upto 5 seconds for them to transition to Active.
-   *
-   * In an non-HA cluster, return the index of the only RM.
+   * In a HA cluster, go through all the RMs and find the Active RM. In a
+   * non-HA cluster, return the index of the only RM.
    *
-   * @return index of the active RM or -1 if none of them transition to
-   * active even after 5 seconds of waiting
+   * @return index of the active RM or -1 if none of them turn active
    */
   @InterfaceAudience.Private
   @VisibleForTesting
@@ -250,9 +347,12 @@ public class MiniYARNCluster extends Com
       return 0;
     }
 
-    int numRetriesForRMBecomingActive = 5;
+    int numRetriesForRMBecomingActive = failoverTimeout / 100;
     while (numRetriesForRMBecomingActive-- > 0) {
       for (int i = 0; i < resourceManagers.length; i++) {
+        if (resourceManagers[i] == null) {
+          continue;
+        }
         try {
           if (HAServiceProtocol.HAServiceState.ACTIVE ==
               resourceManagers[i].getRMContext().getRMAdminService()
@@ -265,7 +365,7 @@ public class MiniYARNCluster extends Com
         }
       }
       try {
-        Thread.sleep(1000);
+        Thread.sleep(100);
       } catch (InterruptedException e) {
         throw new YarnRuntimeException("Interrupted while waiting for one " +
             "of the ResourceManagers to become active");
@@ -282,7 +382,7 @@ public class MiniYARNCluster extends Com
     int activeRMIndex = getActiveRMIndex();
     return activeRMIndex == -1
         ? null
-        : this.resourceManagers[getActiveRMIndex()];
+        : this.resourceManagers[activeRMIndex];
   }
 
   public ResourceManager getResourceManager(int i) {
@@ -310,82 +410,21 @@ public class MiniYARNCluster extends Com
       index = i;
     }
 
-    private void setNonHARMConfiguration(Configuration conf) {
-      String hostname = MiniYARNCluster.getHostname();
-      conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
-      conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
-      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
-      conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
-      WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
-    }
-
-    private void setHARMConfiguration(Configuration conf) {
-      String hostname = MiniYARNCluster.getHostname();
-      for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
-        for (String id : HAUtil.getRMHAIds(conf)) {
-          conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
-        }
-      }
-    }
-
     @Override
     protected synchronized void serviceInit(Configuration conf)
         throws Exception {
-      conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
-
-      if (!useFixedPorts) {
-        if (HAUtil.isHAEnabled(conf)) {
-          setHARMConfiguration(conf);
-        } else {
-          setNonHARMConfiguration(conf);
-        }
-      }
-      if (HAUtil.isHAEnabled(conf)) {
-        conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
-      }
-      resourceManagers[index].init(conf);
-      resourceManagers[index].getRMContext().getDispatcher().register
-          (RMAppAttemptEventType.class,
-          new EventHandler<RMAppAttemptEvent>() {
-            public void handle(RMAppAttemptEvent event) {
-              if (event instanceof RMAppAttemptRegistrationEvent) {
-                appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
-              } else if (event instanceof RMAppAttemptUnregistrationEvent) {
-                appMasters.remove(event.getApplicationAttemptId());
-              }
-            }
-          });
+      initResourceManager(index, conf);
       super.serviceInit(conf);
     }
 
     @Override
     protected synchronized void serviceStart() throws Exception {
-      try {
-        new Thread() {
-          public void run() {
-            resourceManagers[index].start();
-          }
-        }.start();
-        int waitCount = 0;
-        while (resourceManagers[index].getServiceState() == STATE.INITED
-            && waitCount++ < 60) {
-          LOG.info("Waiting for RM to start...");
-          Thread.sleep(1500);
-        }
-        if (resourceManagers[index].getServiceState() != STATE.STARTED) {
-          // RM could have failed.
-          throw new IOException(
-              "ResourceManager failed to start. Final state is "
-                  + resourceManagers[index].getServiceState());
-        }
-        super.serviceStart();
-      } catch (Throwable t) {
-        throw new YarnRuntimeException(t);
-      }
+      startResourceManager(index);
       LOG.info("MiniYARN ResourceManager address: " +
                getConfig().get(YarnConfiguration.RM_ADDRESS));
       LOG.info("MiniYARN ResourceManager web address: " +
                WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
+      super.serviceStart();
     }
 
     private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
@@ -406,7 +445,6 @@ public class MiniYARNCluster extends Com
         waitForAppMastersToFinish(5000);
         resourceManagers[index].stop();
       }
-      super.serviceStop();
 
       if (Shell.WINDOWS) {
         // On Windows, clean up the short temporary symlink that was created to
@@ -420,6 +458,7 @@ public class MiniYARNCluster extends Com
             testWorkDir.getAbsolutePath());
         }
       }
+      super.serviceStop();
     }
   }
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java?rev=1556665&r1=1556664&r2=1556665&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java Wed Jan  8 23:10:02 2014
@@ -19,21 +19,20 @@
 package org.apache.hadoop.yarn.server.webproxy;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 
 /**
  * This class abstracts away how ApplicationReports are fetched.
@@ -50,16 +49,12 @@ public class AppReportFetcher {
    */
   public AppReportFetcher(Configuration conf) {
     this.conf = conf;
-    YarnRPC rpc = YarnRPC.create(this.conf);
-    InetSocketAddress rmAddress = conf.getSocketAddr(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_PORT);
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    applicationsManager =
-        (ApplicationClientProtocol) rpc.getProxy(ApplicationClientProtocol.class,
-            rmAddress, this.conf);
-    LOG.info("Connected to ResourceManager at " + rmAddress);  
+    try {
+      applicationsManager = ClientRMProxy.createRMProxy(conf,
+          ApplicationClientProtocol.class);
+    } catch (IOException e) {
+      throw new YarnRuntimeException(e);
+    }
   }
   
   /**
@@ -91,4 +86,10 @@ public class AppReportFetcher {
         .getApplicationReport(request);
     return response.getApplicationReport();
   }
+
+  public void stop() {
+    if (this.applicationsManager != null) {
+      RPC.stopProxy(this.applicationsManager);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java?rev=1556665&r1=1556664&r2=1556665&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/WebAppProxy.java Wed Jan  8 23:10:02 2014
@@ -117,6 +117,9 @@ public class WebAppProxy extends Abstrac
         throw new YarnRuntimeException("Error stopping proxy web server",e);
       }
     }
+    if(this.fetcher != null) {
+      this.fetcher.stop();
+    }
     super.serviceStop();
   }