You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/07 20:35:42 UTC
[17/35] hadoop git commit: YARN-8151. Yarn RM Epoch should wrap
around. Contributed by Young Chen.
YARN-8151. Yarn RM Epoch should wrap around. Contributed by Young Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6a80e47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6a80e47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6a80e47
Branch: refs/heads/HDDS-4
Commit: e6a80e476d4348a4373e6dd5792d70edff16516f
Parents: 87c23ef
Author: Inigo Goiri <in...@apache.org>
Authored: Wed May 2 17:23:17 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Wed May 2 17:23:17 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 4 ++++
.../src/main/resources/yarn-default.xml | 7 +++++++
.../recovery/FileSystemRMStateStore.java | 4 ++--
.../recovery/LeveldbRMStateStore.java | 2 +-
.../recovery/MemoryRMStateStore.java | 2 +-
.../resourcemanager/recovery/RMStateStore.java | 18 +++++++++++++++++-
.../resourcemanager/recovery/ZKRMStateStore.java | 4 ++--
.../recovery/RMStateStoreTestBase.java | 14 ++++++++++++++
.../recovery/TestFSRMStateStore.java | 1 +
.../recovery/TestLeveldbRMStateStore.java | 1 +
.../recovery/TestZKRMStateStore.java | 1 +
11 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8aa136d..5ba2e05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -188,6 +188,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_EPOCH = RM_PREFIX + "epoch";
public static final long DEFAULT_RM_EPOCH = 0L;
+ /** The epoch range before wrap around. 0 disables wrap around. */
+ public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range";
+ public static final long DEFAULT_RM_EPOCH_RANGE = 0;
+
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 85915c2..4eb509f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -677,6 +677,13 @@
</property>
<property>
+ <description>The range of values above base epoch that the RM will use before
+ wrapping around. A value of 0 disables wrap around.</description>
+ <name>yarn.resourcemanager.epoch.range</name>
+ <value>0</value>
+ </property>
+
+ <property>
<description>The list of RM nodes in the cluster when HA is
enabled. See description of yarn.resourcemanager.ha
.enabled for full details on how this is used.</description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 19297bc..b797283 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -205,12 +205,12 @@ public class FileSystemRMStateStore extends RMStateStore {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
updateFile(epochNodePath, storeData, false);
} else {
// initialize epoch file with 1 for the next time.
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
writeFileWithRetries(epochNodePath, storeData, false);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 36a8dfa..e7fb02f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -259,7 +259,7 @@ public class LeveldbRMStateStore extends RMStateStore {
if (data != null) {
currentEpoch = EpochProto.parseFrom(data).getEpoch();
}
- EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto();
+ EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto();
db.put(dbKeyBytes, proto.toByteArray());
} catch (DBException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 5041000..219e10a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -59,7 +59,7 @@ public class MemoryRMStateStore extends RMStateStore {
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
long currentEpoch = epoch;
- epoch = epoch + 1;
+ epoch = nextEpoch(epoch);
return currentEpoch;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index b4dd378..242b5d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -104,6 +104,7 @@ public abstract class RMStateStore extends AbstractService {
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
protected long baseEpoch;
+ private long epochRange;
protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
@@ -732,6 +733,8 @@ public abstract class RMStateStore extends AbstractService {
// read the base epoch value from conf
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
+ epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
+ YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
initInternal(conf);
}
@@ -818,7 +821,20 @@ public abstract class RMStateStore extends AbstractService {
* Get the current epoch of RM and increment the value.
*/
public abstract long getAndIncrementEpoch() throws Exception;
-
+
+ /**
+ * Return the epoch after {@code epoch}: (epoch - baseEpoch + 1), reduced
+ * modulo epochRange when epochRange is positive, then offset by baseEpoch.
+ * Wrapping avoids epoch collisions under federation; range <= 0 disables it.
+ */
+ protected long nextEpoch(long epoch){
+ long epochVal = epoch - baseEpoch + 1;
+ if (epochRange > 0) {
+ epochVal %= epochRange;
+ }
+ return epochVal + baseEpoch;
+ }
+
/**
* Blocking API
* The derived class must recover state from the store and return a new
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 9073910..de1f1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -491,13 +491,13 @@ public class ZKRMStateStore extends RMStateStore {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
fencingNodePath);
} else {
// initialize epoch node with 1 for the next time.
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeCreate(epochNodePath, storeData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 957d4ce..3454d72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -94,6 +94,8 @@ public class RMStateStoreTestBase {
protected final long epoch = 10L;
+ private final long epochRange = 10L;
+
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
ApplicationAttemptId attemptId;
@@ -141,6 +143,10 @@ public class RMStateStoreTestBase {
boolean attemptExists(RMAppAttempt attempt) throws Exception;
}
+ public long getEpochRange() {
+ return epochRange;
+ }
+
void waitNotify(TestDispatcher dispatcher) {
long startTime = System.currentTimeMillis();
while(!dispatcher.notified) {
@@ -576,6 +582,14 @@ public class RMStateStoreTestBase {
long thirdTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(epoch + 2, thirdTimeEpoch);
+
+ for (int i = 0; i < epochRange; ++i) {
+ store.getAndIncrementEpoch();
+ }
+ long wrappedEpoch = store.getAndIncrementEpoch();
+ // The stored epoch was epoch + 3; epochRange more increments take it
+ // through one full wrap-around cycle, so it reads epoch + 3 again
+ Assert.assertEquals(epoch + 3, wrappedEpoch);
}
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index fe4a701..14f5404 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -118,6 +118,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
900L);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
if (adminCheckEnable) {
conf.setBoolean(
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index afd0c77..576ee7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -83,6 +83,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
@Test(timeout = 60000)
public void testEpoch() throws Exception {
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
testEpoch(tester);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6a80e47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index d8718e0..4cba266 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -210,6 +210,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org