You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by sm...@apache.org on 2014/07/11 06:17:52 UTC

[12/50] [abbrv] git commit: YARN-913 some code reviews and while copying some of this code over to hadoop-trunk branch

YARN-913 some code reviews and while copying some of this code over to hadoop-trunk branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/0b339b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/0b339b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/0b339b7f

Branch: refs/heads/master
Commit: 0b339b7fc76ae95bc2417cb4626d07cb5ce026f2
Parents: 0f9344b
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jul 7 13:57:17 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jul 7 13:57:17 2014 +0100

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../apache/slider/common/SliderXmlConfKeys.java |   2 +
 .../slider/core/main/ServiceLauncher.java       | 109 +++++++++++++++----
 .../server/appmaster/SliderAppMaster.java       |  46 ++++++--
 .../slider/server/appmaster/state/AppState.java |   2 +-
 .../server/appmaster/web/SliderAMWebApp.java    |   2 +-
 .../server/appmaster/web/SliderAmIpFilter.java  |   1 +
 .../server/services/curator/CuratorService.java |   2 +-
 .../resources/webapps/static/yarn.dt.plugins.js |   4 +-
 9 files changed, 128 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0076561..c6980be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
     <commons-compress.version>1.4.1</commons-compress.version>
     <commons-logging.version>1.1.3</commons-logging.version>
     <commons-io.version>2.4</commons-io.version>
