You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/05/04 19:28:06 UTC

[38/50] [abbrv] hadoop git commit: YARN-8151. Yarn RM Epoch should wrap around. Contributed by Young Chen.

YARN-8151. Yarn RM Epoch should wrap around. Contributed by Young Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6a80e47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6a80e47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6a80e47

Branch: refs/heads/HDFS-12943
Commit: e6a80e476d4348a4373e6dd5792d70edff16516f
Parents: 87c23ef
Author: Inigo Goiri <in...@apache.org>
Authored: Wed May 2 17:23:17 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Wed May 2 17:23:17 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java       |  4 ++++
 .../src/main/resources/yarn-default.xml           |  7 +++++++
 .../recovery/FileSystemRMStateStore.java          |  4 ++--
 .../recovery/LeveldbRMStateStore.java             |  2 +-
 .../recovery/MemoryRMStateStore.java              |  2 +-
 .../resourcemanager/recovery/RMStateStore.java    | 18 +++++++++++++++++-
 .../resourcemanager/recovery/ZKRMStateStore.java  |  4 ++--
 .../recovery/RMStateStoreTestBase.java            | 14 ++++++++++++++
 .../recovery/TestFSRMStateStore.java              |  1 +
 .../recovery/TestLeveldbRMStateStore.java         |  1 +
 .../recovery/TestZKRMStateStore.java              |  1 +
 11 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8aa136d..5ba2e05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -188,6 +188,10 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_EPOCH = RM_PREFIX + "epoch";
   public static final long DEFAULT_RM_EPOCH = 0L;
 
+  /** The epoch range before wrap around. 0 disables wrap around*/
+  public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range";
+  public static final long DEFAULT_RM_EPOCH_RANGE = 0;
+
   /** The address of the applications manager interface in the RM.*/
   public static final String RM_ADDRESS = 
     RM_PREFIX + "address";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 85915c2..4eb509f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -677,6 +677,13 @@
   </property>
 
   <property>
+    <description>The range of values above base epoch that the RM will use before
+      wrapping around</description>
+    <name>yarn.resourcemanager.epoch.range</name>
+    <value>0</value>
+  </property>
+
+  <property>
     <description>The list of RM nodes in the cluster when HA is
       enabled. See description of yarn.resourcemanager.ha
       .enabled for full details on how this is used.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 19297bc..b797283 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -205,12 +205,12 @@ public class FileSystemRMStateStore extends RMStateStore {
       Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
       currentEpoch = epoch.getEpoch();
       // increment epoch and store it
-      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+      byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
           .toByteArray();
       updateFile(epochNodePath, storeData, false);
     } else {
       // initialize epoch file with 1 for the next time.
-      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+      byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
           .toByteArray();
       writeFileWithRetries(epochNodePath, storeData, false);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 36a8dfa..e7fb02f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -259,7 +259,7 @@ public class LeveldbRMStateStore extends RMStateStore {
       if (data != null) {
         currentEpoch = EpochProto.parseFrom(data).getEpoch();
       }
-      EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto();
+      EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto();
       db.put(dbKeyBytes, proto.toByteArray());
     } catch (DBException e) {
       throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 5041000..219e10a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -59,7 +59,7 @@ public class MemoryRMStateStore extends RMStateStore {
   @Override
   public synchronized long getAndIncrementEpoch() throws Exception {
     long currentEpoch = epoch;
-    epoch = epoch + 1;
+    epoch = nextEpoch(epoch);
     return currentEpoch;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index b4dd378..242b5d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -104,6 +104,7 @@ public abstract class RMStateStore extends AbstractService {
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
   protected long baseEpoch;
+  private long epochRange;
   protected ResourceManager resourceManager;
   private final ReadLock readLock;
   private final WriteLock writeLock;
@@ -732,6 +733,8 @@ public abstract class RMStateStore extends AbstractService {
     // read the base epoch value from conf
     baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
         YarnConfiguration.DEFAULT_RM_EPOCH);
+    epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
+        YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
     initInternal(conf);
   }
 
@@ -818,7 +821,20 @@ public abstract class RMStateStore extends AbstractService {
    * Get the current epoch of RM and increment the value.
    */
   public abstract long getAndIncrementEpoch() throws Exception;
-  
+
+  /**
+   * Compute the next epoch value by incrementing by one.
+   * Wraps around if the epoch range is exceeded so that
+   * when federation is enabled epoch collisions can be avoided.
+   */
+  protected long nextEpoch(long epoch){
+    long epochVal = epoch - baseEpoch + 1;
+    if (epochRange > 0) {
+      epochVal %= epochRange;
+    }
+    return  epochVal + baseEpoch;
+  }
+
   /**
    * Blocking API
    * The derived class must recover state from the store and return a new 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 9073910..de1f1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -491,13 +491,13 @@ public class ZKRMStateStore extends RMStateStore {
       Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
       currentEpoch = epoch.getEpoch();
       // increment epoch and store it
-      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+      byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
           .toByteArray();
       zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
           fencingNodePath);
     } else {
       // initialize epoch node with 1 for the next time.
-      byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+      byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
           .toByteArray();
       zkManager.safeCreate(epochNodePath, storeData, zkAcl,
           CreateMode.PERSISTENT, zkAcl, fencingNodePath);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 957d4ce..3454d72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -94,6 +94,8 @@ public class RMStateStoreTestBase {
 
   protected final long epoch = 10L;
 
+  private final long epochRange = 10L;
+
   static class TestDispatcher implements Dispatcher, EventHandler<Event> {
 
     ApplicationAttemptId attemptId;
@@ -141,6 +143,10 @@ public class RMStateStoreTestBase {
     boolean attemptExists(RMAppAttempt attempt) throws Exception;
   }
 
+  public long getEpochRange() {
+    return epochRange;
+  }
+
   void waitNotify(TestDispatcher dispatcher) {
     long startTime = System.currentTimeMillis();
     while(!dispatcher.notified) {
@@ -576,6 +582,14 @@ public class RMStateStoreTestBase {
     
     long thirdTimeEpoch = store.getAndIncrementEpoch();
     Assert.assertEquals(epoch + 2, thirdTimeEpoch);
+
+    for (int i = 0; i < epochRange; ++i) {
+      store.getAndIncrementEpoch();
+    }
+    long wrappedEpoch = store.getAndIncrementEpoch();
+    // Epoch should have wrapped around and then incremented once for a total
+    // of + 3
+    Assert.assertEquals(epoch + 3, wrappedEpoch);
   }
 
   public void testAppDeletion(RMStateStoreHelper stateStoreHelper)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index fe4a701..14f5404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -118,6 +118,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
               900L);
       conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+      conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
       if (adminCheckEnable) {
         conf.setBoolean(
           YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index afd0c77..576ee7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -83,6 +83,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
   @Test(timeout = 60000)
   public void testEpoch() throws Exception {
     conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+    conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
     LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
     testEpoch(tester);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index d8718e0..4cba266 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -210,6 +210,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
           curatorTestingServer.getConnectString());
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
       conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+      conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
       this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
       return this.store;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org