You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/02 18:53:41 UTC
[pulsar] branch branch-2.9 updated: [Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3d0ce3a [Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)
3d0ce3a is described below
commit 3d0ce3ac19abb4810a3fa410d292a49a9420d21d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Dec 2 20:34:35 2021 +0200
[Broker] Fix LeaderElectionService.getCurrentLeader and add support for empheralOwner in MockZooKeeper (#13066)
* Add leader election unit test that uses multiple brokers
* Support ephemeralOwner in MockZooKeeper
* Use unique sessions for all brokers
* Add failing test case for leader broker information not available on other brokers
* Add test for reading the current leader
* Fix issue in leader election
* Address review feedback: make methods static
* Use unique-session only in MultiBrokerBaseTests
* Move tenant and namespace creation to its own method
* Improve cleanup
* Add alwaysRun to BeforeClass
* Fix leaks in locking in MockZooKeeper
* Reduce code duplication
* Fix NPE when CreateMode is null
(cherry picked from commit 36a45ee89639bf66a12de23e415d80451deaa4e0)
---
.../apache/pulsar/broker/MultiBrokerBaseTest.java | 153 ++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 12 +-
.../loadbalance/MultiBrokerLeaderElectionTest.java | 71 ++
.../coordination/impl/LeaderElectionImpl.java | 5 +-
.../java/org/apache/zookeeper/MockZooKeeper.java | 860 ++++++++++++---------
.../org/apache/zookeeper/MockZooKeeperSession.java | 16 +-
6 files changed, 728 insertions(+), 389 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
new file mode 100644
index 0000000..f4d106d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.MockZooKeeperSession;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
+ protected List<PulsarService> additionalBrokers;
+ protected List<PulsarAdmin> additionalBrokerAdmins;
+ protected List<PulsarClient> additionalBrokerClients;
+
+ protected int numberOfAdditionalBrokers() {
+ return 2;
+ }
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ public final void setup() throws Exception {
+ super.internalSetup();
+ additionalBrokersSetup();
+ pulsarResourcesSetup();
+ }
+
+ protected void pulsarResourcesSetup() throws PulsarAdminException {
+ admin.tenants().createTenant("public", createDefaultTenantInfo());
+ admin.namespaces()
+ .createNamespace("public/default", getPulsar().getConfiguration().getDefaultNumberOfNamespaceBundles());
+ }
+
+ protected void additionalBrokersSetup() throws Exception {
+ int numberOfAdditionalBrokers = numberOfAdditionalBrokers();
+ additionalBrokers = new ArrayList<>(numberOfAdditionalBrokers);
+ additionalBrokerAdmins = new ArrayList<>(numberOfAdditionalBrokers);
+ additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
+ for (int i = 0; i < numberOfAdditionalBrokers; i++) {
+ PulsarService pulsarService = createAdditionalBroker(i);
+ additionalBrokers.add(i, pulsarService);
+ PulsarAdminBuilder pulsarAdminBuilder =
+ PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null
+ ? pulsarService.getWebServiceAddress()
+ : pulsarService.getWebServiceAddressTls());
+ customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+ additionalBrokerAdmins.add(i, pulsarAdminBuilder.build());
+ additionalBrokerClients.add(i, newPulsarClient(pulsarService.getBrokerServiceUrl(), 0));
+ }
+ }
+
+ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
+ return getDefaultConf();
+ }
+
+ protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws Exception {
+ return startBroker(createConfForAdditionalBroker(additionalBrokerIndex));
+ }
+
+ @Override
+ protected ZKMetadataStore createLocalMetadataStore() {
+ // use MockZooKeeperSession to provide a unique session id for each instance
+ return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
+ }
+
+ @Override
+ protected ZKMetadataStore createConfigurationMetadataStore() {
+ // use MockZooKeeperSession to provide a unique session id for each instance
+ return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ public final void cleanup() throws Exception {
+ additionalBrokersCleanup();
+ super.internalCleanup();
+ }
+
+ protected void additionalBrokersCleanup() {
+ if (additionalBrokerAdmins != null) {
+ for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) {
+ additionalBrokerAdmin.close();
+ }
+ additionalBrokerAdmins = null;
+ }
+ if (additionalBrokerClients != null) {
+ for (PulsarClient additionalBrokerClient : additionalBrokerClients) {
+ try {
+ additionalBrokerClient.shutdown();
+ } catch (PulsarClientException e) {
+ // ignore
+ }
+ }
+ additionalBrokerClients = null;
+ }
+ if (additionalBrokers != null) {
+ for (PulsarService pulsarService : additionalBrokers) {
+ try {
+ pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
+ pulsarService.close();
+ } catch (PulsarServerException e) {
+ // ignore
+ }
+ }
+ additionalBrokers = null;
+ }
+ }
+
+ public final List<PulsarService> getAllBrokers() {
+ List<PulsarService> brokers = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+ brokers.add(getPulsar());
+ brokers.addAll(additionalBrokers);
+ return Collections.unmodifiableList(brokers);
+ }
+
+ public final List<PulsarAdmin> getAllAdmins() {
+ List<PulsarAdmin> admins = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+ admins.add(admin);
+ admins.addAll(additionalBrokerAdmins);
+ return Collections.unmodifiableList(admins);
+ }
+
+ public final List<PulsarClient> getAllClients() {
+ List<PulsarClient> clients = new ArrayList<>(numberOfAdditionalBrokers() + 1);
+ clients.add(pulsarClient);
+ clients.addAll(additionalBrokerClients);
+ return Collections.unmodifiableList(clients);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 49e7ef3..a3f7166 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -317,8 +317,8 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore();
+ doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
+ doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
@@ -332,6 +332,14 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
}
}
+ protected ZKMetadataStore createLocalMetadataStore() {
+ return new ZKMetadataStore(mockZooKeeper);
+ }
+
+ protected ZKMetadataStore createConfigurationMetadataStore() {
+ return new ZKMetadataStore(mockZooKeeperGlobal);
+ }
+
private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
ServiceConfiguration configuration = spy(conf);
Set<String> mockBrokerInterceptors = mock(Set.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
new file mode 100644
index 0000000..0045ddd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/MultiBrokerLeaderElectionTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {
+
+ @Test
+ public void shouldElectOneLeader() {
+ int leaders = 0;
+ for (PulsarService broker : getAllBrokers()) {
+ if (broker.getLeaderElectionService().isLeader()) {
+ leaders++;
+ }
+ }
+ assertEquals(leaders, 1);
+ }
+
+ @Test
+ public void shouldAllBrokersKnowTheLeader() {
+ Awaitility.await().untilAsserted(() -> {
+ for (PulsarService broker : getAllBrokers()) {
+ Optional<LeaderBroker> currentLeader = broker.getLeaderElectionService().getCurrentLeader();
+ assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+ }
+ });
+ }
+
+ @Test
+ public void shouldAllBrokersBeAbleToGetTheLeader() {
+ Awaitility.await().untilAsserted(() -> {
+ LeaderBroker leader = null;
+ for (PulsarService broker : getAllBrokers()) {
+ Optional<LeaderBroker> currentLeader =
+ broker.getLeaderElectionService().readCurrentLeader().get(1, TimeUnit.SECONDS);
+ assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
+ if (leader != null) {
+ assertEquals(currentLeader.get(), leader,
+ "Different leader on broker " + broker.getBrokerServiceUrl());
+ } else {
+ leader = currentLeader.get();
+ }
+ }
+ });
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
index 29cb4f9..6599c62 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java
@@ -111,7 +111,10 @@ class LeaderElectionImpl<T> implements LeaderElection<T> {
} else {
return tryToBecomeLeader();
}
- });
+ }).thenCompose(leaderElectionState ->
+ // make sure that the cache contains the current leader
+ // so that getLeaderValueIfPresent works on all brokers
+ cache.get(path).thenApply(__ -> leaderElectionState));
}
private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index c7f677d..cd0c60c 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -19,32 +19,33 @@
package org.apache.zookeeper;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
-
import lombok.AllArgsConstructor;
import lombok.Data;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
@@ -53,7 +54,6 @@ import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.objenesis.Objenesis;
@@ -64,7 +64,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MockZooKeeper extends ZooKeeper {
- private TreeMap<String, Pair<byte[], Integer>> tree;
+ @Data
+ @AllArgsConstructor
+ private static class MockZNode {
+ byte[] content;
+ int version;
+ long ephemeralOwner;
+
+ static MockZNode of(byte[] content, int version, long ephemeralOwner) {
+ return new MockZNode(content, version, ephemeralOwner);
+ }
+ }
+
+ private TreeMap<String, MockZNode> tree;
private SetMultimap<String, Watcher> watchers;
private volatile boolean stopped;
private AtomicReference<KeeperException.Code> alwaysFail;
@@ -78,6 +90,7 @@ public class MockZooKeeper extends ZooKeeper {
private ReentrantLock mutex;
private AtomicLong sequentialIdGenerator;
+ private ThreadLocal<Long> epheralOwnerThreadLocal;
//see details of Objenesis caching - http://objenesis.org/details.html
//see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
@@ -85,9 +98,9 @@ public class MockZooKeeper extends ZooKeeper {
public enum Op {
CREATE, GET, SET, GET_CHILDREN, DELETE, EXISTS, SYNC,
- };
+ }
- private class Failure {
+ private static class Failure {
final KeeperException.Code failReturnCode;
final BiPredicate<Op, String> predicate;
@@ -121,14 +134,7 @@ public class MockZooKeeper extends ZooKeeper {
public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor, int readOpDelayMs) {
try {
- ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
- new ObjenesisStd().getInstantiatorOf(MockZooKeeper.class);
- MockZooKeeper zk = (MockZooKeeper) mockZooKeeperInstantiator.newInstance();
- zk.init(executor);
- zk.readOpDelayMs = readOpDelayMs;
- zk.mutex = new ReentrantLock();
- zk.sequentialIdGenerator = new AtomicLong();
- return zk;
+ return createMockZooKeeperInstance(executor, readOpDelayMs);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
@@ -138,14 +144,9 @@ public class MockZooKeeper extends ZooKeeper {
public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) {
try {
- ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator = objenesis.getInstantiatorOf(MockZooKeeper.class);
- MockZooKeeper zk = (MockZooKeeper) mockZooKeeperInstantiator.newInstance();
- zk.init(executor);
- zk.readOpDelayMs = readOpDelayMs;
- zk.mutex = new ReentrantLock();
+ MockZooKeeper zk = createMockZooKeeperInstance(executor, readOpDelayMs);
ObjectInstantiator<ClientCnxn> clientCnxnObjectInstantiator = objenesis.getInstantiatorOf(ClientCnxn.class);
Whitebox.setInternalState(zk, "cnxn", clientCnxnObjectInstantiator.newInstance());
- zk.sequentialIdGenerator = new AtomicLong();
return zk;
} catch (RuntimeException e) {
throw e;
@@ -154,6 +155,19 @@ public class MockZooKeeper extends ZooKeeper {
}
}
+ private static MockZooKeeper createMockZooKeeperInstance(ExecutorService executor, int readOpDelayMs) {
+ ObjectInstantiator<MockZooKeeper> mockZooKeeperInstantiator =
+ objenesis.getInstantiatorOf(MockZooKeeper.class);
+ MockZooKeeper zk = mockZooKeeperInstantiator.newInstance();
+ zk.epheralOwnerThreadLocal = new ThreadLocal<>();
+ zk.init(executor);
+ zk.readOpDelayMs = readOpDelayMs;
+ zk.mutex = new ReentrantLock();
+ zk.lockInstance = ThreadLocal.withInitial(zk::createLock);
+ zk.sequentialIdGenerator = new AtomicLong();
+ return zk;
+ }
+
private void init(ExecutorService executor) {
tree = Maps.newTreeMap();
if (executor != null) {
@@ -176,7 +190,8 @@ public class MockZooKeeper extends ZooKeeper {
private MockZooKeeper(String quorum) throws Exception {
// This constructor is never called
- super(quorum, 1, event -> {});
+ super(quorum, 1, event -> {
+ });
assert false;
}
@@ -185,27 +200,68 @@ public class MockZooKeeper extends ZooKeeper {
return States.CONNECTED;
}
+
+ @Slf4j
+ private static class SingleAcquireAndReleaseLock {
+ private final AtomicBoolean acquired = new AtomicBoolean(false);
+ private final Lock lock;
+
+ SingleAcquireAndReleaseLock(Lock lock) {
+ this.lock = lock;
+ }
+
+ public void lock() {
+ if (acquired.compareAndSet(false, true)) {
+ lock.lock();
+ } else {
+ throw new IllegalStateException("Lock was already acquired!");
+ }
+ }
+
+ public void unlockIfNeeded() {
+ if (acquired.compareAndSet(true, false)) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private ThreadLocal<SingleAcquireAndReleaseLock> lockInstance;
+
+ private SingleAcquireAndReleaseLock createLock() {
+ return new SingleAcquireAndReleaseLock(mutex);
+ }
+
+ private void lock() {
+ lockInstance.get().lock();
+ }
+
+ private void unlockIfLocked() {
+ lockInstance.get().unlockIfNeeded();
+ }
+
@Override
public void register(Watcher watcher) {
- mutex.lock();
+ lock();
sessionWatcher = watcher;
- mutex.unlock();
+ unlockIfLocked();
}
@Override
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
- mutex.lock();
-
final Set<Watcher> toNotifyCreate = Sets.newHashSet();
final Set<Watcher> toNotifyParent = Sets.newHashSet();
final String parent = path.substring(0, path.lastIndexOf("/"));
+ lock();
try {
+
+
maybeThrowProgrammedFailure(Op.CREATE, path);
- if (stopped)
+ if (stopped) {
throw new KeeperException.ConnectionLossException();
+ }
if (tree.containsKey(path)) {
throw new KeeperException.NodeExistsException(path);
@@ -215,16 +271,17 @@ public class MockZooKeeper extends ZooKeeper {
throw new KeeperException.NoNodeException();
}
- if (createMode == CreateMode.EPHEMERAL_SEQUENTIAL || createMode == CreateMode.PERSISTENT_SEQUENTIAL) {
- byte[] parentData = tree.get(parent).getLeft();
- int parentVersion = tree.get(parent).getRight();
+ if (createMode.isSequential()) {
+ MockZNode parentNode = tree.get(parent);
+ int parentVersion = tree.get(parent).getVersion();
path = path + parentVersion;
// Update parent version
- tree.put(parent, Pair.of(parentData, parentVersion + 1));
+ tree.put(parent,
+ MockZNode.of(parentNode.getContent(), parentVersion + 1, parentNode.getEphemeralOwner()));
}
- tree.put(path, Pair.of(data, 0));
+ tree.put(path, MockZNode.of(data, 0, createMode.isEphemeral() ? getEphemeralOwner() : -1L));
toNotifyCreate.addAll(watchers.get(path));
@@ -233,8 +290,7 @@ public class MockZooKeeper extends ZooKeeper {
}
watchers.removeAll(path);
} finally {
-
- mutex.unlock();
+ unlockIfLocked();
}
final String finalPath = path;
@@ -256,68 +312,90 @@ public class MockZooKeeper extends ZooKeeper {
return path;
}
+ protected long getEphemeralOwner() {
+ Long epheralOwner = epheralOwnerThreadLocal.get();
+ if (epheralOwner != null) {
+ return epheralOwner;
+ }
+ return getSessionId();
+ }
+
+ public void overrideEpheralOwner(long epheralOwner) {
+ epheralOwnerThreadLocal.set(epheralOwner);
+ }
+
+ public void removeEpheralOwnerOverride() {
+ epheralOwnerThreadLocal.remove();
+ }
+
@Override
public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
- final StringCallback cb, final Object ctx) {
+ final StringCallback cb, final Object ctx) {
executor.execute(() -> {
- mutex.lock();
+ lock();
+ try {
- if (stopped) {
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
+ if (stopped) {
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
- final Set<Watcher> toNotifyCreate = Sets.newHashSet();
- toNotifyCreate.addAll(watchers.get(path));
+ final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+ toNotifyCreate.addAll(watchers.get(path));
- final Set<Watcher> toNotifyParent = Sets.newHashSet();
- final String parent = path.substring(0, path.lastIndexOf("/"));
- if (!parent.isEmpty()) {
- toNotifyParent.addAll(watchers.get(parent));
- }
+ final Set<Watcher> toNotifyParent = Sets.newHashSet();
+ final String parent = path.substring(0, path.lastIndexOf("/"));
+ if (!parent.isEmpty()) {
+ toNotifyParent.addAll(watchers.get(parent));
+ }
- final String name;
- if (createMode != null && createMode.isSequential()) {
- name = path + Long.toString(sequentialIdGenerator.getAndIncrement());
- } else {
- name = path;
- }
+ final String name;
+ if (createMode != null && createMode.isSequential()) {
+ name = path + sequentialIdGenerator.getAndIncrement();
+ } else {
+ name = path;
+ }
- Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- } else if (tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
- } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
- mutex.unlock();
- toNotifyParent.forEach(watcher -> watcher
- .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
- } else {
- tree.put(name, Pair.of(data, 0));
- watchers.removeAll(name);
- mutex.unlock();
- cb.processResult(0, path, ctx, name);
-
- triggerPersistentWatches(path, parent, EventType.NodeCreated);
-
- toNotifyCreate.forEach(
- watcher -> watcher.process(
- new WatchedEvent(EventType.NodeCreated,
- KeeperState.SyncConnected,
- name)));
- toNotifyParent.forEach(
- watcher -> watcher.process(
- new WatchedEvent(EventType.NodeChildrenChanged,
- KeeperState.SyncConnected,
- parent)));
+ Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null);
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ } else if (tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
+ } else if (!parent.isEmpty() && !tree.containsKey(parent)) {
+ unlockIfLocked();
+ toNotifyParent.forEach(watcher -> watcher
+ .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+ parent)));
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ } else {
+ tree.put(name, MockZNode.of(data, 0,
+ createMode != null && createMode.isEphemeral() ? getEphemeralOwner() : -1L));
+ watchers.removeAll(name);
+ unlockIfLocked();
+ cb.processResult(0, path, ctx, name);
+
+ triggerPersistentWatches(path, parent, EventType.NodeCreated);
+
+ toNotifyCreate.forEach(
+ watcher -> watcher.process(
+ new WatchedEvent(EventType.NodeCreated,
+ KeeperState.SyncConnected,
+ name)));
+ toNotifyParent.forEach(
+ watcher -> watcher.process(
+ new WatchedEvent(EventType.NodeChildrenChanged,
+ KeeperState.SyncConnected,
+ parent)));
+ }
+ } finally {
+ unlockIfLocked();
}
});
@@ -325,10 +403,10 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
- mutex.lock();
+ lock();
try {
maybeThrowProgrammedFailure(Op.GET, path);
- Pair<byte[], Integer> value = tree.get(path);
+ MockZNode value = tree.get(path);
if (value == null) {
throw new KeeperException.NoNodeException(path);
} else {
@@ -336,12 +414,12 @@ public class MockZooKeeper extends ZooKeeper {
watchers.put(path, watcher);
}
if (stat != null) {
- stat.setVersion(value.getRight());
+ applyToStat(value, stat);
}
- return value.getLeft();
+ return value.getContent();
}
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
}
@@ -358,20 +436,18 @@ public class MockZooKeeper extends ZooKeeper {
return;
}
- Pair<byte[], Integer> value;
- mutex.lock();
+ MockZNode value;
+ lock();
try {
value = tree.get(path);
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
if (value == null) {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
} else {
- Stat stat = new Stat();
- stat.setVersion(value.getRight());
- cb.processResult(0, path, ctx, value.getLeft(), stat);
+ cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value));
}
});
}
@@ -380,31 +456,34 @@ public class MockZooKeeper extends ZooKeeper {
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
- mutex.lock();
- Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
- return;
- }
-
- Pair<byte[], Integer> value = tree.get(path);
- if (value == null) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
- } else {
- if (watcher != null) {
- watchers.put(path, watcher);
+ lock();
+ try {
+ Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+ return;
}
- Stat stat = new Stat();
- stat.setVersion(value.getRight());
- mutex.unlock();
- cb.processResult(0, path, ctx, value.getLeft(), stat);
+ MockZNode value = tree.get(path);
+ if (value == null) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
+ } else {
+ if (watcher != null) {
+ watchers.put(path, watcher);
+ }
+
+ Stat stat = createStatForZNode(value);
+ unlockIfLocked();
+ cb.processResult(0, path, ctx, value.getContent(), stat);
+ }
+ } finally {
+ unlockIfLocked();
}
});
}
@@ -412,44 +491,47 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
- mutex.lock();
- Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
+ lock();
+ List<String> children = Lists.newArrayList();
+ try {
+ Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
- if (!tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
- return;
- }
+ if (!tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ return;
+ }
- List<String> children = Lists.newArrayList();
- for (String item : tree.tailMap(path).keySet()) {
- if (!item.startsWith(path)) {
- break;
- } else {
- if (path.length() >= item.length()) {
- continue;
- }
+ for (String item : tree.tailMap(path).keySet()) {
+ if (!item.startsWith(path)) {
+ break;
+ } else {
+ if (path.length() >= item.length()) {
+ continue;
+ }
- String child = item.substring(path.length() + 1);
- if (item.charAt(path.length()) == '/' && !child.contains("/")) {
- children.add(child);
+ String child = item.substring(path.length() + 1);
+ if (item.charAt(path.length()) == '/' && !child.contains("/")) {
+ children.add(child);
+ }
}
}
- }
- if (watcher != null) {
- watchers.put(path, watcher);
+ if (watcher != null) {
+ watchers.put(path, watcher);
+ }
+ } finally {
+ unlockIfLocked();
}
- mutex.unlock();
cb.processResult(0, path, ctx, children);
});
@@ -457,7 +539,7 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
- mutex.lock();
+ lock();
try {
maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
@@ -465,35 +547,31 @@ public class MockZooKeeper extends ZooKeeper {
throw new KeeperException.NoNodeException();
}
- List<String> children = Lists.newArrayList();
- for (String item : tree.tailMap(path).keySet()) {
- if (!item.startsWith(path)) {
- break;
- } else {
- if (path.length() >= item.length()) {
- continue;
- }
+ String firstKey = path.equals("/") ? path : path + "/";
+ String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
- String child = item.substring(path.length() + 1);
- if (!child.contains("/")) {
- children.add(child);
- }
- }
- }
+ Set<String> children = new TreeSet<>();
+ tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+ String relativePath = key.replace(firstKey, "");
+
+ // Only return first-level children
+ String child = relativePath.split("/", 2)[0];
+ children.add(child);
+ });
if (watcher != null) {
watchers.put(path, watcher);
}
- return children;
+ return new ArrayList<>(children);
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
}
@Override
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
- mutex.lock();
+ lock();
try {
maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
@@ -503,173 +581,153 @@ public class MockZooKeeper extends ZooKeeper {
throw new KeeperException.NoNodeException();
}
- List<String> children = Lists.newArrayList();
- for (String item : tree.tailMap(path).keySet()) {
- if (!item.startsWith(path)) {
- break;
- } else {
- if (path.length() >= item.length()) {
- continue;
- }
- String child = item.substring(path.length());
- if (child.indexOf("/") == 0) {
- child = child.substring(1);
- log.debug("child: '{}'", child);
- if (!child.contains("/")) {
- children.add(child);
- }
- }
- }
- }
- return children;
+ String firstKey = path.equals("/") ? path : path + "/";
+ String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
+
+ Set<String> children = new TreeSet<>();
+ tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+ String relativePath = key.replace(firstKey, "");
+
+ // Only return first-level children
+ String child = relativePath.split("/", 2)[0];
+ children.add(child);
+ });
+
+ return new ArrayList<>(children);
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
}
@Override
public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
executor.execute(() -> {
- mutex.lock();
+ Set<String> children = new TreeSet<>();
+ lock();
+ try {
+ Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
+ return;
+ } else if (!tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
+ return;
+ }
- Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null);
- return;
- } else if (!tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null);
- return;
- }
+ String firstKey = path.equals("/") ? path : path + "/";
+ String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
- log.debug("getChildren path={}", path);
- List<String> children = Lists.newArrayList();
- for (String item : tree.tailMap(path).keySet()) {
- log.debug("Checking path {}", item);
- if (!item.startsWith(path)) {
- break;
- } else if (item.equals(path)) {
- continue;
- } else {
- String child = item.substring(path.length());
- if (child.indexOf("/") == 0) {
- child = child.substring(1);
- log.debug("child: '{}'", child);
- if (!child.contains("/")) {
- children.add(child);
- }
- }
- }
- }
+ tree.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+ String relativePath = key.replace(firstKey, "");
- log.debug("getChildren done path={} result={}", path, children);
- mutex.unlock();
- cb.processResult(0, path, ctx, children, new Stat());
+ // Only return first-level children
+ String child = relativePath.split("/", 2)[0];
+ children.add(child);
+ });
+ } finally {
+ unlockIfLocked();
+ }
+ cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat());
});
}
@Override
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
- mutex.lock();
+ lock();
try {
maybeThrowProgrammedFailure(Op.EXISTS, path);
- if (stopped)
+ if (stopped) {
throw new KeeperException.ConnectionLossException();
+ }
if (tree.containsKey(path)) {
- Stat stat = new Stat();
- stat.setVersion(tree.get(path).getRight());
- return stat;
+ return createStatForZNode(tree.get(path));
} else {
return null;
}
} finally {
- mutex.unlock();
+ unlockIfLocked();
+ }
+ }
+
+ private static Stat createStatForZNode(MockZNode zNode) {
+ return applyToStat(zNode, new Stat());
+ }
+
+ private static Stat applyToStat(MockZNode zNode, Stat stat) {
+ stat.setVersion(zNode.getVersion());
+ if (zNode.getEphemeralOwner() != -1L) {
+ stat.setEphemeralOwner(zNode.getEphemeralOwner());
}
+ return stat;
}
@Override
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
- mutex.lock();
+ lock();
try {
maybeThrowProgrammedFailure(Op.EXISTS, path);
- if (stopped)
+ if (stopped) {
throw new KeeperException.ConnectionLossException();
+ }
if (watcher != null) {
watchers.put(path, watcher);
}
if (tree.containsKey(path)) {
- Stat stat = new Stat();
- stat.setVersion(tree.get(path).getRight());
- return stat;
+ return createStatForZNode(tree.get(path));
} else {
return null;
}
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
}
@Override
public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
- executor.execute(() -> {
- mutex.lock();
- Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
-
- if (tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(0, path, ctx, new Stat());
- } else {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
- }
- });
+ exists(path, null, cb, ctx);
}
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
executor.execute(() -> {
- mutex.lock();
- Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
+ lock();
+ try {
+ Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
- if (watcher != null) {
- watchers.put(path, watcher);
- }
+ if (watcher != null) {
+ watchers.put(path, watcher);
+ }
- if (tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(0, path, ctx, new Stat());
- } else {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ if (tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(0, path, ctx, new Stat());
+ } else {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ }
+ } finally {
+ unlockIfLocked();
}
});
}
@@ -693,11 +751,10 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public Stat setData(final String path, byte[] data, int version) throws KeeperException, InterruptedException {
- mutex.lock();
-
final Set<Watcher> toNotify = Sets.newHashSet();
- int newVersion;
+ MockZNode newZNode;
+ lock();
try {
maybeThrowProgrammedFailure(Op.SET, path);
@@ -709,21 +766,22 @@ public class MockZooKeeper extends ZooKeeper {
throw new KeeperException.NoNodeException();
}
- int currentVersion = tree.get(path).getRight();
+ MockZNode mockZNode = tree.get(path);
+ int currentVersion = mockZNode.getVersion();
// Check version
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
- newVersion = currentVersion + 1;
log.debug("[{}] Updating -- current version: {}", path, currentVersion);
- tree.put(path, Pair.of(data, newVersion));
+ newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
+ tree.put(path, newZNode);
toNotify.addAll(watchers.get(path));
watchers.removeAll(path);
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
executor.execute(() -> {
@@ -733,9 +791,7 @@ public class MockZooKeeper extends ZooKeeper {
.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)));
});
- Stat stat = new Stat();
- stat.setVersion(newVersion);
- return stat;
+ return createStatForZNode(newZNode);
}
@Override
@@ -747,43 +803,45 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(() -> {
final Set<Watcher> toNotify = Sets.newHashSet();
+ Stat stat;
+ lock();
+ try {
- mutex.lock();
+ Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx, null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
- Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx, null);
- return;
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
+ if (!tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ return;
+ }
- if (!tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
- return;
- }
+ MockZNode mockZNode = tree.get(path);
+ int currentVersion = mockZNode.getVersion();
- int currentVersion = tree.get(path).getRight();
+ // Check version
+ if (version != -1 && version != currentVersion) {
+ log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
+ return;
+ }
- // Check version
- if (version != -1 && version != currentVersion) {
- log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version);
- mutex.unlock();
- cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
- return;
+ log.debug("[{}] Updating -- current version: {}", path, currentVersion);
+ MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner());
+ tree.put(path, newZNode);
+ stat = createStatForZNode(newZNode);
+ } finally {
+ unlockIfLocked();
}
-
- int newVersion = currentVersion + 1;
- log.debug("[{}] Updating -- current version: {}", path, currentVersion);
- tree.put(path, Pair.of(data, newVersion));
- Stat stat = new Stat();
- stat.setVersion(newVersion);
-
- mutex.unlock();
cb.processResult(0, path, ctx, stat);
toNotify.addAll(watchers.get(path));
@@ -805,7 +863,7 @@ public class MockZooKeeper extends ZooKeeper {
final Set<Watcher> toNotifyParent;
final String parent;
- mutex.lock();
+ lock();
try {
if (stopped) {
throw new KeeperException.ConnectionLossException();
@@ -816,7 +874,7 @@ public class MockZooKeeper extends ZooKeeper {
}
if (version != -1) {
- int currentVersion = tree.get(path).getRight();
+ int currentVersion = tree.get(path).getVersion();
if (version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
@@ -835,7 +893,7 @@ public class MockZooKeeper extends ZooKeeper {
watchers.removeAll(path);
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
executor.execute(() -> {
@@ -857,50 +915,55 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void delete(final String path, int version, final VoidCallback cb, final Object ctx) {
Runnable r = () -> {
- mutex.lock();
- final Set<Watcher> toNotifyDelete = Sets.newHashSet();
- toNotifyDelete.addAll(watchers.get(path));
-
- final Set<Watcher> toNotifyParent = Sets.newHashSet();
- final String parent = path.substring(0, path.lastIndexOf("/"));
- if (!parent.isEmpty()) {
- toNotifyParent.addAll(watchers.get(parent));
- }
- watchers.removeAll(path);
+ lock();
+ try {
+ final Set<Watcher> toNotifyDelete = Sets.newHashSet();
+ toNotifyDelete.addAll(watchers.get(path));
- Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
- if (failure.isPresent()) {
- mutex.unlock();
- cb.processResult(failure.get().intValue(), path, ctx);
- } else if (stopped) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
- } else if (!tree.containsKey(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
- } else if (hasChildren(path)) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
- } else {
- if (version != -1) {
- int currentVersion = tree.get(path).getRight();
- if (version != currentVersion) {
- mutex.unlock();
- cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
- return;
- }
+ final Set<Watcher> toNotifyParent = Sets.newHashSet();
+ final String parent = path.substring(0, path.lastIndexOf("/"));
+ if (!parent.isEmpty()) {
+ toNotifyParent.addAll(watchers.get(parent));
}
+ watchers.removeAll(path);
+
+ Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx);
+ } else if (stopped) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
+ } else if (!tree.containsKey(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx);
+ } else if (hasChildren(path)) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.NOTEMPTY.intValue(), path, ctx);
+ } else {
+ if (version != -1) {
+ int currentVersion = tree.get(path).getVersion();
+ if (version != currentVersion) {
+ unlockIfLocked();
+ cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx);
+ return;
+ }
+ }
- tree.remove(path);
+ tree.remove(path);
- mutex.unlock();
- cb.processResult(0, path, ctx);
+ unlockIfLocked();
+ cb.processResult(0, path, ctx);
- toNotifyDelete.forEach(watcher -> watcher
- .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
- toNotifyParent.forEach(watcher -> watcher
- .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
- triggerPersistentWatches(path, parent, EventType.NodeDeleted);
+ toNotifyDelete.forEach(watcher -> watcher
+ .process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)));
+ toNotifyParent.forEach(watcher -> watcher
+ .process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected,
+ parent)));
+ triggerPersistentWatches(path, parent, EventType.NodeDeleted);
+ }
+ } finally {
+ unlockIfLocked();
}
};
@@ -908,7 +971,6 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(r);
} catch (RejectedExecutionException ree) {
cb.processResult(KeeperException.Code.SESSIONEXPIRED.intValue(), path, ctx);
- return;
}
}
@@ -917,30 +979,62 @@ public class MockZooKeeper extends ZooKeeper {
public void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx) {
try {
List<OpResult> res = multi(ops);
- cb.processResult(KeeperException.Code.OK.intValue(), (String)null, ctx, res);
+ cb.processResult(KeeperException.Code.OK.intValue(), null, ctx, res);
} catch (Exception e) {
- cb.processResult(KeeperException.Code.APIERROR.intValue(), (String)null, ctx, null);
+ cb.processResult(KeeperException.Code.APIERROR.intValue(), null, ctx, null);
}
}
@Override
public List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException {
List<OpResult> res = new ArrayList<>();
- for (org.apache.zookeeper.Op op : ops) {
- switch (op.getType()) {
- case ZooDefs.OpCode.create:
- this.create(op.getPath(), ((org.apache.zookeeper.Op.Create)op).data, null, null);
- res.add(new OpResult.CreateResult(op.getPath()));
- break;
- case ZooDefs.OpCode.delete:
- this.delete(op.getPath(), -1);
- res.add(new OpResult.DeleteResult());
- break;
- case ZooDefs.OpCode.setData:
- this.create(op.getPath(), ((org.apache.zookeeper.Op.Create)op).data, null, null);
- res.add(new OpResult.SetDataResult(null));
- break;
- default:
+ try {
+ for (org.apache.zookeeper.Op op : ops) {
+ switch (op.getType()) {
+ case ZooDefs.OpCode.create: {
+ org.apache.zookeeper.Op.Create opc = ((org.apache.zookeeper.Op.Create) op);
+ CreateMode cm = CreateMode.fromFlag(opc.flags);
+ String path = this.create(op.getPath(), opc.data, null, cm);
+ res.add(new OpResult.CreateResult(path));
+ break;
+ }
+ case ZooDefs.OpCode.delete:
+ this.delete(op.getPath(), Whitebox.getInternalState(op, "version"));
+ res.add(new OpResult.DeleteResult());
+ break;
+ case ZooDefs.OpCode.setData: {
+ Stat stat = this.setData(op.getPath(), Whitebox.getInternalState(op, "data"),
+ Whitebox.getInternalState(op, "version"));
+ res.add(new OpResult.SetDataResult(stat));
+ break;
+ }
+ case ZooDefs.OpCode.getChildren: {
+ try {
+ List<String> children = this.getChildren(op.getPath(), null);
+ res.add(new OpResult.GetChildrenResult(children));
+ } catch (KeeperException e) {
+ res.add(new OpResult.ErrorResult(e.code().intValue()));
+ }
+ break;
+ }
+ case ZooDefs.OpCode.getData: {
+ Stat stat = new Stat();
+ try {
+ byte[] payload = this.getData(op.getPath(), null, stat);
+ res.add(new OpResult.GetDataResult(payload, stat));
+ } catch (KeeperException e) {
+ res.add(new OpResult.ErrorResult(e.code().intValue()));
+ }
+ break;
+ }
+ default:
+ }
+ }
+ } catch (KeeperException e) {
+ res.add(new OpResult.ErrorResult(e.code().intValue()));
+ int total = Iterables.size(ops);
+ for (int i = res.size(); i < total; i++) {
+ res.add(new OpResult.ErrorResult(KeeperException.Code.RUNTIMEINCONSISTENCY.intValue()));
}
}
return res;
@@ -972,7 +1066,7 @@ public class MockZooKeeper extends ZooKeeper {
}
public void shutdown() throws InterruptedException {
- mutex.lock();
+ lock();
try {
stopped = true;
tree.clear();
@@ -984,7 +1078,7 @@ public class MockZooKeeper extends ZooKeeper {
log.error("MockZooKeeper shutdown had error", ex);
}
} finally {
- mutex.unlock();
+ unlockIfLocked();
}
}
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
index 499da0e..a33d448 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeperSession.java
@@ -44,7 +44,7 @@ public class MockZooKeeperSession extends ZooKeeper {
private static final Objenesis objenesis = new ObjenesisStd();
- private static final AtomicInteger sessionIdGenerator = new AtomicInteger(0);
+ private static final AtomicInteger sessionIdGenerator = new AtomicInteger(1000);
public static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper) {
ObjectInstantiator<MockZooKeeperSession> instantiator = objenesis.getInstantiatorOf(MockZooKeeperSession.class);
@@ -80,13 +80,23 @@ public class MockZooKeeperSession extends ZooKeeper {
@Override
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
- return mockZooKeeper.create(path, data, acl, createMode);
+ try {
+ mockZooKeeper.overrideEpheralOwner(getSessionId());
+ return mockZooKeeper.create(path, data, acl, createMode);
+ } finally {
+ mockZooKeeper.removeEpheralOwnerOverride();
+ }
}
@Override
public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
final AsyncCallback.StringCallback cb, final Object ctx) {
- mockZooKeeper.create(path, data, acl, createMode, cb, ctx);
+ try {
+ mockZooKeeper.overrideEpheralOwner(getSessionId());
+ mockZooKeeper.create(path, data, acl, createMode, cb, ctx);
+ } finally {
+ mockZooKeeper.removeEpheralOwnerOverride();
+ }
}
@Override