You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/11 23:35:59 UTC

[1/8] hadoop git commit: HADOOP-13824. FsShell can suppress the real error if no error message is present. Contributed by John Zhuge.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-5734 5bd7dece9 -> 4c38f11ce


HADOOP-13824. FsShell can suppress the real error if no error message is present. Contributed by John Zhuge.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b606e025
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b606e025
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b606e025

Branch: refs/heads/YARN-5734
Commit: b606e025f10daed18b90b45ac00cd0c82e818581
Parents: 5bd7dec
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Fri Dec 9 15:22:21 2016 -0800
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Fri Dec 9 15:22:21 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/fs/FsShell.java |  7 +-
 .../java/org/apache/hadoop/fs/TestFsShell.java  | 67 ++++++++++--------
 .../apache/hadoop/test/GenericTestUtils.java    | 72 +++++++++++++++++++-
 3 files changed, 117 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b606e025/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsShell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsShell.java
index 1de9374..59d15c2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsShell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsShell.java
@@ -328,7 +328,12 @@ public class FsShell extends Configured implements Tool {
           scope.close();
         }
       } catch (IllegalArgumentException e) {
-        displayError(cmd, e.getLocalizedMessage());
+        if (e.getMessage() == null) {
+          displayError(cmd, "Null exception message");
+          e.printStackTrace(System.err);
+        } else {
+          displayError(cmd, e.getLocalizedMessage());
+        }
         printUsage(System.err);
         if (instance != null) {
           printInstanceUsage(System.err, instance);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b606e025/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShell.java
index 376f8a6..162a942 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFsShell.java
@@ -17,18 +17,19 @@
  */
 package org.apache.hadoop.fs;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-
 import junit.framework.AssertionFailedError;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.shell.Command;
+import org.apache.hadoop.fs.shell.CommandFactory;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tracing.SetSpanReceiver;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.htrace.core.AlwaysSampler;
 import org.apache.htrace.core.Tracer;
+import org.hamcrest.core.StringContains;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestFsShell {
 
@@ -73,30 +74,42 @@ public class TestFsShell {
 
   @Test
   public void testDFSWithInvalidCommmand() throws Throwable {
-    Configuration conf = new Configuration();
-    FsShell shell = new FsShell(conf);
-    String[] args = new String[1];
-    args[0] = "dfs -mkdirs";
-    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    final PrintStream out = new PrintStream(bytes);
-    final PrintStream oldErr = System.err;
-    try {
-      System.setErr(out);
-      ToolRunner.run(shell, args);
-      String errorValue=new String(bytes.toString());
-      Assert
-      .assertTrue(
-          "FSShell dfs command did not print the error " +
-          "message when invalid command is passed",
-          errorValue.contains("-mkdirs: Unknown command"));
-      Assert
-          .assertTrue(
-              "FSShell dfs command did not print help " +
+    FsShell shell = new FsShell(new Configuration());
+    try (GenericTestUtils.SystemErrCapturer capture =
+             new GenericTestUtils.SystemErrCapturer()) {
+      ToolRunner.run(shell, new String[]{"dfs -mkdirs"});
+      Assert.assertThat("FSShell dfs command did not print the error " +
               "message when invalid command is passed",
-          errorValue.contains("Usage: hadoop fs [generic options]"));
-    } finally {
-      IOUtils.closeStream(out);
-      System.setErr(oldErr);
+          capture.getOutput(), StringContains.containsString(
+              "-mkdirs: Unknown command"));
+      Assert.assertThat("FSShell dfs command did not print help " +
+              "message when invalid command is passed",
+          capture.getOutput(), StringContains.containsString(
+              "Usage: hadoop fs [generic options]"));
+    }
+  }
+
+  @Test
+  public void testExceptionNullMessage() throws Exception {
+    final String cmdName = "-cmdExNullMsg";
+    final Command cmd = Mockito.mock(Command.class);
+    Mockito.when(cmd.run(Mockito.anyVararg())).thenThrow(
+        new IllegalArgumentException());
+    Mockito.when(cmd.getUsage()).thenReturn(cmdName);
+
+    final CommandFactory cmdFactory = Mockito.mock(CommandFactory.class);
+    final String[] names = {cmdName};
+    Mockito.when(cmdFactory.getNames()).thenReturn(names);
+    Mockito.when(cmdFactory.getInstance(cmdName)).thenReturn(cmd);
+
+    FsShell shell = new FsShell(new Configuration());
+    shell.commandFactory = cmdFactory;
+    try (GenericTestUtils.SystemErrCapturer capture =
+             new GenericTestUtils.SystemErrCapturer()) {
+      ToolRunner.run(shell, new String[]{cmdName});
+      Assert.assertThat(capture.getOutput(),
+          StringContains.containsString(cmdName
+              + ": Null exception message"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b606e025/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 0b73cf5..96ba123 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.test;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
@@ -272,7 +275,74 @@ public abstract class GenericTestUtils {
         "Thread diagnostics:\n" +
         TimedOutTestsListener.buildThreadDiagnosticString());
   }
-  
+
+  /**
+   * Prints output to one {@link PrintStream} while copying to the other.
+   * <p>
+   * Closing the main {@link PrintStream} will NOT close the other.
+   */
+  public static class TeePrintStream extends PrintStream {
+    private final PrintStream other;
+
+    public TeePrintStream(OutputStream main, PrintStream other) {
+      super(main);
+      this.other = other;
+    }
+
+    @Override
+    public void flush() {
+      super.flush();
+      other.flush();
+    }
+
+    @Override
+    public void write(byte[] buf, int off, int len) {
+      super.write(buf, off, len);
+      other.write(buf, off, len);
+    }
+  }
+
+  /**
+   * Capture output printed to {@link System#err}.
+   * <p>
+   * Usage:
+   * <pre>
+   *   try (SystemErrCapturer capture = new SystemErrCapturer()) {
+   *     ...
+   *     // Call capture.getOutput() to get the output string
+   *   }
+   * </pre>
+   *
+   * TODO: Add lambda support once Java 8 is common.
+   * <pre>
+   *   SystemErrCapturer.withCapture(capture -> {
+   *     ...
+   *   })
+   * </pre>
+   */
+  public static class SystemErrCapturer implements AutoCloseable {
+    final private ByteArrayOutputStream bytes;
+    final private PrintStream bytesPrintStream;
+    final private PrintStream oldErr;
+
+    public SystemErrCapturer() {
+      bytes = new ByteArrayOutputStream();
+      bytesPrintStream = new PrintStream(bytes);
+      oldErr = System.err;
+      System.setErr(new TeePrintStream(oldErr, bytesPrintStream));
+    }
+
+    public String getOutput() {
+      return bytes.toString();
+    }
+
+    @Override
+    public void close() throws Exception {
+      IOUtils.closeQuietly(bytesPrintStream);
+      System.setErr(oldErr);
+    }
+  }
+
   public static class LogCapturer {
     private StringWriter sw = new StringWriter();
     private WriterAppender appender;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[8/8] hadoop git commit: HADOOP-13565. KerberosAuthenticationHandler#authenticate should not rebuild SPN based on client request. Contributed by Xiaoyu Yao.

Posted by xg...@apache.org.
HADOOP-13565. KerberosAuthenticationHandler#authenticate should not rebuild SPN based on client request. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c38f11c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c38f11c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c38f11c

Branch: refs/heads/YARN-5734
Commit: 4c38f11cec0664b70e52f9563052dca8fb17c33f
Parents: 92a8917
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Dec 9 21:27:04 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Dec 9 21:27:04 2016 -0800

----------------------------------------------------------------------
 .../server/KerberosAuthenticationHandler.java   | 253 +++++++++++++------
 1 file changed, 176 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c38f11c/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
index c6d1881..f51bbd6 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
@@ -18,6 +18,7 @@ import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.ietf.jgss.GSSException;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSManager;
@@ -48,25 +49,32 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.HashMultimap;
+
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
 
 /**
- * The {@link KerberosAuthenticationHandler} implements the Kerberos SPNEGO authentication mechanism for HTTP.
+ * The {@link KerberosAuthenticationHandler} implements the Kerberos SPNEGO
+ * authentication mechanism for HTTP.
  * <p>
  * The supported configuration properties are:
  * <ul>
- * <li>kerberos.principal: the Kerberos principal to used by the server. As stated by the Kerberos SPNEGO
- * specification, it should be <code>HTTP/${HOSTNAME}@{REALM}</code>. The realm can be omitted from the
- * principal as the JDK GSS libraries will use the realm name of the configured default realm.
+ * <li>kerberos.principal: the Kerberos principal to be used by the server. As
+ * stated by the Kerberos SPNEGO specification, it should be
+ * <code>HTTP/${HOSTNAME}@{REALM}</code>. The realm can be omitted from the
+ * principal as the JDK GSS libraries will use the realm name of the configured
+ * default realm.
  * It does not have a default value.</li>
- * <li>kerberos.keytab: the keytab file containing the credentials for the Kerberos principal.
+ * <li>kerberos.keytab: the keytab file containing the credentials for the
+ * Kerberos principal.
  * It does not have a default value.</li>
- * <li>kerberos.name.rules: kerberos names rules to resolve principal names, see 
+ * <li>kerberos.name.rules: Kerberos name rules to resolve principal names, see
  * {@link KerberosName#setRules(String)}</li>
  * </ul>
  */
 public class KerberosAuthenticationHandler implements AuthenticationHandler {
-  private static Logger LOG = LoggerFactory.getLogger(KerberosAuthenticationHandler.class);
+  private static final Logger LOG = LoggerFactory.getLogger(
+      KerberosAuthenticationHandler.class);
 
   /**
    * Kerberos context configuration for the JDK GSS library.
@@ -117,8 +125,8 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
 
       return new AppConfigurationEntry[]{
           new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
-                                  AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                                  options),};
+              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+              options), };
     }
   }
 
@@ -128,12 +136,14 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
   public static final String TYPE = "kerberos";
 
   /**
-   * Constant for the configuration property that indicates the kerberos principal.
+   * Constant for the configuration property that indicates the kerberos
+   * principal.
    */
   public static final String PRINCIPAL = TYPE + ".principal";
 
   /**
-   * Constant for the configuration property that indicates the keytab file path.
+   * Constant for the configuration property that indicates the keytab
+   * file path.
    */
   public static final String KEYTAB = TYPE + ".keytab";
 
@@ -148,6 +158,42 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
   private GSSManager gssManager;
   private Subject serverSubject = new Subject();
   private List<LoginContext> loginContexts = new ArrayList<LoginContext>();
+  /**
+   * HADOOP-10158 added support for running HTTP with multiple SPNs, but the
+   * implicit requirement is that they must come from the SAME local realm.
+   *
+   * This is a regression for use cases where HTTP service needs to run with
+   * an SPN from a foreign realm, which is not supported after HADOOP-10158.
+   *
+   * HADOOP-13565 brings back support of SPNs from foreign realms
+   * without dependency on specific Kerberos domain_realm mapping mechanism.
+   *
+   * There are several reasons for not using native Kerberos domain_realm
+   * mapping:
+   * 1. As commented in KerberosUtil#getDomainRealm(), JDK's
+   * domain_realm mapping routines are private to the security.krb5
+   * package. As a result, KerberosUtil#getDomainRealm() always returns the
+   * local realm.
+   *
+   * 2. Server krb5.conf is not the only place that contains the domain_realm
+   * mapping in real deployment. Based on MIT KDC document here:
+   * https://web.mit.edu/kerberos/krb5-1.13/doc/admin/realm_config.html, the
+   * Kerberos domain_realm mapping can be implemented in one of the three
+   * mechanisms:
+   * 1) Server host-based krb5.conf on HTTP server
+   * 2) KDC-based krb5.conf on KDC server
+   * 3) DNS-based with TXT record with _kerberos prefix to the hostname.
+   *
+   * We choose to maintain domain_realm mapping based on HTTP principals
+   * from keytab. The mapping is built at login time with HTTP principals
+   * keyed by server name and is used later to
+   * look up SPNs based on server name from request for authentication.
+   * The multi-map implementation allows SPNs of same server from
+   * different realms.
+   *
+   */
+  private HashMultimap<String, String> serverPrincipalMap =
+      HashMultimap.create();
 
   /**
    * Creates a Kerberos SPNEGO authentication handler with the default
@@ -170,7 +216,8 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
   /**
    * Initializes the authentication handler instance.
    * <p>
-   * It creates a Kerberos context using the principal and keytab specified in the configuration.
+   * It creates a Kerberos context using the principal and keytab specified in
+   * the configuration.
    * <p>
    * This method is invoked by the {@link AuthenticationFilter#init} method.
    *
@@ -225,15 +272,27 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
           throw new AuthenticationException(le);          
         }
         loginContexts.add(loginContext);
+        KerberosName kerbName = new KerberosName(spnegoPrincipal);
+        if (kerbName.getHostName() != null
+            && kerbName.getRealm() != null
+            && kerbName.getServiceName() != null
+            && kerbName.getServiceName().equals("HTTP")) {
+          LOG.trace("Map server: {} to principal: {}", kerbName.getHostName(),
+              spnegoPrincipal);
+          serverPrincipalMap.put(kerbName.getHostName(), spnegoPrincipal);
+        } else {
+          LOG.warn("HTTP principal: {} is invalid for SPNEGO!",
+              spnegoPrincipal);
+        }
       }
       try {
-        gssManager = Subject.doAs(serverSubject, new PrivilegedExceptionAction<GSSManager>() {
-
-          @Override
-          public GSSManager run() throws Exception {
-            return GSSManager.getInstance();
-          }
-        });
+        gssManager = Subject.doAs(serverSubject,
+            new PrivilegedExceptionAction<GSSManager>() {
+              @Override
+              public GSSManager run() throws Exception {
+                return GSSManager.getInstance();
+              }
+            });
       } catch (PrivilegedActionException ex) {
         throw ex.getException();
       }
@@ -312,91 +371,84 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
   }
 
   /**
-   * It enforces the the Kerberos SPNEGO authentication sequence returning an {@link AuthenticationToken} only
-   * after the Kerberos SPNEGO sequence has completed successfully.
+   * It enforces the Kerberos SPNEGO authentication sequence returning an
+   * {@link AuthenticationToken} only after the Kerberos SPNEGO sequence has
+   * completed successfully.
    *
    * @param request the HTTP client request.
    * @param response the HTTP client response.
    *
-   * @return an authentication token if the Kerberos SPNEGO sequence is complete and valid,
-   *         <code>null</code> if it is in progress (in this case the handler handles the response to the client).
+   * @return an authentication token if the Kerberos SPNEGO sequence is complete
+   * and valid, <code>null</code> if it is in progress (in this case the handler
+   * handles the response to the client).
    *
    * @throws IOException thrown if an IO error occurred.
    * @throws AuthenticationException thrown if Kerberos SPNEGO sequence failed.
    */
   @Override
-  public AuthenticationToken authenticate(HttpServletRequest request, final HttpServletResponse response)
-    throws IOException, AuthenticationException {
+  public AuthenticationToken authenticate(HttpServletRequest request,
+      final HttpServletResponse response)
+      throws IOException, AuthenticationException {
     AuthenticationToken token = null;
-    String authorization = request.getHeader(KerberosAuthenticator.AUTHORIZATION);
+    String authorization = request.getHeader(
+        KerberosAuthenticator.AUTHORIZATION);
 
-    if (authorization == null || !authorization.startsWith(KerberosAuthenticator.NEGOTIATE)) {
+    if (authorization == null
+        || !authorization.startsWith(KerberosAuthenticator.NEGOTIATE)) {
       response.setHeader(WWW_AUTHENTICATE, KerberosAuthenticator.NEGOTIATE);
       response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
       if (authorization == null) {
-        LOG.trace("SPNEGO starting");
+        LOG.trace("SPNEGO starting for url: {}", request.getRequestURL());
       } else {
-        LOG.warn("'" + KerberosAuthenticator.AUTHORIZATION + "' does not start with '" +
+        LOG.warn("'" + KerberosAuthenticator.AUTHORIZATION +
+            "' does not start with '" +
             KerberosAuthenticator.NEGOTIATE + "' :  {}", authorization);
       }
     } else {
-      authorization = authorization.substring(KerberosAuthenticator.NEGOTIATE.length()).trim();
+      authorization = authorization.substring(
+          KerberosAuthenticator.NEGOTIATE.length()).trim();
       final Base64 base64 = new Base64(0);
       final byte[] clientToken = base64.decode(authorization);
       final String serverName = InetAddress.getByName(request.getServerName())
                                            .getCanonicalHostName();
       try {
-        token = Subject.doAs(serverSubject, new PrivilegedExceptionAction<AuthenticationToken>() {
-
-          @Override
-          public AuthenticationToken run() throws Exception {
-            AuthenticationToken token = null;
-            GSSContext gssContext = null;
-            GSSCredential gssCreds = null;
-            try {
-              gssCreds = gssManager.createCredential(
-                  gssManager.createName(
-                      KerberosUtil.getServicePrincipal("HTTP", serverName),
-                      KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL")),
-                  GSSCredential.INDEFINITE_LIFETIME,
-                  new Oid[]{
-                    KerberosUtil.getOidInstance("GSS_SPNEGO_MECH_OID"),
-                    KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID")},
-                  GSSCredential.ACCEPT_ONLY);
-              gssContext = gssManager.createContext(gssCreds);
-              byte[] serverToken = gssContext.acceptSecContext(clientToken, 0, clientToken.length);
-              if (serverToken != null && serverToken.length > 0) {
-                String authenticate = base64.encodeToString(serverToken);
-                response.setHeader(KerberosAuthenticator.WWW_AUTHENTICATE,
-                                   KerberosAuthenticator.NEGOTIATE + " " + authenticate);
+        token = Subject.doAs(serverSubject,
+            new PrivilegedExceptionAction<AuthenticationToken>() {
+              private Set<String> serverPrincipals =
+                  serverPrincipalMap.get(serverName);
+              @Override
+              public AuthenticationToken run() throws Exception {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("SPNEGO with principals: {}",
+                      serverPrincipals.toString());
+                }
+                AuthenticationToken token = null;
+                Exception lastException = null;
+                for (String serverPrincipal : serverPrincipals) {
+                  try {
+                    token = runWithPrincipal(serverPrincipal, clientToken,
+                        base64, response);
+                  } catch (Exception ex) {
+                    lastException = ex;
+                    LOG.trace("Auth {} failed with {}", serverPrincipal, ex);
+                  } finally {
+                      if (token != null) {
+                        LOG.trace("Auth {} successfully", serverPrincipal);
+                        break;
+                    }
+                  }
+                }
+                if (token != null) {
+                  return token;
+                } else {
+                  throw new AuthenticationException(lastException);
+                }
               }
-              if (!gssContext.isEstablished()) {
-                response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
-                LOG.trace("SPNEGO in progress");
-              } else {
-                String clientPrincipal = gssContext.getSrcName().toString();
-                KerberosName kerberosName = new KerberosName(clientPrincipal);
-                String userName = kerberosName.getShortName();
-                token = new AuthenticationToken(userName, clientPrincipal, getType());
-                response.setStatus(HttpServletResponse.SC_OK);
-                LOG.trace("SPNEGO completed for principal [{}]", clientPrincipal);
-              }
-            } finally {
-              if (gssContext != null) {
-                gssContext.dispose();
-              }
-              if (gssCreds != null) {
-                gssCreds.dispose();
-              }
-            }
-            return token;
-          }
-        });
+            });
       } catch (PrivilegedActionException ex) {
         if (ex.getException() instanceof IOException) {
           throw (IOException) ex.getException();
-        }
-        else {
+        } else {
           throw new AuthenticationException(ex.getException());
         }
       }
@@ -404,4 +456,51 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
     return token;
   }
 
+  private AuthenticationToken runWithPrincipal(String serverPrincipal,
+      byte[] clientToken, Base64 base64, HttpServletResponse response) throws
+      IOException, AuthenticationException, ClassNotFoundException,
+      GSSException, IllegalAccessException, NoSuchFieldException {
+    GSSContext gssContext = null;
+    GSSCredential gssCreds = null;
+    AuthenticationToken token = null;
+    try {
+      LOG.trace("SPNEGO initiated with principal {}", serverPrincipal);
+      gssCreds = this.gssManager.createCredential(
+          this.gssManager.createName(serverPrincipal,
+              KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL")),
+          GSSCredential.INDEFINITE_LIFETIME,
+          new Oid[]{
+              KerberosUtil.getOidInstance("GSS_SPNEGO_MECH_OID"),
+              KerberosUtil.getOidInstance("GSS_KRB5_MECH_OID")},
+          GSSCredential.ACCEPT_ONLY);
+      gssContext = this.gssManager.createContext(gssCreds);
+      byte[] serverToken = gssContext.acceptSecContext(clientToken, 0,
+          clientToken.length);
+      if (serverToken != null && serverToken.length > 0) {
+        String authenticate = base64.encodeToString(serverToken);
+        response.setHeader(KerberosAuthenticator.WWW_AUTHENTICATE,
+                           KerberosAuthenticator.NEGOTIATE + " " +
+                           authenticate);
+      }
+      if (!gssContext.isEstablished()) {
+        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
+        LOG.trace("SPNEGO in progress");
+      } else {
+        String clientPrincipal = gssContext.getSrcName().toString();
+        KerberosName kerberosName = new KerberosName(clientPrincipal);
+        String userName = kerberosName.getShortName();
+        token = new AuthenticationToken(userName, clientPrincipal, getType());
+        response.setStatus(HttpServletResponse.SC_OK);
+        LOG.trace("SPNEGO completed for principal [{}]", clientPrincipal);
+      }
+    } finally {
+      if (gssContext != null) {
+        gssContext.dispose();
+      }
+      if (gssCreds != null) {
+        gssCreds.dispose();
+      }
+    }
+    return token;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/8] hadoop git commit: YARN-5982. Simplify opportunistic container parameters and metrics. (Konstantinos Karanasos via asuresh)

Posted by xg...@apache.org.
YARN-5982. Simplify opportunistic container parameters and metrics. (Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b0aace21
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b0aace21
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b0aace21

Branch: refs/heads/YARN-5734
Commit: b0aace21b1ef3436ba9d516186208fee9a9ceef2
Parents: 55f5886
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 9 16:41:25 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Dec 9 16:41:25 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     | 41 ------------
 .../src/main/resources/yarn-default.xml         | 57 -----------------
 .../scheduler/ContainerScheduler.java           | 10 +--
 .../nodemanager/metrics/NodeManagerMetrics.java | 65 ++++++++++----------
 ...pportunisticContainerAllocatorAMService.java | 60 ++++--------------
 .../scheduler/SchedulerNode.java                |  6 +-
 .../resourcemanager/webapp/NodesPage.java       |  6 +-
 .../resourcemanager/webapp/dao/NodeInfo.java    | 10 +--
 .../webapp/TestRMWebServicesNodes.java          | 10 +--
 9 files changed, 68 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4934964..69c7b00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -323,47 +323,6 @@ public class YarnConfiguration extends Configuration {
   public static final boolean
       OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
 
-  /** Minimum memory (in MB) used for allocating an opportunistic container. */
-  public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
-      YARN_PREFIX + "opportunistic-containers.min-memory-mb";
-  public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;
-
-  /** Minimum virtual CPU cores used for allocating an opportunistic container.
-   * */
-  public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
-      YARN_PREFIX + "opportunistic-containers.min-vcores";
-  public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;
-
-  /** Maximum memory (in MB) used for allocating an opportunistic container. */
-  public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
-      YARN_PREFIX + "opportunistic-containers.max-memory-mb";
-  public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
-
-  /** Maximum virtual CPU cores used for allocating an opportunistic container.
-   * */
-  public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
-      YARN_PREFIX + "opportunistic-containers.max-vcores";
-  public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
-
-  /** Incremental memory (in MB) used for allocating an opportunistic container.
-   * */
-  public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
-      YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
-  public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
-      512;
-
-  /** Incremental virtual CPU cores used for allocating an opportunistic
-   * container. */
-  public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
-      YARN_PREFIX + "opportunistic-containers.incr-vcores";
-  public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;
-
-  /** Container token expiry for opportunistic containers. */
-  public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
-      YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
-  public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
-      600000;
-
   /** Number of nodes to be used by the Opportunistic Container allocator for
    * dispatching containers during container allocation. */
   public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 47d12d1..c8c4edd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2766,63 +2766,6 @@
 
   <property>
     <description>
-      Minimum memory (in MB) used for allocating an opportunistic container.
-    </description>
-    <name>yarn.opportunistic-containers.min-memory-mb</name>
-    <value>512</value>
-  </property>
-
-  <property>
-    <description>
-      Minimum virtual CPU cores used for allocating an opportunistic container.
-    </description>
-    <name>yarn.opportunistic-containers.min-vcores</name>
-    <value>1</value>
-  </property>
-
-  <property>
-    <description>
-    Maximum memory (in MB) used for allocating an opportunistic container.
-    </description>
-    <name>yarn.opportunistic-containers.max-memory-mb</name>
-    <value>2048</value>
-  </property>
-
-  <property>
-    <description>
-    Maximum virtual CPU cores used for allocating an opportunistic container.
-    </description>
-    <name>yarn.opportunistic-containers.max-vcores</name>
-    <value>4</value>
-  </property>
-
-  <property>
-    <description>
-    Incremental memory (in MB) used for allocating an opportunistic container.
-    </description>
-    <name>yarn.opportunistic-containers.incr-memory-mb</name>
-    <value>512</value>
-  </property>
-
-  <property>
-    <description>
-    Incremental virtual CPU cores used for allocating an opportunistic
-      container.
-    </description>
-    <name>yarn.opportunistic-containers.incr-vcores</name>
-    <value>1</value>
-  </property>
-
-  <property>
-    <description>
-    Container token expiry for opportunistic containers.
-    </description>
-    <name>yarn.opportunistic-containers.container-token-expiry-ms</name>
-    <value>600000</value>
-  </property>
-
-  <property>
-    <description>
     Number of nodes to be used by the Opportunistic Container Allocator for
     dispatching containers during container allocation.
     </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 0c2b1ac..753bf04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -170,11 +170,11 @@ public class ContainerScheduler extends AbstractService implements
     this.opportunisticContainersStatus.setWaitQueueLength(
         getNumQueuedContainers());
     this.opportunisticContainersStatus.setOpportMemoryUsed(
-        metrics.getOpportMemoryUsed());
+        metrics.getAllocatedOpportunisticGB());
     this.opportunisticContainersStatus.setOpportCoresUsed(
-        metrics.getOpportCoresUsed());
+        metrics.getAllocatedOpportunisticVCores());
     this.opportunisticContainersStatus.setRunningOpportContainers(
-        metrics.getRunningOpportContainers());
+        metrics.getRunningOpportunisticContainers());
     return this.opportunisticContainersStatus;
   }
 
@@ -196,7 +196,7 @@ public class ContainerScheduler extends AbstractService implements
       this.utilizationTracker.subtractContainerResource(container);
       if (container.getContainerTokenIdentifier().getExecutionType() ==
           ExecutionType.OPPORTUNISTIC) {
-        this.metrics.opportunisticContainerCompleted(container);
+        this.metrics.completeOpportunisticContainer(container.getResource());
       }
       startPendingContainers();
     }
@@ -298,7 +298,7 @@ public class ContainerScheduler extends AbstractService implements
     this.utilizationTracker.addContainerResources(container);
     if (container.getContainerTokenIdentifier().getExecutionType() ==
         ExecutionType.OPPORTUNISTIC) {
-      this.metrics.opportunisticContainerStarted(container);
+      this.metrics.startOpportunisticContainer(container.getResource());
     }
     container.sendLaunchEvent();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
index b001b63..291b488 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.yarn.api.records.Resource;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-    .Container;
 
 @Metrics(about="Metrics for node manager", context="yarn")
 public class NodeManagerMetrics {
@@ -64,12 +62,12 @@ public class NodeManagerMetrics {
   @Metric("Disk utilization % on good log dirs")
       MutableGaugeInt goodLogDirsDiskUtilizationPerc;
 
-  @Metric("Memory used by Opportunistic Containers in MB")
-      MutableGaugeLong opportMemoryUsed;
-  @Metric("# of Virtual Cores used by opportunistic containers")
-      MutableGaugeInt opportCoresUsed;
+  @Metric("Current allocated memory by opportunistic containers in GB")
+      MutableGaugeLong allocatedOpportunisticGB;
+  @Metric("Current allocated Virtual Cores by opportunistic containers")
+      MutableGaugeInt allocatedOpportunisticVCores;
   @Metric("# of running opportunistic containers")
-      MutableGaugeInt runningOpportContainers;
+      MutableGaugeInt runningOpportunisticContainers;
 
   // CHECKSTYLE:ON:VisibilityModifier
 
@@ -77,6 +75,7 @@ public class NodeManagerMetrics {
 
   private long allocatedMB;
   private long availableMB;
+  private long allocatedOpportunisticMB;
 
   public NodeManagerMetrics(JvmMetrics jvmMetrics) {
     this.jvmMetrics = jvmMetrics;
@@ -141,30 +140,6 @@ public class NodeManagerMetrics {
     containersReIniting.decr();
   }
 
-  public long getOpportMemoryUsed() {
-    return opportMemoryUsed.value();
-  }
-
-  public int getOpportCoresUsed() {
-    return opportCoresUsed.value();
-  }
-
-  public int getRunningOpportContainers() {
-    return runningOpportContainers.value();
-  }
-
-  public void opportunisticContainerCompleted(Container container) {
-    opportMemoryUsed.decr(container.getResource().getMemorySize());
-    opportCoresUsed.decr(container.getResource().getVirtualCores());
-    runningOpportContainers.decr();
-  }
-
-  public void opportunisticContainerStarted(Container container) {
-    opportMemoryUsed.incr(container.getResource().getMemorySize());
-    opportCoresUsed.incr(container.getResource().getVirtualCores());
-    runningOpportContainers.incr();
-  }
-
   public void allocateContainer(Resource res) {
     allocatedContainers.incr();
     allocatedMB = allocatedMB + res.getMemorySize();
@@ -196,6 +171,22 @@ public class NodeManagerMetrics {
     availableVCores.decr(deltaVCores);
   }
 
+  public void startOpportunisticContainer(Resource res) {
+    runningOpportunisticContainers.incr();
+    allocatedOpportunisticMB = allocatedOpportunisticMB + res.getMemorySize();
+    allocatedOpportunisticGB
+        .set((int) Math.ceil(allocatedOpportunisticMB / 1024d));
+    allocatedOpportunisticVCores.incr(res.getVirtualCores());
+  }
+
+  public void completeOpportunisticContainer(Resource res) {
+    runningOpportunisticContainers.decr();
+    allocatedOpportunisticMB = allocatedOpportunisticMB - res.getMemorySize();
+    allocatedOpportunisticGB
+        .set((int) Math.ceil(allocatedOpportunisticMB / 1024d));
+    allocatedOpportunisticVCores.decr(res.getVirtualCores());
+  }
+
   public void addResource(Resource res) {
     availableMB = availableMB + res.getMemorySize();
     availableGB.incr((int)Math.floor(availableMB/1024d));
@@ -272,4 +263,16 @@ public class NodeManagerMetrics {
   public int getContainersRolledbackOnFailure() {
     return containersRolledBackOnFailure.value();
   }
+
+  public long getAllocatedOpportunisticGB() {
+    return allocatedOpportunisticGB.value();
+  }
+
+  public int getAllocatedOpportunisticVCores() {
+    return allocatedOpportunisticVCores.value();
+  }
+
+  public int getRunningOpportunisticContainers() {
+    return runningOpportunisticContainers.value();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 7814b84..7b7963f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -199,11 +198,12 @@ public class OpportunisticContainerAllocatorAMService
         }
       });
       int tokenExpiryInterval = getConfig()
-          .getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
-              YarnConfiguration.
-                  OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
-      opCtx.updateAllocationParams(createMinContainerResource(),
-          createMaxContainerResource(), createIncrContainerResource(),
+          .getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
+      opCtx.updateAllocationParams(
+          rmContext.getScheduler().getMinimumResourceCapability(),
+          rmContext.getScheduler().getMaximumResourceCapability(),
+          rmContext.getScheduler().getMinimumResourceCapability(),
           tokenExpiryInterval);
       appAttempt.setOpportunisticContainerContext(opCtx);
     }
@@ -273,14 +273,14 @@ public class OpportunisticContainerAllocatorAMService
     RegisterDistributedSchedulingAMResponse dsResp = recordFactory
         .newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
     dsResp.setRegisterResponse(response);
-    dsResp.setMinContainerResource(createMinContainerResource());
-    dsResp.setMaxContainerResource(createMaxContainerResource());
-    dsResp.setIncrContainerResource(createIncrContainerResource());
+    dsResp.setMinContainerResource(
+        rmContext.getScheduler().getMinimumResourceCapability());
+    dsResp.setMaxContainerResource(
+        rmContext.getScheduler().getMaximumResourceCapability());
     dsResp.setContainerTokenExpiryInterval(
         getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
-            YarnConfiguration.
-                OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
+            YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS));
     dsResp.setContainerIdStart(
         this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
 
@@ -384,18 +384,6 @@ public class OpportunisticContainerAllocatorAMService
     return nodeMonitor.getThresholdCalculator();
   }
 
-  private Resource createIncrContainerResource() {
-    return Resource.newInstance(
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
-            YarnConfiguration.
-                OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
-    );
-  }
-
   private synchronized List<RemoteNode> getLeastLoadedNodes() {
     long currTime = System.currentTimeMillis();
     if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
@@ -425,30 +413,6 @@ public class OpportunisticContainerAllocatorAMService
         : null;
   }
 
-  private Resource createMaxContainerResource() {
-    return Resource.newInstance(
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
-            YarnConfiguration
-                .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
-    );
-  }
-
-  private Resource createMinContainerResource() {
-    return Resource.newInstance(
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
-            YarnConfiguration.
-                OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
-        getConfig().getInt(
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
-            YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
-    );
-  }
-
   private static ApplicationAttemptId getAppAttemptId() throws YarnException {
     AMRMTokenIdentifier amrmTokenIdentifier =
         YarnServerSecurityUtils.authorizeRequest();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 995a7b0..6744c2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -149,10 +149,10 @@ public abstract class SchedulerNode {
    */
   public synchronized void allocateContainer(RMContainer rmContainer) {
     Container container = rmContainer.getContainer();
-    if (rmContainer.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+    if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       deductUnallocatedResource(container.getResource());
+      ++numContainers;
     }
-    ++numContainers;
 
     launchedContainers.put(container.getId(), rmContainer);
 
@@ -251,8 +251,8 @@ public abstract class SchedulerNode {
       Container container) {
     if (container.getExecutionType() == ExecutionType.GUARANTEED) {
       addUnallocatedResource(container.getResource());
+      --numContainers;
     }
-    --numContainers;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
index 7b68b84..c03df63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java
@@ -49,6 +49,7 @@ class NodesPage extends RmView {
   static class NodesBlock extends HtmlBlock {
     final ResourceManager rm;
     private static final long BYTES_IN_MB = 1024 * 1024;
+    private static final long BYTES_IN_GB = 1024 * 1024 * 1024;
     private static boolean opportunisticContainersEnabled;
 
     @Inject
@@ -181,8 +182,9 @@ class NodesPage extends RmView {
           nodeTableData
               .append(String.valueOf(info.getNumRunningOpportContainers()))
               .append("\",\"").append("<br title='")
-              .append(String.valueOf(info.getUsedMemoryOpport())).append("'>")
-              .append(StringUtils.byteDesc(info.getUsedMemoryOpport()))
+              .append(String.valueOf(info.getUsedMemoryOpportGB())).append("'>")
+              .append(StringUtils.byteDesc(
+                  info.getUsedMemoryOpportGB() * BYTES_IN_GB))
               .append("\",\"")
               .append(String.valueOf(info.getUsedVirtualCoresOpport()))
               .append("\",\"")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
index aee7717..3416e52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
@@ -51,7 +51,7 @@ public class NodeInfo {
   protected long usedVirtualCores;
   protected long availableVirtualCores;
   private int numRunningOpportContainers;
-  private long usedMemoryOpport;    // Memory in bytes.
+  private long usedMemoryOpportGB;
   private long usedVirtualCoresOpport;
   private int numQueuedContainers;
   protected ArrayList<String> nodeLabels = new ArrayList<String>();
@@ -85,7 +85,7 @@ public class NodeInfo {
 
     // Status of opportunistic containers.
     this.numRunningOpportContainers = 0;
-    this.usedMemoryOpport = 0;
+    this.usedMemoryOpportGB = 0;
     this.usedVirtualCoresOpport = 0;
     this.numQueuedContainers = 0;
     OpportunisticContainersStatus opportStatus =
@@ -93,7 +93,7 @@ public class NodeInfo {
     if (opportStatus != null) {
       this.numRunningOpportContainers =
           opportStatus.getRunningOpportContainers();
-      this.usedMemoryOpport = opportStatus.getOpportMemoryUsed();
+      this.usedMemoryOpportGB = opportStatus.getOpportMemoryUsed();
       this.usedVirtualCoresOpport = opportStatus.getOpportCoresUsed();
       this.numQueuedContainers = opportStatus.getQueuedOpportContainers();
     }
@@ -165,8 +165,8 @@ public class NodeInfo {
     return numRunningOpportContainers;
   }
 
-  public long getUsedMemoryOpport() {
-    return usedMemoryOpport;
+  public long getUsedMemoryOpportGB() {
+    return usedMemoryOpportGB;
   }
 
   public long getUsedVirtualCoresOpport() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b0aace21/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 10aa92a..fb597fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -726,7 +726,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
               "aggregatedContainersVirtualMemoryMB"),
           WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"),
           WebServicesTestUtils.getXmlInt(element, "numRunningOpportContainers"),
-          WebServicesTestUtils.getXmlLong(element, "usedMemoryOpport"),
+          WebServicesTestUtils.getXmlLong(element, "usedMemoryOpportGB"),
           WebServicesTestUtils.getXmlInt(element, "usedVirtualCoresOpport"),
           WebServicesTestUtils.getXmlInt(element, "numQueuedContainers"));
     }
@@ -753,7 +753,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
         resourceInfo.getInt("aggregatedContainersVirtualMemoryMB"),
         resourceInfo.getDouble("containersCPUUsage"),
         nodeInfo.getInt("numRunningOpportContainers"),
-        nodeInfo.getLong("usedMemoryOpport"),
+        nodeInfo.getLong("usedMemoryOpportGB"),
         nodeInfo.getInt("usedVirtualCoresOpport"),
         nodeInfo.getInt("numQueuedContainers"));
   }
@@ -766,7 +766,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
       int nodePhysicalMemoryMB, int nodeVirtualMemoryMB, double nodeCPUUsage,
       int containersPhysicalMemoryMB, int containersVirtualMemoryMB,
       double containersCPUUsage, int numRunningOpportContainers,
-      long usedMemoryOpport, int usedVirtualCoresOpport,
+      long usedMemoryOpportGB, int usedVirtualCoresOpport,
       int numQueuedContainers)
       throws JSONException, Exception {
 
@@ -827,8 +827,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
               numRunningOpportContainers,
           opportunisticStatus.getRunningOpportContainers(),
           numRunningOpportContainers);
-      assertEquals("usedMemoryOpport doesn't match: " + usedMemoryOpport,
-          opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpport);
+      assertEquals("usedMemoryOpportGB doesn't match: " + usedMemoryOpportGB,
+          opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpportGB);
       assertEquals(
           "usedVirtualCoresOpport doesn't match: " + usedVirtualCoresOpport,
           opportunisticStatus.getOpportCoresUsed(), usedVirtualCoresOpport);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[6/8] hadoop git commit: YARN-4457. Cleanup unchecked types for EventHandler (templedf via rkanter)

Posted by xg...@apache.org.
YARN-4457. Cleanup unchecked types for EventHandler (templedf via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b149a1e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b149a1e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b149a1e

Branch: refs/heads/YARN-5734
Commit: 4b149a1e7781a52c2979fa3d367e4bfb1c4ccfe7
Parents: a6410a5
Author: Robert Kanter <rk...@apache.org>
Authored: Fri Dec 9 17:28:25 2016 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Fri Dec 9 17:28:25 2016 -0800

----------------------------------------------------------------------
 .../jobhistory/JobHistoryCopyService.java          |  5 +++--
 .../apache/hadoop/mapreduce/v2/app/AppContext.java |  4 ++--
 .../hadoop/mapreduce/v2/app/MRAppMaster.java       |  2 +-
 .../v2/app/speculate/DefaultSpeculator.java        |  3 ++-
 .../hadoop/mapred/TestLocalContainerLauncher.java  |  2 +-
 .../mapred/TestTaskAttemptFinishingMonitor.java    |  2 +-
 .../hadoop/mapred/TestTaskAttemptListenerImpl.java | 17 +++++++++++------
 .../hadoop/mapreduce/v2/app/MockAppContext.java    |  4 ++--
 .../hadoop/mapreduce/v2/app/MockEventHandler.java  |  6 +++---
 .../v2/app/TestCheckpointPreemptionPolicy.java     |  4 +++-
 .../v2/app/TestKillAMPreemptionPolicy.java         |  4 +++-
 .../mapreduce/v2/app/TestRuntimeEstimators.java    |  3 ++-
 .../v2/app/commit/TestCommitterEventHandler.java   |  2 +-
 .../v2/app/launcher/TestContainerLauncherImpl.java | 12 ++++++------
 .../v2/app/local/TestLocalContainerAllocator.java  |  4 ++--
 .../apache/hadoop/mapreduce/v2/hs/JobHistory.java  |  3 ++-
 hadoop-project/pom.xml                             |  1 +
 .../apache/hadoop/yarn/event/AsyncDispatcher.java  |  4 ++--
 .../org/apache/hadoop/yarn/event/Dispatcher.java   |  2 +-
 .../apache/hadoop/yarn/event/DrainDispatcher.java  |  6 +++---
 .../apache/hadoop/yarn/event/InlineDispatcher.java |  2 +-
 .../server/nodemanager/TestNodeStatusUpdater.java  |  4 +++-
 .../launcher/TestContainerLaunch.java              | 10 +++++-----
 .../logaggregation/TestAppLogAggregatorImpl.java   |  4 ++--
 .../logaggregation/TestLogAggregationService.java  |  2 +-
 .../resourcemanager/NMLivelinessMonitor.java       |  3 ++-
 .../ahs/RMApplicationHistoryWriter.java            |  2 +-
 .../metrics/AbstractSystemMetricsPublisher.java    |  2 +-
 .../rmapp/attempt/AMLivelinessMonitor.java         |  3 ++-
 .../resourcemanager/TestClientRMService.java       |  6 +++---
 .../hadoop/yarn/server/resourcemanager/TestRM.java |  7 ++++---
 .../yarn/server/resourcemanager/TestRMHA.java      |  2 +-
 ...ionalCapacityPreemptionPolicyMockFramework.java |  5 ++---
 .../TestProportionalCapacityPreemptionPolicy.java  |  3 ++-
 .../recovery/RMStateStoreTestBase.java             |  2 +-
 .../rmapp/TestNodesListManager.java                |  3 ++-
 .../scheduler/capacity/TestCapacityScheduler.java  |  2 +-
 .../scheduler/capacity/TestUtils.java              |  3 ++-
 .../scheduler/fifo/TestFifoScheduler.java          |  2 +-
 39 files changed, 89 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
index 37efbe1..4bb1e38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryCopyService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -43,12 +44,12 @@ public class JobHistoryCopyService extends CompositeService implements HistoryEv
   private static final Log LOG = LogFactory.getLog(JobHistoryCopyService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
-  private final EventHandler handler;
+  private final EventHandler<Event> handler;
   private final JobId jobId;
 
 
   public JobHistoryCopyService(ApplicationAttemptId applicationAttemptId, 
-      EventHandler handler) {
+      EventHandler<Event> handler) {
     super("JobHistoryCopyService");
     this.applicationAttemptId = applicationAttemptId;
     this.jobId =  TypeConverter.toYarn(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
index 4af11c3..ddf4fa7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
@@ -51,8 +52,7 @@ public interface AppContext {
 
   Map<JobId, Job> getAllJobs();
 
-  @SuppressWarnings("rawtypes")
-  EventHandler getEventHandler();
+  EventHandler<Event> getEventHandler();
 
   Clock getClock();
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 78c8bdd..835c0aa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -1120,7 +1120,7 @@ public class MRAppMaster extends CompositeService {
     }
 
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return dispatcher.getEventHandler();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
index 07a49af..ed06493 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.event.Event;
 
 public class DefaultSpeculator extends AbstractService implements
     Speculator {
@@ -106,7 +107,7 @@ public class DefaultSpeculator extends AbstractService implements
 
   private final Clock clock;
 
-  private final EventHandler<TaskEvent> eventHandler;
+  private final EventHandler<Event> eventHandler;
 
   public DefaultSpeculator(Configuration conf, AppContext context) {
     this(conf, context, context.getClock());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
index 28a8918..232fe6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
@@ -103,7 +103,7 @@ public class TestLocalContainerLauncher {
     AppContext context = mock(AppContext.class);
     // a simple event handler solely to detect the container cleaned event
     final CountDownLatch isDone = new CountDownLatch(1);
-    EventHandler handler = new EventHandler() {
+    EventHandler<Event> handler = new EventHandler<Event>() {
       @Override
       public void handle(Event event) {
         LOG.info("handling event " + event.getClass() +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
index 521c2f4..b3cefc6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
@@ -91,7 +91,7 @@ public class TestTaskAttemptFinishingMonitor {
 
   }
 
-  public static class MockEventHandler implements EventHandler {
+  public static class MockEventHandler implements EventHandler<Event> {
     public boolean timedOut = false;
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index 8547be4..fa8418a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -17,7 +17,6 @@
 */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPo
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -102,7 +102,8 @@ public class TestTaskAttemptListenerImpl {
         mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(ea);
 
     when(appCtx.getEventHandler()).thenReturn(ea);
@@ -212,7 +213,8 @@ public class TestTaskAttemptListenerImpl {
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(ea);
     when(appCtx.getEventHandler()).thenReturn(ea);
     CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
@@ -273,7 +275,8 @@ public class TestTaskAttemptListenerImpl {
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(ea);
     when(appCtx.getEventHandler()).thenReturn(ea);
     CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
@@ -318,7 +321,8 @@ public class TestTaskAttemptListenerImpl {
     when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
 
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(ea);
 
     RMHeartbeatHandler rmHeartbeatHandler =
@@ -398,7 +402,8 @@ public class TestTaskAttemptListenerImpl {
         mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(ea);
 
     when(appCtx.getEventHandler()).thenReturn(ea);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
index e690f3f..4e31b63 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
@@ -92,9 +93,8 @@ public class MockAppContext implements AppContext {
     return jobs; // OK
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
-  public EventHandler getEventHandler() {
+  public EventHandler<Event> getEventHandler() {
     return new MockEventHandler();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java
index 80b50e7..0bf6d2a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockEventHandler.java
@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 
-public class MockEventHandler implements EventHandler<TaskAttemptEvent> {
+public class MockEventHandler implements EventHandler<Event> {
   @Override
-  public void handle(TaskAttemptEvent event) {
+  public void handle(Event event) {
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
index 9a5d3a5..5977816 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -85,7 +86,8 @@ public class TestCheckpointPreemptionPolicy {
     jid = MRBuilderUtils.newJobId(appId, 1);
 
     mActxt = mock(RunningAppContext.class);
-    EventHandler ea = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> ea = mock(EventHandler.class);
     when(mActxt.getEventHandler()).thenReturn(ea);
     for (int i = 0; i < 40; ++i) {
       ContainerId cId = ContainerId.newContainerId(appAttemptId, i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
index 647d527..09c7f02 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKillAMPreemptionPolicy.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -112,7 +113,8 @@ public class TestKillAMPreemptionPolicy {
 
   private RunningAppContext getRunningAppContext() {
     RunningAppContext mActxt = mock(RunningAppContext.class);
-    EventHandler<?> eventHandler = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> eventHandler = mock(EventHandler.class);
     when(mActxt.getEventHandler()).thenReturn(eventHandler);
     return mActxt;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 7f9a3b9..8c7f0db 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -835,7 +836,7 @@ public class TestRuntimeEstimators {
     }
 
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return dispatcher.getEventHandler();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
index b099bcc..a620763 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
@@ -73,7 +73,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestCommitterEventHandler {
-  public static class WaitForItHandler implements EventHandler {
+  public static class WaitForItHandler implements EventHandler<Event> {
 
     private Event event = null;
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
index 7eb74d5..225570c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
@@ -167,8 +167,8 @@ public class TestContainerLauncherImpl {
   public void testHandle() throws Exception {
     LOG.info("STARTING testHandle");
     AppContext mockContext = mock(AppContext.class);
-    @SuppressWarnings("rawtypes")
-    EventHandler mockEventHandler = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> mockEventHandler = mock(EventHandler.class);
     when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
     String cmAddress = "127.0.0.1:8000";
     ContainerManagementProtocolClient mockCM =
@@ -228,8 +228,8 @@ public class TestContainerLauncherImpl {
   public void testOutOfOrder() throws Exception {
     LOG.info("STARTING testOutOfOrder");
     AppContext mockContext = mock(AppContext.class);
-    @SuppressWarnings("rawtypes")
-    EventHandler mockEventHandler = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> mockEventHandler = mock(EventHandler.class);
     when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
 
     ContainerManagementProtocolClient mockCM =
@@ -290,8 +290,8 @@ public class TestContainerLauncherImpl {
     LOG.info("in test Shutdown");
 
     AppContext mockContext = mock(AppContext.class);
-    @SuppressWarnings("rawtypes")
-    EventHandler mockEventHandler = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> mockEventHandler = mock(EventHandler.class);
     when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
 
     ContainerManagementProtocolClient mockCM =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 3fa0043..fc1969b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -247,8 +247,8 @@ public class TestLocalContainerAllocator {
       ApplicationAttemptId attemptId =
           ApplicationAttemptId.newInstance(appId, 1);
       Job job = mock(Job.class);
-      @SuppressWarnings("rawtypes")
-      EventHandler eventHandler = mock(EventHandler.class);
+      @SuppressWarnings("unchecked")
+      EventHandler<Event> eventHandler = mock(EventHandler.class);
       AppContext ctx = mock(AppContext.class);
       when(ctx.getApplicationID()).thenReturn(appId);
       when(ctx.getApplicationAttemptId()).thenReturn(attemptId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 45075c9..c5a40b2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -344,7 +345,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
 
   // TODO AppContext - Not Required
   @Override
-  public EventHandler getEventHandler() {
+  public EventHandler<Event> getEventHandler() {
     // TODO Auto-generated method stub
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1ae60ed..336231e 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -1570,6 +1570,7 @@
               <target>${javac.version}</target>
               <compilerArguments>
                 <Xlint/>
+				<Xlint:-unchecked/>
                 <Xmaxwarns>9999</Xmaxwarns>
               </compilerArguments>
             </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 94bfab6..a3d306f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -68,7 +68,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   // For drainEventsOnStop enabled only, block newly coming events into the
   // queue while stopping.
   private volatile boolean blockNewEvents = false;
-  private final EventHandler handlerInstance = new GenericEventHandler();
+  private final EventHandler<Event> handlerInstance = new GenericEventHandler();
 
   private Thread eventHandlingThread;
   protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
@@ -228,7 +228,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   }
 
   @Override
-  public EventHandler getEventHandler() {
+  public EventHandler<Event> getEventHandler() {
     return handlerInstance;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
index 4f3801a..e2987de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
@@ -40,7 +40,7 @@ public interface Dispatcher {
 
   public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
 
-  EventHandler getEventHandler();
+  EventHandler<Event> getEventHandler();
 
   void register(Class<? extends Enum> eventType, EventHandler handler);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index c5ba072..29033a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -89,9 +89,9 @@ public class DrainDispatcher extends AsyncDispatcher {
 
   @SuppressWarnings("unchecked")
   @Override
-  public EventHandler getEventHandler() {
-    final EventHandler actual = super.getEventHandler();
-    return new EventHandler() {
+  public EventHandler<Event> getEventHandler() {
+    final EventHandler<Event> actual = super.getEventHandler();
+    return new EventHandler<Event>() {
       @Override
       public void handle(Event event) {
         synchronized (mutex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java
index eb1aa9d..6aa56d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/InlineDispatcher.java
@@ -45,7 +45,7 @@ public class InlineDispatcher extends AsyncDispatcher {
     }
   }
   @Override
-  public EventHandler getEventHandler() {
+  public EventHandler<Event> getEventHandler() {
     return new TestEventHandler();
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 24bd02c..0b93c72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -223,7 +224,8 @@ public class TestNodeStatusUpdater {
       LOG.info("Got heartbeat number " + heartBeatID);
       NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
       Dispatcher mockDispatcher = mock(Dispatcher.class);
-      EventHandler mockEventHandler = mock(EventHandler.class);
+      @SuppressWarnings("unchecked")
+      EventHandler<Event> mockEventHandler = mock(EventHandler.class);
       when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
       NMStateStoreService stateStore = new NMNullStateStoreService();
       nodeStatus.setResponseId(heartBeatID++);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index be6eadb..4ce816a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -438,7 +438,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     when(container.getLaunchContext()).thenReturn(containerLaunchContext);
     when(container.getLocalizedResources()).thenReturn(null);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler eventHandler = new EventHandler() {
+    EventHandler<Event> eventHandler = new EventHandler<Event>() {
       public void handle(Event event) {
         Assert.assertTrue(event instanceof ContainerExitEvent);
         ContainerExitEvent exitEvent = (ContainerExitEvent) event;
@@ -602,8 +602,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
         eventHandler.isContainerExitEventOccured());
   }
 
-  private static class ContainerExitHandler
-      implements EventHandler<ContainerEvent> {
+  private static class ContainerExitHandler implements EventHandler<Event> {
     private boolean testForMultiFile;
 
     ContainerExitHandler(boolean testForMultiFile) {
@@ -616,7 +615,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
       return containerExitEventOccured;
     }
 
-    public void handle(ContainerEvent event) {
+    public void handle(Event event) {
       if (event instanceof ContainerExitEvent) {
         containerExitEventOccured = true;
         ContainerExitEvent exitEvent = (ContainerExitEvent) event;
@@ -1064,7 +1063,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
     when(container.getLaunchContext()).thenReturn(clc);
     when(container.getLocalizedResources()).thenReturn(null);
     Dispatcher dispatcher = mock(Dispatcher.class);
-    EventHandler eventHandler = new EventHandler() {
+    EventHandler<Event> eventHandler = new EventHandler<Event>() {
+      @Override
       public void handle(Event event) {
         Assert.assertTrue(event instanceof ContainerExitEvent);
         ContainerExitEvent exitEvent = (ContainerExitEvent) event;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index 88d9688..2602d55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -343,8 +343,8 @@ public class TestAppLogAggregatorImpl {
   private static Dispatcher createNullDispatcher() {
     return new Dispatcher() {
       @Override
-      public EventHandler getEventHandler() {
-        return new EventHandler() {
+      public EventHandler<Event> getEventHandler() {
+        return new EventHandler<Event>() {
           @Override
           public void handle(Event event) {
             // do nothing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 4e9829a..bc1b4b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -157,7 +157,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   }
   
   DrainDispatcher dispatcher;
-  EventHandler<ApplicationEvent> appEventHandler;
+  EventHandler<Event> appEventHandler;
 
   @Override
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
index 000cd68..a451d42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NMLivelinessMonitor.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -29,7 +30,7 @@ import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 
 public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {
 
-  private EventHandler dispatcher;
+  private EventHandler<Event> dispatcher;
   
   public NMLivelinessMonitor(Dispatcher d) {
     super("NMLivelinessMonitor");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
index bd328ab..6a15465 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java
@@ -323,7 +323,7 @@ public class RMApplicationHistoryWriter extends CompositeService {
     }
 
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return new CompositEventHandler();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
index d4a4fc3..db2b0af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
@@ -79,7 +79,7 @@ public abstract class AbstractSystemMetricsPublisher extends CompositeService
     }
 
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return new CompositEventHandler();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
index b646097..7006e50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/AMLivelinessMonitor.java
@@ -22,13 +22,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 import org.apache.hadoop.yarn.util.Clock;
 
 public class AMLivelinessMonitor extends AbstractLivelinessMonitor<ApplicationAttemptId> {
 
-  private EventHandler dispatcher;
+  private EventHandler<Event> dispatcher;
   
   public AMLivelinessMonitor(Dispatcher d) {
     super("AMLivelinessMonitor");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 12cdcf1..cb57f39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -892,8 +892,7 @@ public class TestClientRMService {
     final CyclicBarrier startBarrier = new CyclicBarrier(2);
     final CyclicBarrier endBarrier = new CyclicBarrier(2);
 
-    @SuppressWarnings("rawtypes")
-    EventHandler eventHandler = new EventHandler() {
+    EventHandler<Event> eventHandler = new EventHandler<Event>() {
       @Override
       public void handle(Event rawEvent) {
         if (rawEvent instanceof RMAppEvent) {
@@ -980,7 +979,8 @@ public class TestClientRMService {
       throws IOException {
     Dispatcher dispatcher = mock(Dispatcher.class);
     when(rmContext.getDispatcher()).thenReturn(dispatcher);
-    EventHandler eventHandler = mock(EventHandler.class);
+    @SuppressWarnings("unchecked")
+    EventHandler<Event> eventHandler = mock(EventHandler.class);
     when(dispatcher.getEventHandler()).thenReturn(eventHandler);
     QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
     queInfo.setQueueName("testqueue");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
index d84c77d..cdf582e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -561,7 +562,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
 
     final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
-      public EventHandler getEventHandler() {
+      public EventHandler<Event> getEventHandler() {
 
         class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
           @Override
@@ -642,7 +643,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
     // this dispatcher ignores RMAppAttemptEventType.KILL event
     final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
-      public EventHandler getEventHandler() {
+      public EventHandler<Event> getEventHandler() {
 
         class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
           @Override
@@ -696,7 +697,7 @@ public class TestRM extends ParameterizedSchedulerTestBase {
     // this dispatcher ignores RMAppAttemptEventType.KILL event
     final Dispatcher dispatcher = new DrainDispatcher() {
       @Override
-      public EventHandler getEventHandler() {
+      public EventHandler<Event> getEventHandler() {
 
         class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 5114329..15b8f04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -761,7 +761,7 @@ public class TestRMHA {
     }
 
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 0281c19..8663315 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -68,9 +67,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.yarn.event.Event;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -95,7 +94,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   Clock mClock = null;
   CapacitySchedulerConfiguration conf = null;
   CapacityScheduler cs = null;
-  EventHandler<SchedulerEvent> mDisp = null;
+  EventHandler<Event> mDisp = null;
   ProportionalCapacityPreemptionPolicy policy = null;
   Resource clusterResource = null;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 881004c..76d93ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -106,7 +107,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   CapacityScheduler mCS = null;
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
-  EventHandler<SchedulerEvent> mDisp = null;
+  EventHandler<Event> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 8544c13..514e9a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -117,7 +117,7 @@ public class RMStateStoreTestBase {
 
     @SuppressWarnings("rawtypes")
     @Override
-    public EventHandler getEventHandler() {
+    public EventHandler<Event> getEventHandler() {
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
index 8812ffe..e41bbc80 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -238,7 +239,7 @@ public class TestNodesListManager {
     Dispatcher dispatcher = new DrainDispatcher() {
       @SuppressWarnings({ "rawtypes", "unchecked" })
       @Override
-      public EventHandler getEventHandler() {
+      public EventHandler<Event> getEventHandler() {
 
         class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 0aeedce..3a88cff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -3348,7 +3348,7 @@ public class TestCapacityScheduler {
     // to have 0 available resource
     RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
     Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
-    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
       @Override
       public void handle(Event event) {
         if (event instanceof RMNodeResourceUpdateEvent) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index b982fab..93360b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -67,6 +67,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.event.Event;
 
 public class TestUtils {
   private static final Log LOG = LogFactory.getLog(TestUtils.class);
@@ -90,7 +91,7 @@ public class TestUtils {
           EventHandler handler) {
       }
       @Override
-      public EventHandler getEventHandler() {
+      public EventHandler<Event> getEventHandler() {
         return handler; 
       }
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b149a1e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 028bcb9..bfbc7bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -1180,7 +1180,7 @@ public class TestFifoScheduler {
     // to have 0 available resource
     RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
     Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
-    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
+    when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
       @Override
       public void handle(Event event) {
         if (event instanceof RMNodeResourceUpdateEvent) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[5/8] hadoop git commit: YARN-5709. Cleanup leader election configs and pluggability. Contributed by Karthik Kambatla

Posted by xg...@apache.org.
YARN-5709. Cleanup leader election configs and pluggability. Contributed by Karthik Kambatla


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6410a54
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6410a54
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6410a54

Branch: refs/heads/YARN-5734
Commit: a6410a542e59acd9827457df4a257a843f785c29
Parents: b0aace2
Author: Jian He <ji...@apache.org>
Authored: Fri Dec 9 16:38:49 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Fri Dec 9 17:00:37 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 +-
 ...ActiveStandbyElectorBasedElectorService.java | 271 +++++++++++++++++++
 .../server/resourcemanager/AdminService.java    |  75 +----
 .../CuratorBasedElectorService.java             | 139 ++++++++++
 .../server/resourcemanager/EmbeddedElector.java |  41 +++
 .../resourcemanager/EmbeddedElectorService.java | 260 ------------------
 .../resourcemanager/LeaderElectorService.java   | 129 ---------
 .../yarn/server/resourcemanager/RMContext.java  |   6 +-
 .../server/resourcemanager/RMContextImpl.java   |  15 +-
 .../server/resourcemanager/ResourceManager.java |  39 ++-
 .../server/resourcemanager/webapp/RMWebApp.java |   3 +-
 .../resourcemanager/webapp/dao/ClusterInfo.java |   2 +-
 .../yarn/server/resourcemanager/MockRM.java     |  33 ++-
 .../server/resourcemanager/RMHATestBase.java    |   4 +-
 .../TestLeaderElectorService.java               |  17 +-
 .../resourcemanager/TestRMEmbeddedElector.java  |  49 ++--
 .../yarn/server/resourcemanager/TestRMHA.java   |  39 +--
 17 files changed, 605 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 69c7b00..dc7c629 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -654,9 +655,20 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
       + "failover-controller.active-standby-elector.zk.retries";
 
-  @Private
+
+  /**
+   * Whether to use curator-based elector for leader election.
+   *
+   * @deprecated Eventually, we want to default to the curator-based
+   * implementation and remove the {@link ActiveStandbyElector} based
+   * implementation. We should remove this config then.
+   */
+  @Unstable
+  @Deprecated
   public static final String CURATOR_LEADER_ELECTOR =
       RM_HA_PREFIX + "curator-leader-elector.enabled";
+  @Private
+  @Unstable
   public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
 
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
new file mode 100644
index 0000000..751eedd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.ActiveStandbyElector;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Leader election implementation that uses {@link ActiveStandbyElector}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ActiveStandbyElectorBasedElectorService extends AbstractService
+    implements EmbeddedElector,
+    ActiveStandbyElector.ActiveStandbyElectorCallback {
+  private static final Log LOG = LogFactory.getLog(
+      ActiveStandbyElectorBasedElectorService.class.getName());
+  private static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
+
+  private RMContext rmContext;
+
+  private byte[] localActiveNodeInfo;
+  private ActiveStandbyElector elector;
+  private long zkSessionTimeout;
+  private Timer zkDisconnectTimer;
+  @VisibleForTesting
+  final Object zkDisconnectLock = new Object();
+
+  ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+    super(ActiveStandbyElectorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf)
+      throws Exception {
+    conf = conf instanceof YarnConfiguration
+        ? conf
+        : new YarnConfiguration(conf);
+
+    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkQuorum == null) {
+      throw new YarnRuntimeException("Embedded automatic failover " +
+          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
+          " is not set");
+    }
+
+    String rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
+
+    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    String electionZNode = zkBasePath + "/" + clusterId;
+
+    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+
+    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
+    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+
+    int maxRetryNum =
+        conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
+          .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
+            CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
+    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
+        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
+
+    elector.ensureParentZNode();
+    if (!isParentZnodeSafe(clusterId)) {
+      notifyFatalError(electionZNode + " znode has invalid data! "+
+          "Might need formatting!");
+    }
+
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    elector.joinElection(localActiveNodeInfo);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    /**
+     * When error occurs in serviceInit(), serviceStop() can be called.
+     * We need null check for the case.
+     */
+    if (elector != null) {
+      elector.quitElection(false);
+      elector.terminateConnection();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void becomeActive() throws ServiceFailedException {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToActive(req);
+    } catch (Exception e) {
+      throw new ServiceFailedException("RM could not transition to Active", e);
+    }
+  }
+
+  @Override
+  public void becomeStandby() {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToStandby(req);
+    } catch (Exception e) {
+      LOG.error("RM could not transition to Standby", e);
+    }
+  }
+
+  /**
+   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
+   */
+  private void cancelDisconnectTimer() {
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer != null) {
+        zkDisconnectTimer.cancel();
+        zkDisconnectTimer = null;
+      }
+    }
+  }
+
+  /**
+   * When the ZK client loses contact with ZK, this method will be called to
+   * allow the RM to react. Because the loss of connection can be noticed
+   * before the session timeout happens, it is undesirable to transition
+   * immediately. Instead the method starts a timer that will wait
+   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
+   * initiating the transition into standby state.
+   */
+  @Override
+  public void enterNeutralMode() {
+    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+        + zkSessionTimeout + " ms if connection is not reestablished.");
+
+    // If we've just become disconnected, start a timer.  When the time's up,
+    // we'll transition to standby.
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer == null) {
+        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
+        zkDisconnectTimer.schedule(new TimerTask() {
+          @Override
+          public void run() {
+            synchronized (zkDisconnectLock) {
+              // Only run if the timer hasn't been cancelled
+              if (zkDisconnectTimer != null) {
+                becomeStandby();
+              }
+            }
+          }
+        }, zkSessionTimeout);
+      }
+    }
+  }
+
+  @SuppressWarnings(value = "unchecked")
+  @Override
+  public void notifyFatalError(String errorMessage) {
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
+            errorMessage));
+  }
+
+  @Override
+  public void fenceOldActive(byte[] oldActiveData) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Request to fence old active being ignored, " +
+          "as embedded leader election doesn't support fencing");
+    }
+  }
+
+  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
+      throws IOException {
+    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+        .newBuilder()
+        .setClusterId(clusterId)
+        .setRmId(rmId)
+        .build()
+        .toByteArray();
+  }
+
+  private boolean isParentZnodeSafe(String clusterId)
+      throws InterruptedException, IOException, KeeperException {
+    byte[] data;
+    try {
+      data = elector.getActiveData();
+    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
+      // no active found, parent znode is safe
+      return true;
+    }
+
+    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
+    try {
+      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+          .parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
+      return false;
+    }
+
+    // Check if the passed proto corresponds to an RM in the same cluster
+    if (!proto.getClusterId().equals(clusterId)) {
+      LOG.error("Mismatched cluster! The other RM seems " +
+          "to be from a different cluster. Current cluster = " + clusterId +
+          ". Other RM's cluster = " + proto.getClusterId());
+      return false;
+    }
+    return true;
+  }
+
+  // EmbeddedElector methods
+
+  @Override
+  public void rejoinElection() {
+    elector.quitElection(false);
+    elector.joinElection(localActiveNodeInfo);
+  }
+
+  @Override
+  public String getZookeeperConnectionState() {
+    return elector.getHAZookeeperConnectionState();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index c060659..028b6f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements
   private String rmId;
 
   private boolean autoFailoverEnabled;
-  private boolean curatorEnabled;
-  private EmbeddedElectorService embeddedElector;
 
   private Server server;
 
@@ -134,18 +131,8 @@ public class AdminService extends CompositeService implements
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    if (rmContext.isHAEnabled()) {
-      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
-          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
-      autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
-      if (autoFailoverEnabled && !curatorEnabled) {
-        if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
-          embeddedElector = createEmbeddedElectorService();
-          addIfService(embeddedElector);
-        }
-      }
-
-    }
+    autoFailoverEnabled =
+        rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
 
     masterServiceBindAddress = conf.getSocketAddr(
         YarnConfiguration.RM_BIND_HOST,
@@ -228,17 +215,6 @@ public class AdminService extends CompositeService implements
     }
   }
 
-  protected EmbeddedElectorService createEmbeddedElectorService() {
-    return new EmbeddedElectorService(rmContext);
-  }
-
-  @InterfaceAudience.Private
-  void resetLeaderElection() {
-    if (embeddedElector != null) {
-      embeddedElector.resetLeaderElection();
-    }
-  }
-
   private UserGroupInformation checkAccess(String method) throws IOException {
     return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
   }
@@ -375,30 +351,24 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  /**
+   * Return the HA status of this RM. This includes the current state and
+   * whether the RM is ready to become active.
+   *
+   * @return {@link HAServiceStatus} of the current RM
+   * @throws IOException if the caller does not have permissions
+   */
   @Override
   public synchronized HAServiceStatus getServiceStatus() throws IOException {
     checkAccess("getServiceState");
-    if (curatorEnabled) {
-      HAServiceStatus state;
-      if (rmContext.getLeaderElectorService().hasLeaderShip()) {
-        state = new HAServiceStatus(HAServiceState.ACTIVE);
-      } else {
-        state = new HAServiceStatus(HAServiceState.STANDBY);
-      }
-      // set empty string to avoid NPE at
-      // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
-      state.setNotReadyToBecomeActive("");
-      return state;
+    HAServiceState haState = rmContext.getHAServiceState();
+    HAServiceStatus ret = new HAServiceStatus(haState);
+    if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+      ret.setReadyToBecomeActive();
     } else {
-      HAServiceState haState = rmContext.getHAServiceState();
-      HAServiceStatus ret = new HAServiceStatus(haState);
-      if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
-        ret.setReadyToBecomeActive();
-      } else {
-        ret.setNotReadyToBecomeActive("State is " + haState);
-      }
-      return ret;
+      ret.setNotReadyToBecomeActive("State is " + haState);
     }
+    return ret;
   }
 
   @Override
@@ -926,19 +896,4 @@ public class AdminService extends CompositeService implements
 
     rmContext.getScheduler().setClusterMaxPriority(conf);
   }
-
-  public String getHAZookeeperConnectionState() {
-    if (!rmContext.isHAEnabled()) {
-      return "ResourceManager HA is not enabled.";
-    } else if (!autoFailoverEnabled) {
-      return "Auto Failover is not enabled.";
-    }
-    if (curatorEnabled) {
-      return "Connected to zookeeper : " + rmContext
-          .getLeaderElectorService().getCuratorClient().getZookeeperClient()
-          .isConnected();
-    } else {
-      return this.embeddedElector.getHAZookeeperConnectionState();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
new file mode 100644
index 0000000..bcdf48b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+/**
+ * Leader election implementation that uses Curator.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CuratorBasedElectorService extends AbstractService
+    implements EmbeddedElector, LeaderLatchListener {
+  public static final Log LOG =
+      LogFactory.getLog(CuratorBasedElectorService.class);
+  private LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+  private ResourceManager rm;
+
+  public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+    super(CuratorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+    this.rm = rm;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+    curator = rm.getCurator();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    closeLeaderLatch();
+    super.serviceStop();
+  }
+
+  @Override
+  public void rejoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(1000);
+      initAndStartLeaderLatch();
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  @Override
+  public String getZookeeperConnectionState() {
+    return "Connected to zookeeper : " +
+        curator.getZookeeperClient().isConnected();
+  }
+
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader(); // move this RM back to standby before retrying
+      rejoinElection(); // close the current latch and start a fresh one
+    }
+  }
+
+  private void closeLeaderLatch() throws IOException {
+    if (leaderLatch != null) {
+      leaderLatch.close();
+    }
+  }
+
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) { // best effort: log with the cause, do not rethrow
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  // only for testing
+  @VisibleForTesting
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
new file mode 100644
index 0000000..677ec85
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+/**
+ * Interface that all embedded leader electors must implement.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface EmbeddedElector extends Service{
+  /**
+   * Leave and rejoin leader election.
+   */
+  void rejoinElection();
+
+  /**
+   * Get information about the elector's connection to Zookeeper.
+   *
+   * @return zookeeper connection state
+   */
+  String getZookeeperConnectionState();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
deleted file mode 100644
index 88d2e10..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.ActiveStandbyElector;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class EmbeddedElectorService extends AbstractService
-    implements ActiveStandbyElector.ActiveStandbyElectorCallback {
-  private static final Log LOG =
-      LogFactory.getLog(EmbeddedElectorService.class.getName());
-  private static final HAServiceProtocol.StateChangeRequestInfo req =
-      new HAServiceProtocol.StateChangeRequestInfo(
-          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
-
-  private RMContext rmContext;
-
-  private byte[] localActiveNodeInfo;
-  private ActiveStandbyElector elector;
-  private long zkSessionTimeout;
-  private Timer zkDisconnectTimer;
-  @VisibleForTesting
-  final Object zkDisconnectLock = new Object();
-
-  EmbeddedElectorService(RMContext rmContext) {
-    super(EmbeddedElectorService.class.getName());
-    this.rmContext = rmContext;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf)
-      throws Exception {
-    conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
-
-    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
-    if (zkQuorum == null) {
-     throw new YarnRuntimeException("Embedded automatic failover " +
-          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
-          " is not set");
-    }
-
-    String rmId = HAUtil.getRMHAId(conf);
-    String clusterId = YarnConfiguration.getClusterId(conf);
-    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
-
-    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
-        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
-    String electionZNode = zkBasePath + "/" + clusterId;
-
-    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
-        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-
-    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
-    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
-
-    int maxRetryNum =
-        conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
-          .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
-            CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
-    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
-        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
-
-    elector.ensureParentZNode();
-    if (!isParentZnodeSafe(clusterId)) {
-      notifyFatalError(electionZNode + " znode has invalid data! "+
-          "Might need formatting!");
-    }
-
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    elector.joinElection(localActiveNodeInfo);
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    /**
-     * When error occurs in serviceInit(), serviceStop() can be called.
-     * We need null check for the case.
-     */
-    if (elector != null) {
-      elector.quitElection(false);
-      elector.terminateConnection();
-    }
-    super.serviceStop();
-  }
-
-  @Override
-  public void becomeActive() throws ServiceFailedException {
-    cancelDisconnectTimer();
-
-    try {
-      rmContext.getRMAdminService().transitionToActive(req);
-    } catch (Exception e) {
-      throw new ServiceFailedException("RM could not transition to Active", e);
-    }
-  }
-
-  @Override
-  public void becomeStandby() {
-    cancelDisconnectTimer();
-
-    try {
-      rmContext.getRMAdminService().transitionToStandby(req);
-    } catch (Exception e) {
-      LOG.error("RM could not transition to Standby", e);
-    }
-  }
-
-  /**
-   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
-   */
-  private void cancelDisconnectTimer() {
-    synchronized (zkDisconnectLock) {
-      if (zkDisconnectTimer != null) {
-        zkDisconnectTimer.cancel();
-        zkDisconnectTimer = null;
-      }
-    }
-  }
-
-  /**
-   * When the ZK client loses contact with ZK, this method will be called to
-   * allow the RM to react. Because the loss of connection can be noticed
-   * before the session timeout happens, it is undesirable to transition
-   * immediately. Instead the method starts a timer that will wait
-   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
-   * initiating the transition into standby state.
-   */
-  @Override
-  public void enterNeutralMode() {
-    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
-        + zkSessionTimeout + " ms if connection is not reestablished.");
-
-    // If we've just become disconnected, start a timer.  When the time's up,
-    // we'll transition to standby.
-    synchronized (zkDisconnectLock) {
-      if (zkDisconnectTimer == null) {
-        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
-        zkDisconnectTimer.schedule(new TimerTask() {
-          @Override
-          public void run() {
-            synchronized (zkDisconnectLock) {
-              // Only run if the timer hasn't been cancelled
-              if (zkDisconnectTimer != null) {
-                becomeStandby();
-              }
-            }
-          }
-        }, zkSessionTimeout);
-      }
-    }
-  }
-
-  @SuppressWarnings(value = "unchecked")
-  @Override
-  public void notifyFatalError(String errorMessage) {
-    rmContext.getDispatcher().getEventHandler().handle(
-        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
-  }
-
-  @Override
-  public void fenceOldActive(byte[] oldActiveData) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Request to fence old active being ignored, " +
-          "as embedded leader election doesn't support fencing");
-    }
-  }
-
-  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
-      throws IOException {
-    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
-        .newBuilder()
-        .setClusterId(clusterId)
-        .setRmId(rmId)
-        .build()
-        .toByteArray();
-  }
-
-  private boolean isParentZnodeSafe(String clusterId)
-      throws InterruptedException, IOException, KeeperException {
-    byte[] data;
-    try {
-      data = elector.getActiveData();
-    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
-      // no active found, parent znode is safe
-      return true;
-    }
-
-    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
-    try {
-      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
-          .parseFrom(data);
-    } catch (InvalidProtocolBufferException e) {
-      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
-      return false;
-    }
-
-    // Check if the passed proto corresponds to an RM in the same cluster
-    if (!proto.getClusterId().equals(clusterId)) {
-      LOG.error("Mismatched cluster! The other RM seems " +
-          "to be from a different cluster. Current cluster = " + clusterId +
-          "Other RM's cluster = " + proto.getClusterId());
-      return false;
-    }
-    return true;
-  }
-
-  public void resetLeaderElection() {
-    elector.quitElection(false);
-    elector.joinElection(localActiveNodeInfo);
-  }
-
-  public String getHAZookeeperConnectionState() {
-    return elector.getHAZookeeperConnectionState();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
deleted file mode 100644
index 8c1a6eb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.IOException;
-
-
-public class LeaderElectorService extends AbstractService implements
-    LeaderLatchListener {
-  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
-  private LeaderLatch leaderLatch;
-  private CuratorFramework curator;
-  private RMContext rmContext;
-  private String latchPath;
-  private String rmId;
-  private ResourceManager rm;
-
-  public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
-    super(LeaderElectorService.class.getName());
-    this.rmContext = rmContext;
-    this.rm = rm;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    rmId = HAUtil.getRMHAId(conf);
-    String clusterId = YarnConfiguration.getClusterId(conf);
-    String zkBasePath = conf.get(
-        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
-        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
-    latchPath = zkBasePath + "/" + clusterId;
-    curator = rm.getCurator();
-    initAndStartLeaderLatch();
-    super.serviceInit(conf);
-  }
-
-  private void initAndStartLeaderLatch() throws Exception {
-    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
-    leaderLatch.addListener(this);
-    leaderLatch.start();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    closeLeaderLatch();
-    super.serviceStop();
-  }
-
-  public boolean hasLeaderShip() {
-    return leaderLatch.hasLeadership();
-  }
-
-
-  @Override
-  public void isLeader() {
-    LOG.info(rmId + "is elected leader, transitioning to active");
-    try {
-      rmContext.getRMAdminService().transitionToActive(
-          new HAServiceProtocol.StateChangeRequestInfo(
-              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
-    } catch (Exception e) {
-      LOG.info(rmId + " failed to transition to active, giving up leadership",
-          e);
-      notLeader();
-      reJoinElection();
-    }
-  }
-
-  public void reJoinElection() {
-    try {
-      closeLeaderLatch();
-      Thread.sleep(1000);
-      initAndStartLeaderLatch();
-    } catch (Exception e) {
-      LOG.info("Fail to re-join election.", e);
-    }
-  }
-
-  private void closeLeaderLatch() throws IOException {
-    if (leaderLatch != null) {
-      leaderLatch.close();
-    }
-  }
-  @Override
-  public void notLeader() {
-    LOG.info(rmId + " relinquish leadership");
-    try {
-      rmContext.getRMAdminService().transitionToStandby(
-          new HAServiceProtocol.StateChangeRequestInfo(
-              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
-    } catch (Exception e) {
-      LOG.info(rmId + " did not transition to standby successfully.");
-    }
-  }
-
-  // only for testing
-  @VisibleForTesting
-  public CuratorFramework getCuratorClient() {
-    return this.curator;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index c9d185f..26ef5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -145,13 +145,15 @@ public interface RMContext {
   
   void setQueuePlacementManager(PlacementManager placementMgr);
 
-  void setLeaderElectorService(LeaderElectorService elector);
+  void setLeaderElectorService(EmbeddedElector elector);
 
-  LeaderElectorService getLeaderElectorService();
+  EmbeddedElector getLeaderElectorService();
 
   QueueLimitCalculator getNodeManagerQueueLimitCalculator();
 
   void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
 
   RMAppLifetimeMonitor getRMAppLifetimeMonitor();
+
+  String getHAZookeeperConnectionState();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 3f17ac6..a452f95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -76,7 +76,7 @@ public class RMContextImpl implements RMContext {
 
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
-  private LeaderElectorService elector;
+  private EmbeddedElector elector;
 
   private QueueLimitCalculator queueLimitCalculator;
 
@@ -143,12 +143,12 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
-  public void setLeaderElectorService(LeaderElectorService elector) {
+  public void setLeaderElectorService(EmbeddedElector elector) {
     this.elector = elector;
   }
 
   @Override
-  public LeaderElectorService getLeaderElectorService() {
+  public EmbeddedElector getLeaderElectorService() {
     return this.elector;
   }
 
@@ -513,4 +513,13 @@ public class RMContextImpl implements RMContext {
   public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
     return this.activeServiceContext.getRMAppLifetimeMonitor();
   }
+
+  public String getHAZookeeperConnectionState() { // NOTE(review): no @Override
+    if (elector == null) { // no leader elector is set on this context
+      return "Could not find leader elector. Verify both HA and automatic " +
+          "failover are enabled.";
+    } else {
+      return elector.getZookeeperConnectionState(); // delegate to elector
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8ddbc20..110f2c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -272,16 +272,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
     if (this.rmContext.isHAEnabled()) {
       HAUtil.verifyAndSetConfiguration(this.conf);
-      curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
-          YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
-      if (curatorEnabled) {
-        this.curator = createAndStartCurator(conf);
-        LeaderElectorService elector = new LeaderElectorService(rmContext, this);
-        addService(elector);
+
+      // If the RM is configured to use an embedded leader elector,
+      // initialize the leader elector.
+      if (HAUtil.isAutomaticFailoverEnabled(conf) &&
+          HAUtil.isAutomaticFailoverEmbedded(conf)) {
+        EmbeddedElector elector = createEmbeddedElector();
+        addIfService(elector);
         rmContext.setLeaderElectorService(elector);
       }
     }
-    
+
     // Set UGI and do login
     // If security is enabled, use login user
     // If security is not enabled, use current user
@@ -331,6 +332,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
     super.serviceInit(this.conf);
   }
 
+  protected EmbeddedElector createEmbeddedElector() throws IOException {
+    EmbeddedElector elector;
+    curatorEnabled =
+        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+    if (curatorEnabled) { // Curator client is created before the elector
+      this.curator = createAndStartCurator(conf);
+      elector = new CuratorBasedElectorService(rmContext, this);
+    } else { // otherwise use the ActiveStandbyElector-based service
+      elector = new ActiveStandbyElectorBasedElectorService(rmContext);
+    }
+    return elector;
+  }
+
   public CuratorFramework createAndStartCurator(Configuration conf)
       throws IOException {
     String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
@@ -802,14 +817,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
         // Transition to standby and reinit active services
         LOG.info("Transitioning RM to Standby mode");
         transitionToStandby(true);
-        if (curatorEnabled) {
-          rmContext.getLeaderElectorService().reJoinElection();
-        } else {
-          adminService.resetLeaderElection();
+        EmbeddedElector elector = rmContext.getLeaderElectorService();
+        if (elector != null) {
+          elector.rejoinElection();
         }
-        return;
       } catch (Exception e) {
-        LOG.fatal("Failed to transition RM to Standby mode.");
+        LOG.fatal("Failed to transition RM to Standby mode.", e);
         ExitUtil.terminate(1, e);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
index 2d7139f..3367cf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
@@ -121,8 +121,7 @@ public class RMWebApp extends WebApp implements YarnWebParams {
   }
 
   public String getHAZookeeperConnectionState() {
-    return rm.getRMContext().getRMAdminService()
-      .getHAZookeeperConnectionState();
+    return getRMContext().getHAZookeeperConnectionState();
   }
 
   public RMContext getRMContext() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
index 512a5c4..d815315 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
@@ -64,7 +64,7 @@ public class ClusterInfo {
     this.hadoopBuildVersion = VersionInfo.getBuildVersion();
     this.hadoopVersionBuiltOn = VersionInfo.getDate();
     this.haZooKeeperConnectionState =
-        rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState();
+        rm.getRMContext().getHAZookeeperConnectionState();
   }
 
   public String getState() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index ea573e2..a66b093 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -109,6 +109,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Assert;
 
+
 @SuppressWarnings("unchecked")
 public class MockRM extends ResourceManager {
 
@@ -123,6 +124,8 @@ public class MockRM extends ResourceManager {
   private final boolean useNullRMNodeLabelsManager;
   private boolean disableDrainEventsImplicitly;
 
+  private boolean useRealElector = false;
+
   public MockRM() {
     this(new YarnConfiguration());
   }
@@ -132,13 +135,23 @@ public class MockRM extends ResourceManager {
   }
   
   public MockRM(Configuration conf, RMStateStore store) {
-    this(conf, store, true);
+    this(conf, store, true, false);
   }
-  
+
+  public MockRM(Configuration conf, boolean useRealElector) {
+    this(conf, null, true, useRealElector);
+  }
+
+  public MockRM(Configuration conf, RMStateStore store,
+      boolean useRealElector) {
+    this(conf, store, true, useRealElector);
+  }
+
   public MockRM(Configuration conf, RMStateStore store,
-      boolean useNullRMNodeLabelsManager) {
+      boolean useNullRMNodeLabelsManager, boolean useRealElector) {
     super();
     this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
+    this.useRealElector = useRealElector;
     init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
     if (store != null) {
       setRMStateStore(store);
@@ -193,6 +206,15 @@ public class MockRM extends ResourceManager {
   }
 
   @Override
+  protected EmbeddedElector createEmbeddedElector() throws IOException {
+    if (useRealElector) { // opt-in: build the elector via ResourceManager
+      return super.createEmbeddedElector();
+    } else {
+      return null; // default: the mock RM runs without a leader elector
+    }
+  }
+
+  @Override
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
     return new EventHandler<SchedulerEvent>() {
       @Override
@@ -984,11 +1006,6 @@ public class MockRM extends ResourceManager {
       protected void stopServer() {
         // don't do anything
       }
-
-      @Override
-      protected EmbeddedElectorService createEmbeddedElectorService() {
-        return null;
-      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index 6092f41..c9ce7d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -108,13 +108,13 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
   }
 
   protected void startRMs() throws IOException {
-    rm1 = new MockRM(confForRM1, null, false){
+    rm1 = new MockRM(confForRM1, null, false, false){
       @Override
       protected Dispatcher createDispatcher() {
         return new DrainDispatcher();
       }
     };
-    rm2 = new MockRM(confForRM2, null, false){
+    rm2 = new MockRM(confForRM2, null, false, false){
       @Override
       protected Dispatcher createDispatcher() {
         return new DrainDispatcher();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
index bb10041..121cacb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -63,7 +63,6 @@ public class TestLeaderElectorService {
     conf = new Configuration();
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
     conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
-    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
 
     conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
     conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@@ -121,7 +120,7 @@ public class TestLeaderElectorService {
       }
     };
     memStore.init(conf);
-    rm1 = new MockRM(conf, memStore);
+    rm1 = new MockRM(conf, memStore, true);
     rm1.init(conf);
     rm1.start();
 
@@ -167,7 +166,8 @@ public class TestLeaderElectorService {
 
     rm1 = startRM("rm1", HAServiceState.ACTIVE);
 
-    LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+    CuratorBasedElectorService service = (CuratorBasedElectorService)
+        rm1.getRMContext().getLeaderElectorService();
     CuratorZookeeperClient client =
         service.getCuratorClient().getZookeeperClient();
     // this will expire current curator client session. curator will re-establish
@@ -187,7 +187,7 @@ public class TestLeaderElectorService {
     Thread launchRM = new Thread() {
       @Override
       public void run() {
-        rm1 = new MockRM(conf) {
+        rm1 = new MockRM(conf, true) {
           @Override
           synchronized void transitionToActive() throws Exception {
             if (throwException.get()) {
@@ -217,9 +217,12 @@ public class TestLeaderElectorService {
     rm1 = startRM("rm1", HAServiceState.ACTIVE);
     rm2 = startRM("rm2", HAServiceState.STANDBY);
 
+    CuratorBasedElectorService service = (CuratorBasedElectorService)
+        rm1.getRMContext().getLeaderElectorService();
+
     ZooKeeper zkClient =
-        rm1.getRMContext().getLeaderElectorService().getCuratorClient()
-            .getZookeeperClient().getZooKeeper();
+        service.getCuratorClient().getZookeeperClient().getZooKeeper();
+
     InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
     zkCluster.killServer(connectionInstance);
 
@@ -245,7 +248,7 @@ public class TestLeaderElectorService {
   private MockRM startRM(String rmId, HAServiceState state) throws Exception{
     YarnConfiguration yarnConf = new YarnConfiguration(conf);
     yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
-    MockRM rm = new MockRM(yarnConf);
+    MockRM rm = new MockRM(yarnConf, true);
     rm.init(yarnConf);
     rm.start();
     waitFor(rm, state);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index bfd0b4e..1fe9bbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -127,7 +127,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
     when(rc.getRMAdminService()).thenReturn(as);
 
-    EmbeddedElectorService ees = new EmbeddedElectorService(rc);
+    ActiveStandbyElectorBasedElectorService
+        ees = new ActiveStandbyElectorBasedElectorService(rc);
     ees.init(myConf);
 
     ees.enterNeutralMode();
@@ -164,7 +165,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationActive(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.becomeActive();
 
     Thread.sleep(100);
@@ -183,7 +185,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationStandby(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.becomeStandby();
 
     Thread.sleep(100);
@@ -201,7 +204,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationNeutral(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     ees.enterNeutralMode();
 
     Thread.sleep(100);
@@ -220,7 +224,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationTimingActive(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -250,7 +255,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
    * @throws InterruptedException if interrupted
    */
   private void testCallbackSynchronizationTimingStandby(AdminService as,
-      EmbeddedElectorService ees) throws IOException, InterruptedException {
+      ActiveStandbyElectorBasedElectorService ees)
+      throws IOException, InterruptedException {
     synchronized (ees.zkDisconnectLock) {
       // Sleep while holding the lock so that the timer thread can't do
       // anything when it runs.  Sleep until we're pretty sure the timer thread
@@ -283,25 +289,20 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
     }
 
     @Override
-    protected AdminService createAdminService() {
-      return new AdminService(MockRMWithElector.this, getRMContext()) {
+    protected EmbeddedElector createEmbeddedElector() {
+      return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
         @Override
-        protected EmbeddedElectorService createEmbeddedElectorService() {
-          return new EmbeddedElectorService(getRMContext()) {
-            @Override
-            public void becomeActive() throws
-                ServiceFailedException {
-              try {
-                callbackCalled.set(true);
-                TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
-                Thread.sleep(delayMs);
-                TestRMEmbeddedElector.LOG.info("Sleep done");
-              } catch (InterruptedException e) {
-                e.printStackTrace();
-              }
-              super.becomeActive();
-            }
-          };
+        public void becomeActive() throws
+            ServiceFailedException {
+          try {
+            callbackCalled.set(true);
+            TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
+            Thread.sleep(delayMs);
+            TestRMEmbeddedElector.LOG.info("Sleep done");
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          super.becomeActive();
         }
       };
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 000f4a4..5114329 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -161,8 +161,8 @@ public class TestRMHA {
 
     ClientResponse response =
         webResource.path("ws").path("v1").path("cluster").path("apps")
-          .path(path).accept(MediaType.APPLICATION_JSON)
-          .get(ClientResponse.class);
+            .path(path).accept(MediaType.APPLICATION_JSON)
+            .get(ClientResponse.class);
     assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
         response.getType().toString());
     JSONObject json = response.getEntity(JSONObject.class);
@@ -178,13 +178,13 @@ public class TestRMHA {
    * 1. Standby: Should be a no-op
    * 2. Active: Active services should start
    * 3. Active: Should be a no-op.
-   *    While active, submit a couple of jobs
+   * While active, submit a couple of jobs
    * 4. Standby: Active services should stop
    * 5. Active: Active services should start
    * 6. Stop the RM: All services should stop and RM should not be ready to
    * become Active
    */
-  @Test (timeout = 30000)
+  @Test(timeout = 30000)
   public void testFailoverAndTransitions() throws Exception {
     configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     Configuration conf = new YarnConfiguration(configuration);
@@ -204,37 +204,37 @@ public class TestRMHA {
     checkMonitorHealth();
     checkStandbyRMFunctionality();
     verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-    
+
     // 1. Transition to Standby - must be a no-op
     rm.adminService.transitionToStandby(requestInfo);
     checkMonitorHealth();
     checkStandbyRMFunctionality();
     verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-    
+
     // 2. Transition to active
     rm.adminService.transitionToActive(requestInfo);
     checkMonitorHealth();
     checkActiveRMFunctionality();
     verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
-    
+
     // 3. Transition to active - no-op
     rm.adminService.transitionToActive(requestInfo);
     checkMonitorHealth();
     checkActiveRMFunctionality();
     verifyClusterMetrics(1, 2, 2, 2, 2048, 2);
-    
+
     // 4. Transition to standby
     rm.adminService.transitionToStandby(requestInfo);
     checkMonitorHealth();
     checkStandbyRMFunctionality();
     verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-   
+
     // 5. Transition to active to check Active->Standby->Active works
     rm.adminService.transitionToActive(requestInfo);
     checkMonitorHealth();
     checkActiveRMFunctionality();
     verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
-    
+
     // 6. Stop the RM. All services should stop and RM should not be ready to
     // become active
     rm.stop();
@@ -340,7 +340,7 @@ public class TestRMHA {
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
-    
+
     MyCountingDispatcher dispatcher =
         (MyCountingDispatcher) rm.getRMContext().getDispatcher();
     assertTrue(!dispatcher.isStopped());
@@ -348,24 +348,24 @@ public class TestRMHA {
     rm.adminService.transitionToActive(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
-        .getEventHandlerCount());
+            .getEventHandlerCount());
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
-    
+
     // Keep the dispatcher reference before transitioning to standby
     dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
-    
-    
+
+
     rm.adminService.transitionToStandby(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
-        .getEventHandlerCount());
+            .getEventHandlerCount());
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
 
     assertTrue(dispatcher.isStopped());
-    
+
     rm.stop();
   }
 
@@ -386,7 +386,8 @@ public class TestRMHA {
     assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
 
     //test if RM_HA_ID can not be found
-    configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
+    configuration
+        .set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID);
     configuration.unset(YarnConfiguration.RM_HA_ID);
     conf = new YarnConfiguration(configuration);
     try {
@@ -458,7 +459,7 @@ public class TestRMHA {
     checkActiveRMFunctionality();
   }
 
-  @Test(timeout = 90000)
+  @Test
   public void testTransitionedToStandbyShouldNotHang() throws Exception {
     configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     Configuration conf = new YarnConfiguration(configuration);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/8] hadoop git commit: YARN-5925. Extract hbase-backend-exclusive utility methods from TimelineStorageUtil. Contributed by Haibo Chen.

Posted by xg...@apache.org.
YARN-5925. Extract hbase-backend-exclusive utility methods from TimelineStorageUtil. Contributed by Haibo Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55f5886e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55f5886e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55f5886e

Branch: refs/heads/YARN-5734
Commit: 55f5886ea24671ff3db87a64aaba2e76b3355455
Parents: 2a28e8c
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Dec 9 16:17:24 2016 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Fri Dec 9 16:17:24 2016 -0800

----------------------------------------------------------------------
 ...stTimelineReaderWebServicesHBaseStorage.java |   6 +-
 .../flow/TestHBaseStorageFlowActivity.java      |  12 +-
 .../storage/flow/TestHBaseStorageFlowRun.java   |  14 +-
 .../flow/TestHBaseStorageFlowRunCompaction.java |  44 ++--
 .../storage/common/AppIdKeyConverter.java       |   5 +-
 .../common/HBaseTimelineStorageUtils.java       | 243 +++++++++++++++++++
 .../storage/common/TimelineStorageUtils.java    | 207 ----------------
 .../storage/flow/FlowActivityColumnPrefix.java  |  10 +-
 .../storage/flow/FlowActivityRowKey.java        |   4 +-
 .../storage/flow/FlowRunColumn.java             |   6 +-
 .../storage/flow/FlowRunColumnPrefix.java       |   6 +-
 .../storage/flow/FlowRunCoprocessor.java        |   6 +-
 .../storage/flow/FlowScanner.java               |  13 +-
 .../storage/common/TestRowKeys.java             |   2 +-
 14 files changed, 309 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 17c01b5..63a75d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -78,7 +78,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
   private static HBaseTestingUtility util;
   private static long ts = System.currentTimeMillis();
   private static long dayTs =
-      TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+      HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -984,7 +984,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertEquals(1, entities.size());
 
       long firstFlowActivity =
-          TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+          HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
 
       DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 3ddb230..2778f50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -172,7 +172,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(cluster, flowActivityRowKey.getClusterId());
     assertEquals(user, flowActivityRowKey.getUserId());
     assertEquals(flow, flowActivityRowKey.getFlowName());
-    Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
+    Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
@@ -303,7 +303,8 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+          appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
       assertEquals(1, values.size());
       checkFlowActivityRunId(runid, flowVersion, values);
@@ -388,7 +389,7 @@ public class TestHBaseStorageFlowActivity {
         assertEquals(user, flowActivity.getUser());
         assertEquals(flow, flowActivity.getFlowName());
         long dayTs =
-            TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+            HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
         assertEquals(dayTs, flowActivity.getDate().getTime());
         Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
         assertEquals(3, flowRuns.size());
@@ -443,7 +444,8 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(
+          appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
 
       Map<byte[], byte[]> values = result

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 377611f..7f46a5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -109,8 +109,8 @@ public class TestHBaseStorageFlowRun {
       HRegionServer server = util.getRSForFirstRegionInTable(table);
       List<Region> regions = server.getOnlineRegions(table);
       for (Region region : regions) {
-        assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
+        assertTrue(HBaseTimelineStorageUtils.isFlowRunTable(
+            region.getRegionInfo(), hbaseConf));
       }
     }
 
@@ -124,8 +124,8 @@ public class TestHBaseStorageFlowRun {
       HRegionServer server = util.getRSForFirstRegionInTable(table);
       List<Region> regions = server.getOnlineRegions(table);
       for (Region region : regions) {
-        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
+        assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
+            region.getRegionInfo(), hbaseConf));
       }
     }
 
@@ -139,8 +139,8 @@ public class TestHBaseStorageFlowRun {
       HRegionServer server = util.getRSForFirstRegionInTable(table);
       List<Region> regions = server.getOnlineRegions(table);
       for (Region region : regions) {
-        assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
-            hbaseConf));
+        assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(
+            region.getRegionInfo(), hbaseConf));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 756d57b..eb18e28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -54,8 +54,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -417,8 +417,8 @@ public class TestHBaseStorageFlowRunCompaction {
     tags.add(t);
     byte[] tagByteArray = Tag.fromList(tags);
     // create a cell with a VERY old timestamp and attribute SUM_FINAL
-    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+    Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
     currentColumnCells.add(c1);
 
     tags = new ArrayList<>();
@@ -427,8 +427,8 @@ public class TestHBaseStorageFlowRunCompaction {
     tags.add(t);
     tagByteArray = Tag.fromList(tags);
     // create a cell with a recent timestamp and attribute SUM_FINAL
-    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+    Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
     currentColumnCells.add(c2);
 
     tags = new ArrayList<>();
@@ -437,8 +437,8 @@ public class TestHBaseStorageFlowRunCompaction {
     tags.add(t);
     tagByteArray = Tag.fromList(tags);
     // create a cell with a VERY old timestamp but has attribute SUM
-    Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+    Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
     currentColumnCells.add(c3);
 
     tags = new ArrayList<>();
@@ -447,8 +447,8 @@ public class TestHBaseStorageFlowRunCompaction {
     tags.add(t);
     tagByteArray = Tag.fromList(tags);
     // create a cell with a VERY old timestamp but has attribute SUM
-    Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+    Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
     currentColumnCells.add(c4);
 
     List<Cell> cells =
@@ -517,7 +517,7 @@ public class TestHBaseStorageFlowRunCompaction {
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
-      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+      c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
           cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
       currentColumnCells.add(c1);
       cellTsFinal++;
@@ -531,7 +531,7 @@ public class TestHBaseStorageFlowRunCompaction {
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
       // create a cell with attribute SUM
-      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+      c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
           cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
       currentColumnCells.add(c1);
       cellTsNotFinal++;
@@ -608,7 +608,7 @@ public class TestHBaseStorageFlowRunCompaction {
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
-      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+      c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
           cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
       currentColumnCells.add(c1);
       cellTsFinal++;
@@ -622,7 +622,7 @@ public class TestHBaseStorageFlowRunCompaction {
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
-      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+      c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
           cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
       currentColumnCells.add(c1);
       cellTsFinalNotExpire++;
@@ -636,7 +636,7 @@ public class TestHBaseStorageFlowRunCompaction {
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
       // create a cell with attribute SUM
-      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+      c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
           cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
       currentColumnCells.add(c1);
       cellTsNotFinal++;
@@ -693,8 +693,8 @@ public class TestHBaseStorageFlowRunCompaction {
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
 
     // create a cell with a VERY old timestamp and attribute SUM_FINAL
-    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        120L, Bytes.toBytes(cellValue1), tagByteArray);
+    Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray);
     currentColumnCells.add(c1);
 
     tags = new ArrayList<>();
@@ -704,8 +704,8 @@ public class TestHBaseStorageFlowRunCompaction {
     tagByteArray = Tag.fromList(tags);
 
     // create a cell with a VERY old timestamp but has attribute SUM
-    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        130L, Bytes.toBytes(cellValue2), tagByteArray);
+    Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray);
     currentColumnCells.add(c2);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
         new LongConverter(), currentTimestamp);
@@ -751,8 +751,8 @@ public class TestHBaseStorageFlowRunCompaction {
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
 
     // create a cell with a VERY old timestamp
-    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        120L, Bytes.toBytes(1110L), tagByteArray);
+    Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray);
     currentColumnCells.add(c1);
 
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
@@ -789,8 +789,8 @@ public class TestHBaseStorageFlowRunCompaction {
 
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
 
-    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
-        currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+    Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
+        aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
     currentColumnCells.add(c1);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
         new LongConverter(), currentTimestamp);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index 4cb46e6..c165801 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -54,7 +54,8 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
     byte[] clusterTs = Bytes.toBytes(
         LongConverter.invertLong(appId.getClusterTimestamp()));
     System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
-    byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
+    byte[] seqId = Bytes.toBytes(
+        HBaseTimelineStorageUtils.invertInt(appId.getId()));
     System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
     return appIdBytes;
   }
@@ -79,7 +80,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
     }
     long clusterTs = LongConverter.invertLong(
         Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
-    int seqId = TimelineStorageUtils.invertInt(
+    int seqId = HBaseTimelineStorageUtils.invertInt(
         Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
     return ApplicationId.newInstance(clusterTs, seqId).toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
new file mode 100644
index 0000000..e93b470
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bunch of utility functions used in the HBase TimelineService backend.
+ */
+public final class HBaseTimelineStorageUtils {
+  /** milliseconds in one day. */
+  public static final long MILLIS_ONE_DAY = 86400000L;
+  private static final Log LOG =
+      LogFactory.getLog(HBaseTimelineStorageUtils.class);
+
+  private HBaseTimelineStorageUtils() {
+  }
+
+
+  /**
+   * Combines the input array of attributes and the input aggregation operation
+   * into a new array of attributes.
+   *
+   * @param attributes Attributes to be combined.
+   * @param aggOp Aggregation operation.
+   * @return array of combined attributes.
+   */
+  public static Attribute[] combineAttributes(Attribute[] attributes,
+                                              AggregationOperation aggOp) {
+    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+    Attribute[] combinedAttributes = new Attribute[newLength];
+
+    if (attributes != null) {
+      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+    }
+
+    if (aggOp != null) {
+      Attribute a2 = aggOp.getAttribute();
+      combinedAttributes[newLength - 1] = a2;
+    }
+    return combinedAttributes;
+  }
+
+  /**
+   * Returns a number for the new array size. The new array is the combination
+   * of input array of attributes and the input aggregation operation.
+   *
+   * @param attributes Attributes.
+   * @param aggOp Aggregation operation.
+   * @return the size for the new array
+   */
+  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int oldLength = getAttributesLength(attributes);
+    int aggLength = getAppOpLength(aggOp);
+    return oldLength + aggLength;
+  }
+
+  private static int getAppOpLength(AggregationOperation aggOp) {
+    if (aggOp != null) {
+      return 1;
+    }
+    return 0;
+  }
+
+  private static int getAttributesLength(Attribute[] attributes) {
+    if (attributes != null) {
+      return attributes.length;
+    }
+    return 0;
+  }
+
+  /**
+   * Returns the first aggregation operation (in enum declaration order) whose
+   * tag type appears in the list of input tags, or null if none does.
+   *
+   * @param tags list of HBase tags.
+   * @return AggregationOperation
+   */
+  public static AggregationOperation getAggregationOperationFromTagsList(
+      List<Tag> tags) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == aggOp.getTagType()) {
+          return aggOp;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link Tag} from the input attribute.
+   *
+   * @param attribute Attribute from which tag has to be fetched.
+   * @return an HBase Tag, or null if the attribute key is not recognized.
+   */
+  public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
+    // attribute could be either an Aggregation Operation or
+    // an Aggregation Dimension
+    // Get the Tag type from either
+    AggregationOperation aggOp = AggregationOperation
+        .getAggregationOperation(attribute.getKey());
+    if (aggOp != null) {
+      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      return t;
+    }
+
+    AggregationCompactionDimension aggCompactDim =
+        AggregationCompactionDimension.getAggregationCompactionDimension(
+            attribute.getKey());
+    if (aggCompactDim != null) {
+      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      return t;
+    }
+    return null;
+  }
+
+  /**
+   * creates a new cell based on the input cell but with the new value.
+   *
+   * @param origCell Original cell
+   * @param newValue new cell value
+   * @return cell
+   * @throws IOException while creating new cell.
+   */
+  public static Cell createNewCell(Cell origCell, byte[] newValue)
+      throws IOException {
+    return CellUtil.createCell(CellUtil.cloneRow(origCell),
+        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+  }
+
+  /**
+   * creates a cell with the given inputs.
+   *
+   * @param row row of the cell to be created
+   * @param family column family name of the new cell
+   * @param qualifier qualifier for the new cell
+   * @param ts timestamp of the new cell
+   * @param newValue value of the new cell
+   * @param tags tags in the new cell
+   * @return cell
+   * @throws IOException while creating the cell.
+   */
+  public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+      long ts, byte[] newValue, byte[] tags) throws IOException {
+    return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+        newValue, tags);
+  }
+
+  /**
+   * returns app id from the list of tags.
+   *
+   * @param tags cell tags to be looked into
+   * @return App Id as the AggregationCompactionDimension
+   */
+  public static String getAggregationCompactionDimension(List<Tag> tags) {
+    String appId = null;
+    for (Tag t : tags) {
+      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+          .getType()) {
+        appId = Bytes.toString(t.getValue());
+        return appId;
+      }
+    }
+    return appId;
+  }
+
+  public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
+                                       Configuration conf) {
+    String regionTableName = hRegionInfo.getTable().getNameAsString();
+    String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
+        FlowRunTable.DEFAULT_TABLE_NAME);
+    if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) {
+      HBaseTimelineStorageUtils.LOG.debug("regionTableName=" + regionTableName);
+    }
+    if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+      if (HBaseTimelineStorageUtils.LOG.isDebugEnabled()) {
+        HBaseTimelineStorageUtils.LOG.debug(
+            "table is the flow run table!! " + flowRunTableName);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Converts an int into its inverse int to be used in (row) keys
+   * where we want to have the largest int value in the top of the table
+   * (scans start at the largest int first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted int
+   */
+  public static int invertInt(int key) {
+    return Integer.MAX_VALUE - key;
+  }
+
+  /**
+   * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+   * for a given input timestamp.
+   *
+   * @param ts Timestamp.
+   * @return timestamp of that day's beginning (midnight)
+   */
+  public static long getTopOfTheDayTimestamp(long ts) {
+    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+    return dayTimestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index aa9a793..9b83659 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -20,22 +20,13 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -47,10 +38,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 /**
  * A bunch of utility functions used across TimelineReader and TimelineWriter.
@@ -63,133 +50,6 @@ public final class TimelineStorageUtils {
 
   private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
 
-  /** milliseconds in one day. */
-  public static final long MILLIS_ONE_DAY = 86400000L;
-
-  /**
-   * Converts an int into it's inverse int to be used in (row) keys
-   * where we want to have the largest int value in the top of the table
-   * (scans start at the largest int first).
-   *
-   * @param key value to be inverted so that the latest version will be first in
-   *          a scan.
-   * @return inverted int
-   */
-  public static int invertInt(int key) {
-    return Integer.MAX_VALUE - key;
-  }
-
-  /**
-   * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
-   * for a given input timestamp.
-   *
-   * @param ts Timestamp.
-   * @return timestamp of that day's beginning (midnight)
-   */
-  public static long getTopOfTheDayTimestamp(long ts) {
-    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
-    return dayTimestamp;
-  }
-
-  /**
-   * Combines the input array of attributes and the input aggregation operation
-   * into a new array of attributes.
-   *
-   * @param attributes Attributes to be combined.
-   * @param aggOp Aggregation operation.
-   * @return array of combined attributes.
-   */
-  public static Attribute[] combineAttributes(Attribute[] attributes,
-      AggregationOperation aggOp) {
-    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
-    Attribute[] combinedAttributes = new Attribute[newLength];
-
-    if (attributes != null) {
-      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
-    }
-
-    if (aggOp != null) {
-      Attribute a2 = aggOp.getAttribute();
-      combinedAttributes[newLength - 1] = a2;
-    }
-    return combinedAttributes;
-  }
-
-  /**
-   * Returns a number for the new array size. The new array is the combination
-   * of input array of attributes and the input aggregation operation.
-   *
-   * @param attributes Attributes.
-   * @param aggOp Aggregation operation.
-   * @return the size for the new array
-   */
-  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
-      AggregationOperation aggOp) {
-    int oldLength = getAttributesLength(attributes);
-    int aggLength = getAppOpLength(aggOp);
-    return oldLength + aggLength;
-  }
-
-  private static int getAppOpLength(AggregationOperation aggOp) {
-    if (aggOp != null) {
-      return 1;
-    }
-    return 0;
-  }
-
-  private static int getAttributesLength(Attribute[] attributes) {
-    if (attributes != null) {
-      return attributes.length;
-    }
-    return 0;
-  }
-
-  /**
-   * Returns the first seen aggregation operation as seen in the list of input
-   * tags or null otherwise.
-   *
-   * @param tags list of HBase tags.
-   * @return AggregationOperation
-   */
-  public static AggregationOperation getAggregationOperationFromTagsList(
-      List<Tag> tags) {
-    for (AggregationOperation aggOp : AggregationOperation.values()) {
-      for (Tag tag : tags) {
-        if (tag.getType() == aggOp.getTagType()) {
-          return aggOp;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Creates a {@link Tag} from the input attribute.
-   *
-   * @param attribute Attribute from which tag has to be fetched.
-   * @return a HBase Tag.
-   */
-  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
-    // attribute could be either an Aggregation Operation or
-    // an Aggregation Dimension
-    // Get the Tag type from either
-    AggregationOperation aggOp = AggregationOperation
-        .getAggregationOperation(attribute.getKey());
-    if (aggOp != null) {
-      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
-      return t;
-    }
-
-    AggregationCompactionDimension aggCompactDim =
-        AggregationCompactionDimension.getAggregationCompactionDimension(
-            attribute.getKey());
-    if (aggCompactDim != null) {
-      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
-      return t;
-    }
-    return null;
-  }
-
   /**
    * Matches key-values filter. Used for relatesTo/isRelatedTo filters.
    *
@@ -516,71 +376,4 @@ public final class TimelineStorageUtils {
         (obj instanceof Long);
   }
 
-  /**
-   * creates a new cell based on the input cell but with the new value.
-   *
-   * @param origCell Original cell
-   * @param newValue new cell value
-   * @return cell
-   * @throws IOException while creating new cell.
-   */
-  public static Cell createNewCell(Cell origCell, byte[] newValue)
-      throws IOException {
-    return CellUtil.createCell(CellUtil.cloneRow(origCell),
-        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
-        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
-  }
-
-  /**
-   * creates a cell with the given inputs.
-   *
-   * @param row row of the cell to be created
-   * @param family column family name of the new cell
-   * @param qualifier qualifier for the new cell
-   * @param ts timestamp of the new cell
-   * @param newValue value of the new cell
-   * @param tags tags in the new cell
-   * @return cell
-   * @throws IOException while creating the cell.
-   */
-  public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
-      long ts, byte[] newValue, byte[] tags) throws IOException {
-    return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
-        newValue, tags);
-  }
-
-  /**
-   * returns app id from the list of tags.
-   *
-   * @param tags cell tags to be looked into
-   * @return App Id as the AggregationCompactionDimension
-   */
-  public static String getAggregationCompactionDimension(List<Tag> tags) {
-    String appId = null;
-    for (Tag t : tags) {
-      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
-          .getType()) {
-        appId = Bytes.toString(t.getValue());
-        return appId;
-      }
-    }
-    return appId;
-  }
-
-  public static boolean isFlowRunTable(HRegionInfo hRegionInfo,
-      Configuration conf) {
-    String regionTableName = hRegionInfo.getTable().getNameAsString();
-    String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
-        FlowRunTable.DEFAULT_TABLE_NAME);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("regionTableName=" + regionTableName);
-    }
-    if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(" table is the flow run table!! " + flowRunTableName);
-      }
-      return true;
-    }
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 71c3d90..439e0c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -26,9 +26,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
@@ -144,8 +144,8 @@ public enum FlowActivityColumnPrefix
     }
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
-        attributes, this.aggOp);
+    Attribute[] combinedAttributes =
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
   }
@@ -269,8 +269,8 @@ public enum FlowActivityColumnPrefix
     }
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
-        attributes, this.aggOp);
+    Attribute[] combinedAttributes =
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index d10608a..bb77e36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -18,10 +18,10 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the flow activity table.
@@ -59,7 +59,7 @@ public class FlowActivityRowKey {
       String flowName, boolean convertDayTsToTopOfDay) {
     this.clusterId = clusterId;
     if (convertDayTsToTopOfDay && (timestamp != null)) {
-      this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+      this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
     } else {
       this.dayTs = timestamp;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 2e7a9d8..90dd345 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
@@ -113,8 +113,8 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
       TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
       Object inputValue, Attribute... attributes) throws IOException {
 
-    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
-        attributes, aggOp);
+    Attribute[] combinedAttributes =
+        HBaseTimelineStorageUtils.combineAttributes(attributes, aggOp);
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
         inputValue, combinedAttributes);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index e74282a..278d18e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -26,10 +26,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
@@ -136,7 +136,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes =
-        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
   }
@@ -163,7 +163,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = getColumnPrefixBytes(qualifier);
     Attribute[] combinedAttributes =
-        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+        HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index a9dcfaa..2be6ef8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 
 /**
@@ -71,7 +71,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
       this.region = env.getRegion();
-      isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
+      isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable(
           region.getRegionInfo(), env.getConfiguration());
     }
   }
@@ -107,7 +107,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     List<Tag> tags = new ArrayList<>();
     if ((attributes != null) && (attributes.size() > 0)) {
       for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-        Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
+        Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
         tags.add(t);
       }
       byte[] tagByteArray = Tag.fromList(tags);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6e67722..0e3c8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
@@ -249,7 +249,7 @@ class FlowScanner implements RegionScanner, Closeable {
     List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
         cell.getTagsLength());
     // We assume that all the operations for a particular column are the same
-    return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
+    return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
   }
 
   /**
@@ -323,7 +323,7 @@ class FlowScanner implements RegionScanner, Closeable {
       // only if this app has not been seen yet, add to current column cells
       List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
           cell.getTagsLength());
-      String aggDim = TimelineStorageUtils
+      String aggDim = HBaseTimelineStorageUtils
           .getAggregationCompactionDimension(tags);
       if (!alreadySeenAggDim.contains(aggDim)) {
         // if this agg dimension has already been seen,
@@ -418,7 +418,8 @@ class FlowScanner implements RegionScanner, Closeable {
       sum = converter.add(sum, currentValue);
     }
     byte[] sumBytes = converter.encodeValue(sum);
-    Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
+    Cell sumCell =
+        HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
     return sumCell;
   }
 
@@ -460,7 +461,7 @@ class FlowScanner implements RegionScanner, Closeable {
       // if this is the existing flow sum cell
       List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
           cell.getTagsLength());
-      String appId = TimelineStorageUtils
+      String appId = HBaseTimelineStorageUtils
           .getAggregationCompactionDimension(tags);
       if (appId == FLOW_APP_ID) {
         sum = converter.add(sum, currentValue);
@@ -502,7 +503,7 @@ class FlowScanner implements RegionScanner, Closeable {
           Bytes.toBytes(FLOW_APP_ID));
       tags.add(t);
       byte[] tagByteArray = Tag.fromList(tags);
-      Cell sumCell = TimelineStorageUtils.createNewCell(
+      Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
           CellUtil.cloneRow(anyCell),
           CellUtil.cloneFamily(anyCell),
           CellUtil.cloneQualifier(anyCell),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/55f5886e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
index 368b060..5beb189 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -185,7 +185,7 @@ public class TestRowKeys {
   @Test
   public void testFlowActivityRowKey() {
     Long ts = 1459900830000L;
-    Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
     byte[] byteRowKey =
         new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
     FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[7/8] hadoop git commit: MAPREDUCE-6820. Fix dead links in Job relevant classes. Contributed by Yiqun Lin.

Posted by xg...@apache.org.
MAPREDUCE-6820. Fix dead links in Job relevant classes. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92a8917c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92a8917c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92a8917c

Branch: refs/heads/YARN-5734
Commit: 92a8917ca5a25bf6bee75acbb554aae87766a7e7
Parents: 4b149a1
Author: Akira Ajisaka <aa...@apache.org>
Authored: Sat Dec 10 10:55:41 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Sat Dec 10 10:55:41 2016 +0900

----------------------------------------------------------------------
 .../src/main/java/org/apache/hadoop/mapred/JobConf.java            | 2 +-
 .../src/main/java/org/apache/hadoop/mapred/Mapper.java             | 2 +-
 .../src/main/java/org/apache/hadoop/mapred/Reducer.java            | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92a8917c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index 85b4c93..ef9ec61 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -1318,7 +1318,7 @@ public class JobConf extends Configuration {
    * bytes, of input files. However, the {@link FileSystem} blocksize of the 
    * input files is treated as an upper bound for input splits. A lower bound 
    * on the split size can be set via 
-   * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
    * mapreduce.input.fileinputformat.split.minsize</a>.</p>
    *  
    * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92a8917c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
index ac2c96d..3c15439 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Mapper.java
@@ -147,7 +147,7 @@ public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
    * takes significant amount of time to process individual key/value
    * pairs, this is crucial since the framework might assume that the task has 
    * timed-out and kill that task. The other way of avoiding this is to set 
-   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
+   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.task.timeout">
    * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 
    * time-outs).</p>
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92a8917c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reducer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reducer.java
index 962e195..fa6de12 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reducer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Reducer.java
@@ -187,7 +187,7 @@ public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
    * takes a significant amount of time to process individual key/value 
    * pairs, this is crucial since the framework might assume that the task has 
    * timed-out and kill that task. The other way of avoiding this is to set 
-   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
+   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.task.timeout">
    * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 
    * time-outs).</p>
    * 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/8] hadoop git commit: HDFS-11229. HDFS-11056 failed to close meta file. Contributed by Wei-Chiu Chuang.

Posted by xg...@apache.org.
HDFS-11229. HDFS-11056 failed to close meta file. Contributed by Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a28e8cf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a28e8cf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a28e8cf

Branch: refs/heads/YARN-5734
Commit: 2a28e8cf0469a373a99011f0fa540474e60528c8
Parents: b606e02
Author: Wei-Chiu Chuang <we...@apache.org>
Authored: Fri Dec 9 16:02:42 2016 -0800
Committer: Wei-Chiu Chuang <we...@apache.org>
Committed: Fri Dec 9 16:02:42 2016 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a28e8cf/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 08564de..97bb09b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1132,9 +1132,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
         (int)(onDiskLen / bytesPerChecksum * checksumSize);
     byte[] lastChecksum = new byte[checksumSize];
-    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
-    raf.seek(offsetInChecksum);
-    raf.read(lastChecksum, 0, checksumSize);
+    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+      raf.seek(offsetInChecksum);
+      raf.read(lastChecksum, 0, checksumSize);
+    }
     return lastChecksum;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org