You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/24 01:07:30 UTC
[1/2] hbase git commit: HBASE-18628 Fix event pre-emption in ZKPermWatcher
Repository: hbase
Updated Branches:
refs/heads/branch-1 19a80c823 -> 362a2924d
refs/heads/branch-1.4 40dedb8df -> e293da211
HBASE-18628 Fix event pre-emption in ZKPermWatcher
Instead of using an AtomicReference to the data and aborting when we detect
that new data has come in, use the native cancellation/pre-emption features
of Java's Future.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e293da21
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e293da21
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e293da21
Branch: refs/heads/branch-1.4
Commit: e293da2114cea79326eb799def72d05aee1753f2
Parents: 40dedb8
Author: Mike Drob <md...@apache.org>
Authored: Mon Aug 21 16:23:27 2017 -0500
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 23 16:42:28 2017 -0700
----------------------------------------------------------------------
.../security/access/ZKPermissionWatcher.java | 60 ++++++++++----------
1 file changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e293da21/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
index b4bf510..4c37b52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,12 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists
static final String ACL_NODE = "acl";
- TableAuthManager authManager;
- String aclZNode;
- CountDownLatch initialized = new CountDownLatch(1);
- AtomicReference<List<ZKUtil.NodeAndData>> nodes =
- new AtomicReference<List<ZKUtil.NodeAndData>>(null);
- ExecutorService executor;
+ private final TableAuthManager authManager;
+ private final String aclZNode;
+ private final CountDownLatch initialized = new CountDownLatch(1);
+ private final ExecutorService executor;
+ private Future<?> childrenChangedFuture;
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) {
@@ -83,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
- refreshNodes(existing, null);
+ refreshNodes(existing);
}
return null;
}
@@ -127,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
- refreshNodes(nodes, null);
+ refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
@@ -185,37 +185,36 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
waitUntilStarted();
if (path.equals(aclZNode)) {
try {
- List<ZKUtil.NodeAndData> nodeList =
+ final List<ZKUtil.NodeAndData> nodeList =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
- while (!nodes.compareAndSet(null, nodeList)) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while setting node list", e);
- Thread.currentThread().interrupt();
+ // preempt any existing nodeChildrenChanged event processing
+ if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
+ boolean cancelled = childrenChangedFuture.cancel(true);
+ if (!cancelled) {
+ // task may have finished between our check and attempted cancel, this is fine.
+ if (! childrenChangedFuture.isDone()) {
+ LOG.warn("Could not cancel processing node children changed event, " +
+ "please file a JIRA and attach logs if possible.");
+ }
}
}
+ childrenChangedFuture = asyncProcessNodeUpdate(new Runnable() {
+ @Override
+ public void run() {
+ refreshNodes(nodeList);
+ }
+ });
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
- asyncProcessNodeUpdate(new Runnable() {
- // allows subsequent nodeChildrenChanged event to preempt current processing of
- // nodeChildrenChanged event
- @Override
- public void run() {
- List<ZKUtil.NodeAndData> nodeList = nodes.get();
- nodes.set(null);
- refreshNodes(nodeList, nodes);
- }
- });
}
}
- private void asyncProcessNodeUpdate(Runnable runnable) {
+ private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) {
try {
- executor.submit(runnable);
+ return executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
@@ -224,12 +223,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
}
}
}
+ return null; // No task launched so there will be nothing to cancel later
}
- private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
+ private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) {
- if (ref != null && ref.get() != null) {
- // there is a newer list
+ if (Thread.interrupted()) {
+ // Use Thread.interrupted so that we clear interrupt status
break;
}
if (n.isEmpty()) continue;
[2/2] hbase git commit: HBASE-18628 Fix event pre-emption in ZKPermWatcher
Posted by ap...@apache.org.
HBASE-18628 Fix event pre-emption in ZKPermWatcher
Instead of using an AtomicReference to the data and aborting when we detect
that new data has come in, use the native cancellation/pre-emption features
of Java's Future.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/362a2924
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/362a2924
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/362a2924
Branch: refs/heads/branch-1
Commit: 362a2924d2a61630b5644a2776cf8a6a96b03954
Parents: 19a80c8
Author: Mike Drob <md...@apache.org>
Authored: Mon Aug 21 16:23:27 2017 -0500
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 23 16:42:33 2017 -0700
----------------------------------------------------------------------
.../security/access/ZKPermissionWatcher.java | 60 ++++++++++----------
1 file changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/362a2924/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
index b4bf510..4c37b52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/ZKPermissionWatcher.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@@ -55,12 +56,11 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists
static final String ACL_NODE = "acl";
- TableAuthManager authManager;
- String aclZNode;
- CountDownLatch initialized = new CountDownLatch(1);
- AtomicReference<List<ZKUtil.NodeAndData>> nodes =
- new AtomicReference<List<ZKUtil.NodeAndData>>(null);
- ExecutorService executor;
+ private final TableAuthManager authManager;
+ private final String aclZNode;
+ private final CountDownLatch initialized = new CountDownLatch(1);
+ private final ExecutorService executor;
+ private Future<?> childrenChangedFuture;
public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) {
@@ -83,7 +83,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
- refreshNodes(existing, null);
+ refreshNodes(existing);
}
return null;
}
@@ -127,7 +127,7 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
- refreshNodes(nodes, null);
+ refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
@@ -185,37 +185,36 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
waitUntilStarted();
if (path.equals(aclZNode)) {
try {
- List<ZKUtil.NodeAndData> nodeList =
+ final List<ZKUtil.NodeAndData> nodeList =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
- while (!nodes.compareAndSet(null, nodeList)) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while setting node list", e);
- Thread.currentThread().interrupt();
+ // preempt any existing nodeChildrenChanged event processing
+ if (childrenChangedFuture != null && !childrenChangedFuture.isDone()) {
+ boolean cancelled = childrenChangedFuture.cancel(true);
+ if (!cancelled) {
+ // task may have finished between our check and attempted cancel, this is fine.
+ if (! childrenChangedFuture.isDone()) {
+ LOG.warn("Could not cancel processing node children changed event, " +
+ "please file a JIRA and attach logs if possible.");
+ }
}
}
+ childrenChangedFuture = asyncProcessNodeUpdate(new Runnable() {
+ @Override
+ public void run() {
+ refreshNodes(nodeList);
+ }
+ });
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
- asyncProcessNodeUpdate(new Runnable() {
- // allows subsequent nodeChildrenChanged event to preempt current processing of
- // nodeChildrenChanged event
- @Override
- public void run() {
- List<ZKUtil.NodeAndData> nodeList = nodes.get();
- nodes.set(null);
- refreshNodes(nodeList, nodes);
- }
- });
}
}
- private void asyncProcessNodeUpdate(Runnable runnable) {
+ private Future<?> asyncProcessNodeUpdate(Runnable runnable) {
if (!executor.isShutdown()) {
try {
- executor.submit(runnable);
+ return executor.submit(runnable);
} catch (RejectedExecutionException e) {
if (executor.isShutdown()) {
LOG.warn("aclZNode changed after ZKPermissionWatcher was shutdown");
@@ -224,12 +223,13 @@ public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable
}
}
}
+ return null; // No task launched so there will be nothing to cancel later
}
- private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
+ private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
for (ZKUtil.NodeAndData n : nodes) {
- if (ref != null && ref.get() != null) {
- // there is a newer list
+ if (Thread.interrupted()) {
+ // Use Thread.interrupted so that we clear interrupt status
break;
}
if (n.isEmpty()) continue;