You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/01/13 23:29:30 UTC
[02/17] hadoop git commit: YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for avoiding blocking ZK's event thread. (ozawa)
YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for avoiding blocking ZK's event thread. (ozawa)
(cherry picked from commit 0460b8a8a3de232f236f49ef6769d38cda62cc28)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0fc19f6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0fc19f6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0fc19f6a
Branch: refs/heads/branch-2.7.2
Commit: 0fc19f6a4d24b7268a3d79d7d98fce528ffdefcc
Parents: 3e192f8
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Tue Dec 8 13:31:23 2015 +0900
Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Committed: Wed Jan 13 11:24:11 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../recovery/ZKRMStateStore.java | 42 +++++++-------------
2 files changed, 18 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fc19f6a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2487e3a..61e14f5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -879,6 +879,9 @@ Release 2.6.3 - UNRELEASED
YARN-4434. NodeManager Disk Checker parameter documentation is not correct.
(Weiwei Yang via aajisaka)
+ YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for
+ avoiding blocking ZK's event thread. (ozawa)
+
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fc19f6a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index f12ada7..70ccd8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -116,12 +116,10 @@ public class ZKRMStateStore extends RMStateStore {
private List<ZKUtil.ZKAuthInfo> zkAuths;
class ZKSyncOperationCallback implements AsyncCallback.VoidCallback {
- public final CountDownLatch latch = new CountDownLatch(1);
@Override
public void processResult(int rc, String path, Object ctx){
if (rc == Code.OK.intValue()) {
LOG.info("ZooKeeper sync operation succeeded. path: " + path);
- latch.countDown();
} else {
LOG.fatal("ZooKeeper sync operation failed. Waiting for session " +
"timeout. path: " + path);
@@ -945,16 +943,20 @@ public class ZKRMStateStore extends RMStateStore {
* @return true if ZK.sync() succeededs, false if ZK.sync() fails.
* @throws InterruptedException
*/
- private boolean syncInternal(String path) throws InterruptedException {
- ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
- if (path != null) {
- zkClient.sync(path, cb, null);
- } else {
- zkClient.sync(zkRootNodePath, cb, null);
+ private void syncInternal(final String path) throws InterruptedException {
+ final ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
+ final String pathForSync = (path != null) ? path : zkRootNodePath;
+ try {
+ new ZKAction<Void>() {
+ @Override
+ Void run() throws KeeperException, InterruptedException {
+ zkClient.sync(pathForSync, cb, null);
+ return null;
+ }
+ }.runWithRetries();
+ } catch (Exception e) {
+ LOG.fatal("sync failed.");
}
- boolean succeededToSync = cb.latch.await(
- zkSessionTimeout, TimeUnit.MILLISECONDS);
- return succeededToSync;
}
/**
@@ -1211,22 +1213,8 @@ public class ZKRMStateStore extends RMStateStore {
"Retry no. " + retry);
Thread.sleep(zkRetryInterval);
createConnection();
- boolean succeededToSync = false;
- try {
- succeededToSync = syncInternal(ke.getPath());
- } catch (InterruptedException ie) {
- LOG.info("Interrupted sync operation. Giving up!");
- Thread.currentThread().interrupt();
- throw ke;
- }
- if (succeededToSync) {
- // continue the operation.
- continue;
- } else {
- // Giving up since new connection without sync can occur an
- // unexpected view from the client like YARN-3798.
- LOG.info("Failed to sync with ZK new connection.");
- }
+ syncInternal(ke.getPath());
+ continue;
}
LOG.info("Maxed out ZK retries. Giving up!");
throw ke;