You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/01 10:23:54 UTC
[pulsar] branch branch-2.9 updated: [fix][test] Catch exception when update data in mockZookeeper (#16473)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 52ac0767cbb [fix][test] Catch exception when update data in mockZookeeper (#16473)
52ac0767cbb is described below
commit 52ac0767cbb161038c37f4634174168c56064f17
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jul 12 22:39:38 2022 +0800
[fix][test] Catch exception when update data in mockZookeeper (#16473)
(cherry picked from commit 4df2593a62606d3fcfd5ebab8923870814832569)
---
.../java/org/apache/zookeeper/MockZooKeeper.java | 161 ++++++++++++---------
1 file changed, 94 insertions(+), 67 deletions(-)
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index cd0c60c0087..b9325c16102 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -334,8 +334,8 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(() -> {
- lock();
try {
+ lock();
if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
@@ -394,6 +394,9 @@ public class MockZooKeeper extends ZooKeeper {
KeeperState.SyncConnected,
parent)));
}
+ } catch (Throwable ex) {
+ log.error("create path : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
@@ -426,28 +429,33 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
- checkReadOpDelay();
- Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
- if (failure.isPresent()) {
- cb.processResult(failure.get().intValue(), path, ctx, null, null);
- return;
- } else if (stopped) {
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
- return;
- }
-
- MockZNode value;
- lock();
try {
- value = tree.get(path);
- } finally {
- unlockIfLocked();
- }
+ checkReadOpDelay();
+ Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
+ if (failure.isPresent()) {
+ cb.processResult(failure.get().intValue(), path, ctx, null, null);
+ return;
+ } else if (stopped) {
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+ return;
+ }
- if (value == null) {
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
- } else {
- cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
+ MockZNode value;
+ lock();
+ try {
+ value = tree.get(path);
+ } finally {
+ unlockIfLocked();
+ }
+
+ if (value == null) {
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
+ } else {
+ cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
+ }
+ } catch (Throwable ex) {
+ log.error("get data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
}
});
}
@@ -456,8 +464,8 @@ public class MockZooKeeper extends ZooKeeper {
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -482,6 +490,9 @@ public class MockZooKeeper extends ZooKeeper {
unlockIfLocked();
cb.processResult(0, path, ctx, value.getContent(), stat);
}
+ } catch (Throwable ex) {
+ log.error("get data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
@@ -491,9 +502,9 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
- lock();
List<String> children = Lists.newArrayList();
try {
+ lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -529,11 +540,14 @@ public class MockZooKeeper extends ZooKeeper {
if (watcher != null) {
watchers.put(path, watcher);
}
+ cb.processResult(0, path, ctx, children);
+ } catch (Throwable ex) {
+ log.error("get children : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
- cb.processResult(0, path, ctx, children);
});
}
@@ -603,8 +617,8 @@ public class MockZooKeeper extends ZooKeeper {
public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
executor.execute(() -> {
Set<String> children = new TreeSet<>();
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -630,10 +644,13 @@ public class MockZooKeeper extends ZooKeeper {
String child = relativePath.split("/", 2)[0];
children.add(child);
});
+ cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
+ } catch (Throwable ex) {
+ log.error("get children : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null);
} finally {
unlockIfLocked();
}
- cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
});
}
@@ -702,8 +719,8 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
executor.execute(() -> {
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -726,6 +743,9 @@ public class MockZooKeeper extends ZooKeeper {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
}
+ } catch (Throwable ex) {
+ log.error("exist : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
} finally {
unlockIfLocked();
}
@@ -802,56 +822,60 @@ public class MockZooKeeper extends ZooKeeper {
}
executor.execute(() -> {
- final Set<Watcher> toNotify = Sets.newHashSet();
- Stat stat;
- lock();
try {
+ final Set<Watcher> toNotify = Sets.newHashSet();
+ Stat stat;
+ lock();
+ try {
+ Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
- Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
- if (failure.isPresent()) {
- unlockIfLocked();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- return;
- } else if (stopped) {
- unlockIfLocked();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
+ if (!tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ return;
+ }
- if (!tree.containsKey(path)) {
- unlockIfLocked();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
- return;
- }
+ MockZNode mockZNode = tree.get(path);
+ int currentVersion = mockZNode.getVersion();
- MockZNode mockZNode = tree.get(path);
- int currentVersion = mockZNode.getVersion();
+ // Check version
+ if (version != -1 && version != currentVersion) {
+ log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
+ return;
+ }
- // Check version
- if (version != -1 && version != currentVersion) {
- log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
+ log.debug("[{}] Updating -- current version: {}", path, currentVersion);
+ MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
+ tree.put(path, newZNode);
+ stat = createStatForZNode(newZNode);
+ } finally {
unlockIfLocked();
- cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
- return;
}
+ cb.processResult(0, path, ctx, stat);
- log.debug("[{}] Updating -- current version: {}", path, currentVersion);
- MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
- tree.put(path, newZNode);
- stat = createStatForZNode(newZNode);
- } finally {
- unlockIfLocked();
- }
- cb.processResult(0, path, ctx, stat);
+ toNotify.addAll(watchers.get(path));
+ watchers.removeAll(path);
- toNotify.addAll(watchers.get(path));
- watchers.removeAll(path);
+ for (Watcher watcher : toNotify) {
+ watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
+ }
- for (Watcher watcher : toNotify) {
- watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
+ triggerPersistentWatches(path, null, EventType.NodeDataChanged);
+ } catch (Throwable ex) {
+ log.error("Update data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null);
}
-
- triggerPersistentWatches(path, null, EventType.NodeDataChanged);
});
}
@@ -915,8 +939,8 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
Runnable r = () -> {
- lock();
try {
+ lock();
final Set<Watcher> toNotifyDelete = Sets.newHashSet();
toNotifyDelete.addAll(watchers.get(path));
@@ -962,6 +986,9 @@ public class MockZooKeeper extends ZooKeeper {
parent)));
triggerPersistentWatches(path, parent, EventType.NodeDeleted);
}
+ } catch (Throwable ex) {
+ log.error("delete path : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx);
} finally {
unlockIfLocked();
}