-    <curator.version>2.4.1</curator.version>
+    <curator.version>2.5.0</curator.version>
     <easymock.version>3.1</easymock.version>
     <guava.version>11.0.2</guava.version>
     <gson.version>2.2.2</gson.version>

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java
index c7b8ea5..3f16f25 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderXmlConfKeys.java
@@ -153,4 +153,6 @@ public interface SliderXmlConfKeys {
 
   String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH =
       "ipc.client.fallback-to-simple-auth-allowed";
+  String HADOOP_HTTP_FILTER_INITIALIZERS =
+      "hadoop.http.filter.initializers";
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
index e22ff54..c92dfda 100644
--- a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
@@ -65,7 +65,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 @SuppressWarnings("UseOfSystemOutOrSystemErr")
 public class ServiceLauncher<S extends Service>
-  implements LauncherExitCodes, IrqHandler.Interrupted {
+  implements LauncherExitCodes, IrqHandler.Interrupted,
+    Thread.UncaughtExceptionHandler {
   private static final Logger LOG = LoggerFactory.getLogger(
       ServiceLauncher.class);
 
@@ -144,8 +145,8 @@ public class ServiceLauncher<S extends Service>
    * @param conf configuration
    * @param processedArgs arguments after the configuration parameters
    * have been stripped out.
-   * @param addShutdownHook should a shutdown hook be added to terminate
-   * this service on shutdown. Tests should set this to false.
+   * @param addProcessHooks should process failure handlers be added to
+   * terminate this service on shutdown. Tests should set this to false.
    * @throws ClassNotFoundException classname not on the classpath
    * @throws IllegalAccessException not allowed at the class
    * @throws InstantiationException not allowed to instantiate it
@@ -154,13 +155,13 @@ public class ServiceLauncher<S extends Service>
    */
   public int launchService(Configuration conf,
       String[] processedArgs,
-      boolean addShutdownHook)
+      boolean addProcessHooks)
     throws Throwable {
 
     instantiateService(conf);
 
-    // and the shutdown hook if requested
-    if (addShutdownHook) {
+    // add any process shutdown hooks
+    if (addProcessHooks) {
       ServiceShutdownHook shutdownHook = new ServiceShutdownHook(service);
       ShutdownHookManager.get().addShutdownHook(shutdownHook, PRIORITY);
     }
@@ -279,6 +280,53 @@ public class ServiceLauncher<S extends Service>
   }
 
   /**
+   * Uncaught exception handler.
+   * If an error is raised: shutdown
+   * The state of the system is unknown at this point -attempting
+   * a clean shutdown is dangerous. Instead: exit
+   * @param thread thread that failed
+   * @param exception exception
+   */
+  @Override
+  public void uncaughtException(Thread thread, Throwable exception) {
+    if (ShutdownHookManager.get().isShutdownInProgress()) {
+      LOG.error("Thread {} threw an error during shutdown: {}.",
+          thread.toString(),
+          exception,
+          exception);
+    } else if (exception instanceof Error) {
+      try {
+        LOG.error("Thread {} threw an error: {}. Shutting down",
+            thread.toString(),
+            exception,
+            exception);
+      } catch (Throwable err) {
+        // We don't want to not exit because of an issue with logging
+      }
+      if (exception instanceof OutOfMemoryError) {
+        // After catching an OOM java says it is undefined behavior, so don't
+        // even try to clean up or we can get stuck on shutdown.
+        try {
+          System.err.println("Halting due to Out Of Memory Error...");
+        } catch (Throwable err) {
+          // Again, we don't want a logging failure to stop us from exiting.
+        }
+        ExitUtil.halt(EXIT_EXCEPTION_THROWN);
+      } else {
+        // error other than OutOfMemory
+        exit(convertToExitException(exception));
+      }
+    } else {
+      // simple exception in a thread. There's a policy decision here:
+      // terminate the service vs. keep going after a thread has failed
+      LOG.error("Thread {} threw an exception: {}",
+          thread.toString(),
+          exception,
+          exception);
+    }
+  }
+
+  /**
    * Print a warning: currently this goes to stderr
    * @param text
    */
@@ -286,7 +334,6 @@ public class ServiceLauncher<S extends Service>
     System.err.println(text);
   }
 
-
   /**
    * Report an error. The message is printed to stderr; the exception
    * is logged via the current logger.
@@ -448,25 +495,39 @@ public class ServiceLauncher<S extends Service>
     } catch (ExitUtil.ExitException ee) {
       exitException = ee;
     } catch (Throwable thrown) {
-      int exitCode;
-      String message = thrown.getMessage();
-      if (message == null) {
-        message = thrown.toString();
-      }
-      if (thrown instanceof ExitCodeProvider) {
-        exitCode = ((ExitCodeProvider) thrown).getExitCode();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("While running {}: {}", getServiceName(), message, thrown);
-        }
-        LOG.error(message);
-      } else {
-        // not any of the service launcher exceptions -assume something worse
-        error(message, thrown);
-        exitCode = EXIT_EXCEPTION_THROWN;
+      exitException = convertToExitException(thrown);
+    }
+    return exitException;
+  }
+
+  /**
+   * Convert the exception to one that can be handed off to ExitUtil;
+   * if it is of the right type it is passed through as is. If not, a
+   * new exception with the exit code {@link #EXIT_EXCEPTION_THROWN}
+   * is created, with the argument <code>thrown</code> as the inner cause
+   * @param thrown the exception thrown
+   * @return an exception to terminate the process with
+   */
+  protected ExitUtil.ExitException convertToExitException(Throwable thrown) {
+    ExitUtil.ExitException exitException;
+    int exitCode;
+    String message = thrown.getMessage();
+    if (message == null) {
+      message = thrown.toString();
+    }
+    if (thrown instanceof ExitCodeProvider) {
+      exitCode = ((ExitCodeProvider) thrown).getExitCode();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("While running {}: {}", getServiceName(), message, thrown);
       }
-      exitException = new ExitUtil.ExitException(exitCode, message);
-      exitException.initCause(thrown);
+      LOG.error(message);
+    } else {
+      // not any of the service launcher exceptions -assume something worse
+      error(message, thrown);
+      exitCode = EXIT_EXCEPTION_THROWN;
     }
+    exitException = new ExitUtil.ExitException(exitCode, message);
+    exitException.initCause(thrown);
     return exitException;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 0142028..739d22c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -179,14 +179,18 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private YarnRPC yarnRPC;
 
   /** Handle to communicate with the Resource Manager*/
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private AMRMClientAsync asyncRMClient;
 
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+
   private RMOperationHandler rmOperationHandler;
 
   /** Handle to communicate with the Node Manager*/
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   public NMClientAsync nmClientAsync;
 
-  YarnConfiguration conf;
+//  YarnConfiguration conf;
   /**
    * token blob
    */
@@ -197,13 +201,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /**
    * Secret manager
    */
-  ClientToAMTokenSecretManager secretManager;
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+  private ClientToAMTokenSecretManager secretManager;
   
   /** Hostname of the container*/
   private String appMasterHostname = "";
   /* Port on which the app master listens for status updates from clients*/
   private int appMasterRpcPort = 0;
   /** Tracking url to which app master publishes info for clients to monitor*/
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private String appMasterTrackingUrl = "";
 
   /** Application Attempt Id ( combination of attemptId and fail count )*/
@@ -235,6 +241,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private final ReentrantLock AMExecutionStateLock = new ReentrantLock();
   private final Condition isAMCompleted = AMExecutionStateLock.newCondition();
 
+  /**
+   * Exit code for the AM to return
+   */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private int amExitCode =  0;
   
   /**
@@ -247,22 +257,30 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /**
    * Flag to set if the process exit code was set before shutdown started
    */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private boolean spawnedProcessExitedBeforeShutdownTriggered;
 
 
   /** Arguments passed in : raw*/
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private SliderAMArgs serviceArgs;
 
   /**
    * ID of the AM container
    */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private ContainerId appMasterContainerID;
 
   /**
    * ProviderService of this cluster
    */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private ProviderService providerService;
 
+  /**
+   * The registry service
+   */
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private SliderRegistryService registry;
   
   /**
@@ -277,13 +295,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private int containerMaxMemory;
   private String amCompletionReason;
 
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private RoleLaunchService launchService;
   
   //username -null if it is not known/not to be set
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private String hadoop_user_name;
   private String service_user_name;
   
   private SliderAMWebApp webApp;
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private InetSocketAddress rpcServiceAddress;
   private ProviderService sliderAMProvider;
 
@@ -327,7 +348,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
                SliderUtils.getKerberosRealm());
       UserGroupInformation.setConfiguration(conf);
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      log.debug("Authenticating as " + ugi.toString());
+      log.debug("Authenticating as {}", ugi);
       SliderUtils.verifyPrincipalSet(conf,
           DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY);
       // always enforce protocol to be token-based.
@@ -356,11 +377,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   @Override // RunService
   public Configuration bindArgs(Configuration config, String... args) throws
                                                                       Exception {
-    config = super.bindArgs(config, args);
+    YarnConfiguration yarnConfiguration = new YarnConfiguration(
+        super.bindArgs(config, args));
     serviceArgs = new SliderAMArgs(args);
     serviceArgs.parse();
     //yarn-ify
-    YarnConfiguration yarnConfiguration = new YarnConfiguration(config);
     return SliderUtils.patchConfiguration(yarnConfiguration);
   }
 
@@ -450,11 +471,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
 
     Configuration serviceConf = getConfig();
     // Try to get the proper filtering of static resources through the yarn proxy working
-    serviceConf.set("hadoop.http.filter.initializers",
+    serviceConf.set(HADOOP_HTTP_FILTER_INITIALIZERS,
                     SliderAmFilterInitializer.NAME);
     serviceConf.set(SliderAmIpFilter.WS_CONTEXT_ROOT, WS_CONTEXT_ROOT);
     
-    conf = new YarnConfiguration(serviceConf);
     //get our provider
     MapOperations globalInternalOptions =
       instanceDefinition.getInternalOperations().getGlobalOptions();
@@ -471,9 +491,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     sliderAMProvider = new SliderAMProviderService();
     initAndAddService(sliderAMProvider);
     
-    InetSocketAddress address = SliderUtils.getRmSchedulerAddress(conf);
+    InetSocketAddress address = SliderUtils.getRmSchedulerAddress(serviceConf);
     log.info("RM is at {}", address);
-    yarnRPC = YarnRPC.create(conf);
+    yarnRPC = YarnRPC.create(serviceConf);
 
     /*
      * Extract the container ID. This is then
@@ -580,7 +600,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       WebAppService<SliderAMWebApp> webAppService =
         new WebAppService<>("slider", webApp);
 
-      webAppService.init(conf);
+      webAppService.init(serviceConf);
       webAppService.start();
       addService(webAppService);
 
@@ -613,7 +633,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
         applicationACLs = response.getApplicationACLs();
 
         //tell the server what the ACLs are 
-        rpcService.getServer().refreshServiceAcl(conf, new SliderAMPolicyProvider());
+        rpcService.getServer().refreshServiceAcl(serviceConf,
+            new SliderAMPolicyProvider());
       }
 
       // extract container list
@@ -979,7 +1000,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
 
       // non complete containers should not be here
       assert (status.getState() == ContainerState.COMPLETE);
-      AppState.NodeCompletionResult result = appState.onCompletedNode(conf, status);
+      AppState.NodeCompletionResult result = appState.onCompletedNode(
+          getConfig(), status);
       if (result.containerFailed) {
         RoleInstance ri = result.roleInstance;
         log.error("Role instance {} failed ", ri);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 9981f68..cc238ff 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -1171,7 +1171,7 @@ public class AppState {
    * @param status the node that has just completed
    * @return NodeCompletionResult
    */
-  public synchronized NodeCompletionResult onCompletedNode(YarnConfiguration amConf,
+  public synchronized NodeCompletionResult onCompletedNode(Configuration amConf,
       ContainerStatus status) {
     ContainerId containerId = status.getContainerId();
     NodeCompletionResult result = new NodeCompletionResult();

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAMWebApp.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAMWebApp.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAMWebApp.java
index 0e51f88..4f290af 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAMWebApp.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAMWebApp.java
@@ -50,7 +50,7 @@ public class SliderAMWebApp extends WebApp {
   public final SliderRegistryService registry;
 
   public SliderAMWebApp(SliderRegistryService registry) {
-    Preconditions.checkNotNull(registry);
+    Preconditions.checkArgument(registry != null, "registry null");
     this.registry = registry;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAmIpFilter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAmIpFilter.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAmIpFilter.java
index fc96284..aba344e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAmIpFilter.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/SliderAmIpFilter.java
@@ -49,6 +49,7 @@ public class SliderAmIpFilter implements Filter {
   private static final long updateInterval = 5 * 60 * 1000;
   public static final String WS_CONTEXT_ROOT = "slider.rest.context.root";
 
+  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
   private String proxyHost;
   private Set<String> proxyAddresses = null;
   private long lastUpdate;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/java/org/apache/slider/server/services/curator/CuratorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/curator/CuratorService.java b/slider-core/src/main/java/org/apache/slider/server/services/curator/CuratorService.java
index 657fa57..645bc8f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/curator/CuratorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/curator/CuratorService.java
@@ -42,7 +42,7 @@ public class CuratorService extends AbstractService {
                         CuratorFramework curator,
                         String basePath) {
     super(name);
-    this.curator = Preconditions.checkNotNull(curator, "null client");
+    this.curator = Preconditions.checkNotNull(curator, "null curator");
     this.basePath = basePath;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0b339b7f/slider-core/src/main/resources/webapps/static/yarn.dt.plugins.js
----------------------------------------------------------------------
diff --git a/slider-core/src/main/resources/webapps/static/yarn.dt.plugins.js b/slider-core/src/main/resources/webapps/static/yarn.dt.plugins.js
index d0bde29..6b8d16c 100644
--- a/slider-core/src/main/resources/webapps/static/yarn.dt.plugins.js
+++ b/slider-core/src/main/resources/webapps/static/yarn.dt.plugins.js
@@ -22,7 +22,7 @@ if (!jQuery.fn.dataTableExt.fnVersionCheck("1.7.5")) {
 // don't filter on hidden html elements for an sType of title-numeric
 $.fn.dataTableExt.ofnSearch['title-numeric'] = function ( sData ) {
    return sData.replace(/\n/g," ").replace( /<.*?>/g, "" );
-}
+};
 
 // 'title-numeric' sort type
 jQuery.fn.dataTableExt.oSort['title-numeric-asc']  = function(a,b) {
@@ -71,7 +71,7 @@ jQuery.fn.dataTableExt.oApi.fnSetFilteringDelay = function ( oSettings, iDelay )
     return this;
   } );
   return this;
-}
+};
 
 function renderHadoopDate(data, type, full) {
   if (type === 'display' || type === 'filter') {