You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/03/12 23:29:44 UTC
[bookkeeper] branch branch-4.6 updated: handle zookeeper session
expire in zk ledger manager
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.6 by this push:
new d0c9a9a handle zookeeper session expire in zk ledger manager
d0c9a9a is described below
commit d0c9a9a2c7d6e8a606b1edaf71936b7fa1df95eb
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sat Feb 17 15:51:13 2018 +0800
handle zookeeper session expire in zk ledger manager
Descriptions of the changes in this PR:
cherry-pick twitter/bookkeeperdfcda5cc2efdc03db99fe126499f8e3347f50484
This includes a test case to test `AbstractZkLedgerManager`, including
1) create/delete/read/write ledger metadata
2) register/unregister listeners
3) handling various watched events
This also fixes a couple of cases in the `AbstractZkLedgerManager` what is caught with test case.
Author: Sijie Guo <si...@apache.org>
Author: Sijie Guo <si...@twitter.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Venkateswararao Jujjuri (JV) <None>
This closes #1130 from sijie/fix_session_expires
---
.../apache/bookkeeper/client/LedgerMetadata.java | 2 +
.../bookkeeper/meta/AbstractZkLedgerManager.java | 23 +-
.../apache/bookkeeper/versioning/LongVersion.java | 3 +
.../meta/AbstractZkLedgerManagerTest.java | 883 +++++++++++++++++++++
.../org/apache/bookkeeper/test/TestCallbacks.java | 10 +-
.../zookeeper/MockZooKeeperTestCase.java | 221 ++++++
6 files changed, 1135 insertions(+), 7 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 180c59e..8d747a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -39,6 +39,7 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import lombok.EqualsAndHashCode;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
*
* <p>It provides parsing and serialization methods of such metadata.
*/
+@EqualsAndHashCode
public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMetadata {
static final Logger LOG = LoggerFactory.getLogger(LedgerMetadata.class);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index c93a8a9..9a0cb81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.meta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
@@ -64,7 +65,8 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
private final static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
- static int ZK_CONNECT_BACKOFF_MS = 200;
+ @VisibleForTesting
+ static final int ZK_CONNECT_BACKOFF_MS = 200;
protected final AbstractConfiguration conf;
protected final ZooKeeper zk;
@@ -125,6 +127,13 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}",
ledgerId, listenerSet.size());
}
+ // notify `null` as indicator that a ledger is deleted
+ // make this behavior consistent with `NodeDeleted` watched event.
+ synchronized (listenerSet) {
+ for (LedgerMetadataListener listener : listenerSet) {
+ listener.onChanged(ledgerId, null);
+ }
+ }
}
} else {
LOG.warn("Failed on read ledger metadata of ledger {} : {}", ledgerId, rc);
@@ -175,8 +184,16 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
@Override
public void process(WatchedEvent event) {
- LOG.info("Received watched event {} from zookeeper based ledger manager.", event);
+ LOG.debug("Received watched event {} from zookeeper based ledger manager.", event);
if (Event.EventType.None == event.getType()) {
+ if (Event.KeeperState.Expired == event.getState()) {
+ LOG.info("ZooKeeper client expired on ledger manager.");
+ Set<Long> keySet = new HashSet<Long>(listeners.keySet());
+ for (Long lid : keySet) {
+ scheduler.submit(new ReadLedgerMetadataTask(lid));
+ LOG.info("Re-read ledger metadata for {} after zookeeper session expired.", lid);
+ }
+ }
return;
}
String path = event.getPath();
@@ -401,7 +418,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
final GenericCallback<Void> cb) {
Version v = metadata.getVersion();
- if (Version.NEW == v || !(v instanceof LongVersion)) {
+ if (!(v instanceof LongVersion)) {
cb.operationComplete(BKException.Code.MetadataVersionException, null);
return;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
index 4ec4388..862977a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
@@ -17,9 +17,12 @@
*/
package org.apache.bookkeeper.versioning;
+import lombok.EqualsAndHashCode;
+
/**
* A version object holds integer version.
*/
+@EqualsAndHashCode
public class LongVersion implements Version {
protected long version;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
new file mode 100644
index 0000000..b7c8ef5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.meta.AbstractZkLedgerManager.ZK_CONNECT_BACKOFF_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.test.TestCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test of {@link AbstractZkLedgerManager}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AbstractZkLedgerManager.class, ZkUtils.class })
+public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
+
+ private ClientConfiguration conf;
+ private AbstractZkLedgerManager ledgerManager;
+ private ScheduledExecutorService scheduler;
+ private MockExecutorController schedulerController;
+ private LedgerMetadata metadata;
+
+ @Before
+ public void setup() throws Exception {
+ PowerMockito.mockStatic(Executors.class);
+
+ super.setup();
+
+ this.scheduler = PowerMockito.mock(ScheduledExecutorService.class);
+ this.schedulerController = new MockExecutorController()
+ .controlSubmit(scheduler)
+ .controlSchedule(scheduler)
+ .controlExecute(scheduler)
+ .controlScheduleAtFixedRate(scheduler, 10);
+ PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
+ .thenReturn(scheduler);
+
+ this.conf = new ClientConfiguration();
+ this.ledgerManager = mock(
+ AbstractZkLedgerManager.class,
+ withSettings()
+ .useConstructor(conf, mockZk)
+ .defaultAnswer(CALLS_REAL_METHODS));
+ this.metadata = new LedgerMetadata(
+ 5, 3, 3,
+ DigestType.CRC32,
+ new byte[0],
+ Collections.emptyMap(),
+ false);
+
+ doAnswer(invocationOnMock -> {
+ long ledgerId = invocationOnMock.getArgument(0);
+ return String.valueOf(ledgerId);
+ }).when(ledgerManager).getLedgerPath(anyLong());
+ doAnswer(invocationOnMock -> {
+ String ledgerStr = invocationOnMock.getArgument(0);
+ return Long.parseLong(ledgerStr);
+ }).when(ledgerManager).getLedgerId(anyString());
+
+ // verify constructor
+ assertEquals(conf.getZkLedgersRootPath(), ledgerManager.ledgerRootPath);
+ assertSame(mockZk, ledgerManager.zk);
+ assertSame(conf, ledgerManager.conf);
+ assertSame(scheduler, ledgerManager.scheduler);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (null != ledgerManager) {
+ ledgerManager.close();
+
+ // zookeeper is passed in, it should not be closed.
+ verify(mockZk, times(0)).close();
+ verify(scheduler, times(1)).shutdown();
+ }
+ }
+
+ @Test
+ public void testCreateLedgerMetadataSuccess() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ mockZkUtilsAsyncCreateFullPathOptimistic(
+ ledgerStr, CreateMode.PERSISTENT,
+ KeeperException.Code.OK.intValue(), ledgerStr
+ );
+
+ assertEquals(Version.NEW, metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+ callbackFuture.get();
+
+ assertEquals(new LongVersion(0), metadata.getVersion());
+ }
+
+ @Test
+ public void testCreateLedgerMetadataNodeExists() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ mockZkUtilsAsyncCreateFullPathOptimistic(
+ ledgerStr, CreateMode.PERSISTENT,
+ KeeperException.Code.NODEEXISTS.intValue(), null);
+
+ assertEquals(Version.NEW, metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail to create ledger metadata if the ledger already exists");
+ } catch (Exception e) {
+ assertTrue(e instanceof BKException);
+ BKException bke = (BKException) e;
+ assertEquals(Code.LedgerExistException, bke.getCode());
+ }
+
+ // creation failed, so metadata should not be modified
+ assertEquals(Version.NEW, metadata.getVersion());
+ }
+
+ @Test
+ public void testCreateLedgerMetadataException() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ mockZkUtilsAsyncCreateFullPathOptimistic(
+ ledgerStr, CreateMode.PERSISTENT,
+ KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+ assertEquals(Version.NEW, metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail to create ledger metadata when encountering zookeeper exception");
+ } catch (Exception e) {
+ assertTrue(e instanceof BKException);
+ BKException bke = (BKException) e;
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ // creation failed, so metadata should not be modified
+ assertEquals(Version.NEW, metadata.getVersion());
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataSuccess() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ LongVersion version = new LongVersion(1234L);
+
+ mockZkDelete(
+ ledgerStr, (int) version.getLongVersion(),
+ KeeperException.Code.OK.intValue());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+ result(callbackFuture);
+
+ verify(mockZk, times(1))
+ .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataVersionAny() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ mockZkDelete(
+ ledgerStr, -1,
+ KeeperException.Code.OK.intValue());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.removeLedgerMetadata(ledgerId, Version.ANY, callbackFuture);
+ result(callbackFuture);
+
+ verify(mockZk, times(1))
+ .delete(eq(ledgerStr), eq(-1), any(VoidCallback.class), eq(null));
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataVersionNew() throws Exception {
+ testRemoveLedgerMetadataInvalidVersion(Version.NEW);
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataUnknownVersionType() throws Exception {
+ Version version = mock(Version.class);
+ testRemoveLedgerMetadataInvalidVersion(version);
+ }
+
+ private void testRemoveLedgerMetadataInvalidVersion(Version version) throws Exception {
+ long ledgerId = System.currentTimeMillis();
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail to remove metadata if version is " + Version.NEW);
+ } catch (BKException bke) {
+ assertEquals(Code.MetadataVersionException, bke.getCode());
+ }
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataNoNode() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ LongVersion version = new LongVersion(1234L);
+
+ mockZkDelete(
+ ledgerStr, (int) version.getLongVersion(),
+ KeeperException.Code.NONODE.intValue());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail to remove metadata if no such ledger exists");
+ } catch (BKException bke) {
+ assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataException() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+ LongVersion version = new LongVersion(1234L);
+
+ mockZkDelete(
+ ledgerStr, (int) version.getLongVersion(),
+ KeeperException.Code.CONNECTIONLOSS.intValue());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail to remove metadata if no such ledger exists");
+ } catch (BKException bke) {
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataHierarchical() throws Exception {
+ HierarchicalLedgerManager hlm = new HierarchicalLedgerManager(conf, mockZk);
+ testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+ }
+
+ @Test
+ public void testRemoveLedgerMetadataLongHierarchical() throws Exception {
+ LongHierarchicalLedgerManager hlm = new LongHierarchicalLedgerManager(conf, mockZk);
+ testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+ }
+
+ private void testRemoveLedgerMetadataHierarchicalLedgerManager(AbstractZkLedgerManager lm) throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = lm.getLedgerPath(ledgerId);
+ LongVersion version = new LongVersion(1234L);
+
+ mockZkUtilsAsyncDeleteFullPathOptimistic(
+ ledgerStr, (int) version.getLongVersion(),
+ KeeperException.Code.OK.intValue());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ lm.removeLedgerMetadata(ledgerId, version, callbackFuture);
+ result(callbackFuture);
+
+ PowerMockito.verifyStatic(
+ ZkUtils.class, times(1));
+ ZkUtils.asyncDeleteFullPathOptimistic(
+ eq(mockZk), eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(ledgerStr));
+
+ }
+
+ @Test
+ public void testReadLedgerMetadataSuccess() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, false,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+ LedgerMetadata readMetadata = result(callbackFuture);
+ assertEquals(metadata, readMetadata);
+
+ verify(mockZk, times(1))
+ .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+ }
+
+ @Test
+ public void testReadLedgerMetadataNoNode() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ mockZkGetData(
+ ledgerStr, false,
+ KeeperException.Code.NONODE.intValue(), null, null);
+
+ GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+ } catch (BKException bke) {
+ assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+ }
+
+ @Test
+ public void testReadLedgerMetadataException() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ mockZkGetData(
+ ledgerStr, false,
+ KeeperException.Code.CONNECTIONLOSS.intValue(), null, null);
+
+ GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+ } catch (BKException bke) {
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+ }
+
+ @Test
+ public void testReadLedgerMetadataStatMissing() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ mockZkGetData(
+ ledgerStr, false,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), null);
+
+ GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+ } catch (BKException bke) {
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+ }
+
+ @Test
+ public void testReadLedgerMetadataDataCorrupted() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, false,
+ KeeperException.Code.OK.intValue(), new byte[0], stat);
+
+ GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+ } catch (BKException bke) {
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ verify(mockZk, times(1))
+ .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+ }
+
+ @Test
+ public void testWriteLedgerMetadataSuccess() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1235);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkSetData(
+ ledgerStr, metadata.serialize(), 1234,
+ KeeperException.Code.OK.intValue(), stat);
+
+ assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+ result(callbackFuture);
+
+ assertEquals(new LongVersion(1235L), metadata.getVersion());
+
+ verify(mockZk, times(1))
+ .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+ }
+
+ @Test
+ public void testWriteLedgerMetadataBadVersion() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ metadata.setVersion(new LongVersion(1234L));
+ mockZkSetData(
+ ledgerStr, metadata.serialize(), 1234,
+ KeeperException.Code.BADVERSION.intValue(), null);
+
+ assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on writing ledger metadata if encountering bad version");
+ } catch (BKException bke) {
+ assertEquals(Code.MetadataVersionException, bke.getCode());
+ }
+
+ // version remain unchanged
+ assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+ verify(mockZk, times(1))
+ .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+ }
+
+ @Test
+ public void testWriteLedgerMetadataException() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ metadata.setVersion(new LongVersion(1234L));
+ mockZkSetData(
+ ledgerStr, metadata.serialize(), 1234,
+ KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+ assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on writing ledger metadata if encountering zookeeper exceptions");
+ } catch (BKException bke) {
+ assertEquals(Code.ZKException, bke.getCode());
+ }
+
+ // version remain unchanged
+ assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+ verify(mockZk, times(1))
+ .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+ }
+
+ @Test
+ public void testWriteLedgerMetadataInvalidVersion() throws Exception {
+ Version[] versions = new Version[] {
+ Version.NEW,
+ Version.ANY,
+ mock(Version.class)
+ };
+ for (Version version : versions) {
+ testWriteLedgerMetadataInvalidVersion(version);
+ }
+ }
+
+ private void testWriteLedgerMetadataInvalidVersion(Version invalidVersion) throws Exception {
+ long ledgerId = System.currentTimeMillis();
+
+ metadata.setVersion(invalidVersion);
+
+ GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+ ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+ try {
+ result(callbackFuture);
+ fail("Should fail on writing ledger metadata if an invalid version is provided.");
+ } catch (BKException bke) {
+ assertEquals(Code.MetadataVersionException, bke.getCode());
+ }
+
+ verify(mockZk, times(0))
+ .setData(anyString(), any(byte[].class), anyInt(), any(StatCallback.class), any());
+ }
+
+ @Test
+ public void testLedgerMetadataListener() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+ // the listener will be notified with first get
+ LedgerMetadata change1 = changes.take();
+ assertEquals(metadata, change1);
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+ Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+ assertEquals(1, watcherSet1.size());
+ Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+ // mock get data to return an updated metadata
+ metadata.setVersion(new LongVersion(1235L));
+ when(stat.getVersion()).thenReturn(1235);
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ // notify the watcher event
+ notifyWatchedEvent(
+ EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+ // the listener should receive an updated metadata
+ LedgerMetadata change2 = changes.take();
+ assertEquals(metadata, change2);
+ verify(mockZk, times(2))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // after the listener receive an updated metadata, a new watcher should be registered
+ // for subsequent changes again.
+ assertTrue(watchers.containsKey(ledgerStr));
+ Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+ assertEquals(1, watcherSet2.size());
+ Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+
+ // zookeeper watchers are same, since there is only one giant watcher per ledger manager.
+ assertSame(registeredWatcher1, registeredWatcher2);
+
+ // verify scheduler
+ verify(scheduler, times(2)).submit(any(Runnable.class));
+ verify(scheduler, times(0))
+ .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+ }
+
+ @Test
+ public void testLedgerMetadataListenerOnLedgerDeleted() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+ assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+ // the listener will be notified with first get
+ LedgerMetadata change1 = changes.take().get();
+ assertEquals(metadata, change1);
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+
+ // mock get data to simulate an ledger is deleted
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.NONODE.intValue(), null, null);
+
+ // notify the watcher event
+ notifyWatchedEvent(
+ EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+ // the listener should be removed from listener set and not receive an updated metadata anymore
+ Optional<LedgerMetadata> change2 = changes.take();
+ assertFalse(change2.isPresent());
+ assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+ // verify scheduler: the listener is only triggered once
+ verify(scheduler, times(1)).submit(any(Runnable.class));
+ verify(scheduler, times(0)).schedule(
+ any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+ // no watcher is registered
+ assertFalse(watchers.containsKey(ledgerStr));
+ }
+
+ @Test
+ public void testLedgerMetadataListenerOnLedgerDeletedEvent() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+ assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+ // the listener will be notified with first get
+ LedgerMetadata change1 = changes.take().get();
+ assertEquals(metadata, change1);
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+
+ // notify the watcher event
+ notifyWatchedEvent(
+ EventType.NodeDeleted, KeeperState.SyncConnected, ledgerStr);
+
+ // the listener should be removed from listener set and a null change is notified.
+ Optional<LedgerMetadata> change2 = changes.take();
+ assertFalse(change2.isPresent());
+ // no more `getData` is called.
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+ // listener is automatically unregistered after a ledger is deleted.
+ assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+ }
+
+ @Test
+ public void testLedgerMetadataListenerOnRetries() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+
+ // fail the first get, so the ledger manager will retry get data again.
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.SESSIONEXPIRED.intValue(), null, null);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+ assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+ // the listener will not be notified with any updates
+ assertNull(changes.poll());
+ // an retry task is scheduled
+ verify(scheduler, times(1))
+ .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+ // zookeeper is called once
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+ // watcher is not registered since getData call is failed
+ assertFalse(watchers.containsKey(ledgerStr));
+
+ // mock get data to return a valid response
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ schedulerController.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
+
+ // the listener will be notified with first get
+ LedgerMetadata change = changes.take();
+ assertEquals(metadata, change);
+ verify(mockZk, times(2))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // watcher is registered after successfully `getData`
+ assertTrue(watchers.containsKey(ledgerStr));
+ }
+
+ @Test
+ public void testLedgerMetadataListenerOnSessionExpired() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+ // the listener will be notified with first get
+ LedgerMetadata change1 = changes.take();
+ assertEquals(metadata, change1);
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+ Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+ assertEquals(1, watcherSet1.size());
+ Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+ // simulate session expired
+ notifyWatchedEvent(
+ EventType.None, KeeperState.Expired, ledgerStr);
+
+ // ledger manager will retry to read metadata again
+ LedgerMetadata change2 = changes.take();
+ assertEquals(metadata, change2);
+ verify(mockZk, times(2))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+ Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+ assertEquals(1, watcherSet2.size());
+ Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+
+ assertSame(registeredWatcher1, registeredWatcher2);
+ }
+
+ @Test
+ public void testUnregisterLedgerMetadataListener() throws Exception {
+ long ledgerId = System.currentTimeMillis();
+ String ledgerStr = String.valueOf(ledgerId);
+
+ LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+ LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+ metadata.setVersion(new LongVersion(1234L));
+ Stat stat = mock(Stat.class);
+ when(stat.getVersion()).thenReturn(1234);
+ when(stat.getCtime()).thenReturn(metadata.getCtime());
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+ assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+ // the listener will be notified with first get
+ LedgerMetadata change1 = changes.take();
+ assertEquals(metadata, change1);
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+ // the watcher is registered for receiving watched event
+ assertTrue(watchers.containsKey(ledgerStr));
+ Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+ assertEquals(1, watcherSet1.size());
+ Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+ // mock get data to return an updated metadata
+ metadata.setVersion(new LongVersion(1235L));
+ when(stat.getVersion()).thenReturn(1235);
+ mockZkGetData(
+ ledgerStr, true,
+ KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+ // unregister the listener
+ ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
+ assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+ // notify the watcher event
+ notifyWatchedEvent(
+ EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+ // since listener is already unregistered so no more `getData` is issued.
+ assertNull(changes.poll());
+ verify(mockZk, times(1))
+ .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
index 5d9f99b..7c52e3a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
@@ -20,11 +20,13 @@
*/
package org.apache.bookkeeper.test;
+import com.google.common.util.concurrent.AbstractFuture;
+
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import com.google.common.util.concurrent.AbstractFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +38,13 @@ public class TestCallbacks {
private static final Logger logger = LoggerFactory.getLogger(TestCallbacks.class);
public static class GenericCallbackFuture<T>
- extends AbstractFuture<T> implements GenericCallback<T> {
+ extends CompletableFuture<T> implements GenericCallback<T> {
@Override
public void operationComplete(int rc, T value) {
if (rc != BKException.Code.OK) {
- setException(BKException.create(rc));
+ completeExceptionally(BKException.create(rc));
} else {
- set(value);
+ complete(value);
}
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
new file mode 100644
index 0000000..eb28229
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Maps;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.client.api.BKException.Code;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * A test base that provides mocked zookeeper.
+ */
+public abstract class MockZooKeeperTestCase {
+
+ protected final ConcurrentMap<String, Set<Watcher>> watchers = Maps.newConcurrentMap();
+ protected ZooKeeper mockZk;
+
+ protected void setup() throws Exception {
+ this.mockZk = mock(ZooKeeper.class);
+
+ PowerMockito.mockStatic(ZkUtils.class);
+ }
+
+ private void addWatcher(String path, Watcher watcher) {
+ if (null == watcher) {
+ return;
+ }
+ Set<Watcher> watcherSet = watchers.get(path);
+ if (null == watcherSet) {
+ watcherSet = new HashSet<>();
+ watchers.put(path, watcherSet);
+ }
+ watcherSet.add(watcher);
+ }
+
+ protected void mockZkUtilsAsyncCreateFullPathOptimistic(
+ String expectedLedgerPath,
+ CreateMode expectedCreateMode,
+ int retCode,
+ String retCreatedZnodeName
+ ) throws Exception {
+
+ PowerMockito.doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(1);
+ StringCallback callback = invocationOnMock.getArgument(5);
+ Object ctx = invocationOnMock.getArgument(6);
+
+ callback.processResult(
+ retCode, path, ctx, retCreatedZnodeName);
+ return null;
+ }).when(
+ ZkUtils.class,
+ "asyncCreateFullPathOptimistic",
+ eq(mockZk),
+ eq(expectedLedgerPath),
+ any(byte[].class),
+ anyList(),
+ eq(expectedCreateMode),
+ any(StringCallback.class),
+ any());
+
+ }
+
+ protected void mockZkDelete(
+ String expectedLedgerPath,
+ int expectedVersion,
+ int retCode
+ ) throws Exception {
+
+ doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(0);
+ VoidCallback callback = invocationOnMock.getArgument(2);
+ Object ctx = invocationOnMock.getArgument(3);
+
+ callback.processResult(
+ retCode, path, ctx
+ );
+
+ return null;
+ }).when(mockZk).delete(
+ eq(expectedLedgerPath),
+ eq(expectedVersion),
+ any(VoidCallback.class),
+ any());
+
+ }
+
+ protected void mockZkUtilsAsyncDeleteFullPathOptimistic(
+ String expectedLedgerPath,
+ int expectedZnodeVersion,
+ int retCode
+ ) throws Exception {
+
+ PowerMockito.doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(1);
+ VoidCallback callback = invocationOnMock.getArgument(3);
+
+ callback.processResult(
+ retCode, path, null);
+ return null;
+ }).when(
+ ZkUtils.class,
+ "asyncDeleteFullPathOptimistic",
+ eq(mockZk),
+ eq(expectedLedgerPath),
+ eq(expectedZnodeVersion),
+ any(VoidCallback.class),
+ eq(expectedLedgerPath));
+
+ }
+
+ protected void mockZkGetData(
+ String expectedLedgerPath,
+ boolean expectedWatcher,
+ int retCode,
+ byte[] retData,
+ Stat retStat
+ ) throws Exception {
+
+ doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(0);
+ Watcher watcher = invocationOnMock.getArgument(1);
+ DataCallback callback = invocationOnMock.getArgument(2);
+ Object ctx = invocationOnMock.getArgument(3);
+
+ if (Code.OK == retCode) {
+ addWatcher(path, watcher);
+ }
+
+ callback.processResult(
+ retCode, path, ctx, retData, retStat
+ );
+
+ return null;
+ }).when(mockZk).getData(
+ eq(expectedLedgerPath),
+ expectedWatcher ? any(Watcher.class) : eq(null),
+ any(DataCallback.class),
+ any());
+
+ }
+
+ protected void mockZkSetData(
+ String expectedLedgerPath,
+ byte[] expectedBytes,
+ int expectedVersion,
+ int retCode,
+ Stat retStat
+ ) throws Exception {
+
+ doAnswer(invocationOnMock -> {
+ String path = invocationOnMock.getArgument(0);
+ StatCallback callback = invocationOnMock.getArgument(3);
+ Object ctx = invocationOnMock.getArgument(4);
+
+ callback.processResult(
+ retCode, path, ctx, retStat
+ );
+
+ return null;
+ }).when(mockZk).setData(
+ eq(expectedLedgerPath),
+ eq(expectedBytes),
+ eq(expectedVersion),
+ any(StatCallback.class),
+ any());
+
+ }
+
+ protected boolean notifyWatchedEvent(EventType eventType,
+ KeeperState keeperState,
+ String path) {
+ Set<Watcher> watcherSet = watchers.remove(path);
+ if (null == watcherSet) {
+ return false;
+ }
+ WatchedEvent event = new WatchedEvent(
+ eventType, keeperState, path);
+ for (Watcher watcher : watcherSet) {
+ watcher.process(event);
+ }
+ return true;
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.