You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/02/19 01:34:50 UTC

hadoop git commit: YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. Contributed by Tsuyoshi OZAWA (cherry picked from commit 1c03376300a46722d4147f5b8f37242f68dba0a2)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 3605928bd -> b8fe1a747


YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. Contributed by Tsuyoshi OZAWA
(cherry picked from commit 1c03376300a46722d4147f5b8f37242f68dba0a2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8fe1a74
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8fe1a74
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8fe1a74

Branch: refs/heads/branch-2
Commit: b8fe1a747e9dfaeaf71471d89a706787a28aaa1b
Parents: 3605928
Author: Jian He <ji...@apache.org>
Authored: Wed Feb 18 16:06:55 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Wed Feb 18 16:34:42 2015 -0800

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |   1 -
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop-yarn-server-resourcemanager/pom.xml  |   8 +-
 .../org/apache/hadoop/test/YarnTestDriver.java  |  60 ++++
 .../recovery/RMStateStoreTestBase.java          |  19 +-
 .../recovery/TestZKRMStateStorePerf.java        | 277 +++++++++++++++++++
 6 files changed, 359 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 569b292..5d0aa02 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -827,7 +827,6 @@
         <artifactId>zookeeper</artifactId>
         <version>${zookeeper.version}</version>
         <type>test-jar</type>
-        <scope>test</scope>
         <exclusions>
           <exclusion>
             <groupId>org.jboss.netty</groupId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b1dfa21..2a9fbe0 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -265,6 +265,9 @@ Release 2.7.0 - UNRELEASED
     YARN-1299. Improve a log message in AppSchedulingInfo by adding application 
     id. (Ashutosh Jindal and Devaraj K via ozawa)
 
+    YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
+    (Tsuyoshi OZAWA via jianhe)
+
   OPTIMIZATIONS
 
     YARN-2990. FairScheduler's delay-scheduling always waits for node-local and 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 30695ab..a41b94a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -186,7 +186,6 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <type>test-jar</type>
-      <scope>test</scope>
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
@@ -245,6 +244,13 @@
               <goal>test-jar</goal>
             </goals>
             <phase>test-compile</phase>
+            <configuration>
+              <archive>
+                <manifest>
+                  <mainClass>org.apache.hadoop.test.YarnTestDriver</mainClass>
+                </manifest>
+              </archive>
+            </configuration>
           </execution>
         </executions>
       </plugin>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java
new file mode 100644
index 0000000..8874ed8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf;
+
+/**
+ * Driver for Yarn tests.
+ *
+ */
+public class YarnTestDriver {
+
+  private ProgramDriver pgd;
+
+  public YarnTestDriver() {
+    this(new ProgramDriver());
+  }
+
+  public YarnTestDriver(ProgramDriver pgd) {
+    this.pgd = pgd;
+    try {
+      pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(),
+          TestZKRMStateStorePerf.class,
+          "ZKRMStateStore i/o benchmark.");
+    } catch(Throwable e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void run(String argv[]) {
+    int exitCode = -1;
+    try {
+      exitCode = pgd.run(argv);
+    } catch(Throwable e) {
+      e.printStackTrace();
+    }
+    System.exit(exitCode);
+  }
+
+  public static void main(String argv[]){
+    new YarnTestDriver().run(argv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index b01969b..5b53a02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import org.apache.hadoop.yarn.event.Event;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -79,8 +80,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
-  static class TestDispatcher implements
-      Dispatcher, EventHandler<RMAppAttemptEvent> {
+  static class TestDispatcher implements Dispatcher, EventHandler<Event> {
 
     ApplicationAttemptId attemptId;
 
@@ -93,8 +93,11 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     }
 
     @Override
-    public void handle(RMAppAttemptEvent event) {
-      assertEquals(attemptId, event.getApplicationAttemptId());
+    public void handle(Event event) {
+      if (event instanceof RMAppAttemptEvent) {
+        RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event;
+        assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId());
+      }
       notified = true;
       synchronized (this) {
         notifyAll();
@@ -134,7 +137,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     dispatcher.notified = false;
   }
 
-  RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
+  protected RMApp storeApp(RMStateStore store, ApplicationId appId,
+      long submitTime,
       long startTime) throws Exception {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
@@ -150,7 +154,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     return mockApp;
   }
 
-  ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
+  protected ContainerId storeAttempt(RMStateStore store,
+      ApplicationAttemptId attemptId,
       String containerIdStr, Token<AMRMTokenIdentifier> appToken,
       SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
       throws Exception {
@@ -474,7 +479,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
   }
 
-  private Token<AMRMTokenIdentifier> generateAMRMToken(
+  protected Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
     Token<AMRMTokenIdentifier> appToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8fe1a74/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
new file mode 100644
index 0000000..654b357
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.crypto.SecretKey;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZKRMStateStorePerf extends RMStateStoreTestBase
+    implements Tool {
+  public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+
+  final String version = "0.1";
+
+  // Configurable variables for performance test
+  private int ZK_PERF_NUM_APP_DEFAULT = 1000;
+  private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 10;
+
+  private final long clusterTimeStamp =  1352994193343L;
+
+  private static final String USAGE =
+      "Usage: " + TestZKRMStateStorePerf.class.getSimpleName() +
+          " -appSize numberOfApplications" +
+          " -appAttemptSize numberOfApplicationAttempts" +
+          " [-hostPort Host:Port]" +
+          " [-workingZnode rootZnodeForTesting]\n";
+
+  private YarnConfiguration conf = null;
+  private String workingZnode = "/Test";
+  private ZKRMStateStore store;
+  private AMRMTokenSecretManager appTokenMgr;
+  private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
+
+  @Before
+  public void setUpZKServer() throws Exception {
+    super.setUp();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (store != null) {
+      store.stop();
+    }
+    if (appTokenMgr != null) {
+      appTokenMgr.stop();
+    }
+    super.tearDown();
+  }
+
+  private void initStore(String hostPort) {
+    Optional<String> optHostPort = Optional.fromNullable(hostPort);
+    RMContext rmContext = mock(RMContext.class);
+
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
+    conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+
+    store = new ZKRMStateStore();
+    store.init(conf);
+    store.start();
+    when(rmContext.getStateStore()).thenReturn(store);
+    appTokenMgr = new AMRMTokenSecretManager(conf, rmContext);
+    appTokenMgr.start();
+    clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public int run(String[] args) {
+    LOG.info("Starting ZKRMStateStorePerf ver." + version);
+
+    int numApp = ZK_PERF_NUM_APP_DEFAULT;
+    int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP;
+    String hostPort = null;
+    boolean launchLocalZK= true;
+
+    if (args.length == 0) {
+      System.err.println("Missing arguments.");
+      return -1;
+    }
+
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].equalsIgnoreCase("-appsize")) {
+        numApp = Integer.parseInt(args[++i]);
+      } else if (args[i].equalsIgnoreCase("-appattemptsize")) {
+        numAppAttemptPerApp = Integer.parseInt(args[++i]);
+      } else if (args[i].equalsIgnoreCase("-hostPort")) {
+        hostPort = args[++i];
+        launchLocalZK = false;
+      } else if (args[i].equalsIgnoreCase("-workingZnode"))  {
+        workingZnode = args[++i];
+      } else {
+        System.err.println("Illegal argument: " + args[i]);
+        return -1;
+      }
+    }
+
+    if (launchLocalZK) {
+      try {
+        setUp();
+      } catch (Exception e) {
+        System.err.println("failed to setup. : " + e.getMessage());
+        return -1;
+      }
+    }
+
+    initStore(hostPort);
+
+    long submitTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis() + 1234;
+
+    ArrayList<ApplicationId> applicationIds = new ArrayList<>();
+    ArrayList<RMApp> rmApps = new ArrayList<>();
+    ArrayList<ApplicationAttemptId> attemptIds = new ArrayList<>();
+    HashMap<ApplicationId, Set<ApplicationAttemptId>> appIdsToAttemptId =
+        new HashMap<>();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    for (int i = 0; i < numApp; i++) {
+      ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i);
+      applicationIds.add(appId);
+      ArrayList<ApplicationAttemptId> attemptIdsForThisApp =
+          new ArrayList<>();
+      for (int j = 0; j < numAppAttemptPerApp; j++) {
+        ApplicationAttemptId attemptId =
+            ApplicationAttemptId.newInstance(appId, j);
+        attemptIdsForThisApp.add(attemptId);
+      }
+      appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIdsForThisApp));
+      attemptIds.addAll(attemptIdsForThisApp);
+    }
+
+    for (ApplicationId appId : applicationIds) {
+      RMApp app = null;
+      try {
+        app = storeApp(store, appId, submitTime, startTime);
+      } catch (Exception e) {
+        System.err.println("failed to create Application Znode. : "
+            + e.getMessage());
+        return -1;
+      }
+      waitNotify(dispatcher);
+      rmApps.add(app);
+    }
+
+    for (ApplicationAttemptId attemptId : attemptIds) {
+      Token<AMRMTokenIdentifier> tokenId =
+          generateAMRMToken(attemptId, appTokenMgr);
+      SecretKey clientTokenKey =
+          clientToAMTokenMgr.createMasterKey(attemptId);
+      try {
+        storeAttempt(store, attemptId,
+            ContainerId.newContainerId(attemptId, 0L).toString(),
+            tokenId, clientTokenKey, dispatcher);
+      } catch (Exception e) {
+        System.err.println("failed to create AppAttempt Znode. : "
+            + e.getMessage());
+        return -1;
+      }
+    }
+
+    long storeStart = System.currentTimeMillis();
+    try {
+      store.loadState();
+    } catch (Exception e) {
+      System.err.println("failed to locaState from ZKRMStateStore. : "
+          + e.getMessage());
+      return -1;
+    }
+    long storeEnd = System.currentTimeMillis();
+
+    long loadTime = storeEnd - storeStart;
+
+    String resultMsg =  "ZKRMStateStore takes " + loadTime + " msec to loadState.";
+    LOG.info(resultMsg);
+    System.out.println(resultMsg);
+
+    // cleanup
+    try {
+      for (RMApp app : rmApps) {
+        ApplicationStateData appState =
+            ApplicationStateData.newInstance(app.getSubmitTime(),
+                app.getStartTime(), app.getApplicationSubmissionContext(),
+                app.getUser());
+        ApplicationId appId = app.getApplicationId();
+        Map m = mock(Map.class);
+        when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId));
+        appState.attempts = m;
+        store.removeApplicationStateInternal(appState);
+      }
+    } catch (Exception e) {
+      System.err.println("failed to cleanup. : " + e.getMessage());
+      return -1;
+    }
+
+    return 0;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // currently this function is just ignored
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Test
+  public void perfZKRMStateStore() throws Exception {
+    String[] args = {
+        "-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT),
+        "-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP)
+    };
+    run(args);
+  }
+
+  static public void main(String[] args) throws Exception {
+    TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf();
+
+    int res = -1;
+    try {
+      res = ToolRunner.run(perf, args);
+    } catch(Exception e) {
+      System.err.print(StringUtils.stringifyException(e));
+      res = -2;
+    }
+    if(res == -1) {
+      System.err.print(USAGE);
+    }
+    System.exit(res);
+  }
+}