You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/12 20:43:10 UTC
[bookkeeper] branch master updated: Move MockLedgerManager out of
MetadataUpdateLoopTest
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 9a2efac Move MockLedgerManager out of MetadataUpdateLoopTest
9a2efac is described below
commit 9a2efacd08e2360868b79efe10296aade8e2e5b1
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Sun Aug 12 13:43:03 2018 -0700
Move MockLedgerManager out of MetadataUpdateLoopTest
This mock is useful for other tests, so move it out.
Author: Ivan Kelly <iv...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1596 from ivankelly/mocklm-common
---
.../bookkeeper/client/MetadataUpdateLoopTest.java | 116 +----------
.../apache/bookkeeper/meta/MockLedgerManager.java | 211 +++++++++++++++++++++
2 files changed, 213 insertions(+), 114 deletions(-)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index 57a3de3..c708a84 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -25,13 +25,10 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -43,18 +40,14 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.MockLedgerManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.test.TestCallbacks.GenericCallbackFuture;
-import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
-import org.apache.zookeeper.AsyncCallback;
import org.junit.Assert;
import org.junit.Test;
@@ -411,7 +404,7 @@ public class MetadataUpdateLoopTest {
new ThreadFactoryBuilder().setNameFormat("non-deter-%d").build());
@Override
- void executeCallback(Runnable r) {
+ public void executeCallback(Runnable r) {
cbExecutor.execute(r);
}
@@ -483,109 +476,4 @@ public class MetadataUpdateLoopTest {
}
};
}
-
- static class MockLedgerManager implements LedgerManager {
- final Map<Long, Pair<LongVersion, byte[]>> metadataMap = new HashMap<>();
- final ExecutorService executor = Executors.newSingleThreadExecutor((r) -> new Thread(r, "MockLedgerManager"));
-
- private LedgerMetadata readMetadata(long ledgerId) throws Exception {
- Pair<LongVersion, byte[]> pair = metadataMap.get(ledgerId);
- if (pair == null) {
- return null;
- } else {
- return LedgerMetadata.parseConfig(pair.getRight(), pair.getLeft(), Optional.absent());
- }
- }
-
- void executeCallback(Runnable r) {
- r.run();
- }
-
- @Override
- public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<LedgerMetadata> cb) {
- executor.submit(() -> {
- if (metadataMap.containsKey(ledgerId)) {
- executeCallback(() -> cb.operationComplete(BKException.Code.LedgerExistException, null));
- } else {
- metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), metadata.serialize()));
- try {
- LedgerMetadata readBack = readMetadata(ledgerId);
- executeCallback(() -> cb.operationComplete(BKException.Code.OK, readBack));
- } catch (Exception e) {
- LOG.error("Error reading back written metadata", e);
- executeCallback(() -> cb.operationComplete(BKException.Code.MetaStoreException, null));
- }
- }
- });
- }
-
- @Override
- public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> cb) {}
-
- @Override
- public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> cb) {
- executor.submit(() -> {
- try {
- LedgerMetadata metadata = readMetadata(ledgerId);
- if (metadata == null) {
- executeCallback(
- () -> cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
- } else {
- executeCallback(() -> cb.operationComplete(BKException.Code.OK, metadata));
- }
- } catch (Exception e) {
- LOG.error("Error reading metadata", e);
- executeCallback(() -> cb.operationComplete(BKException.Code.MetaStoreException, null));
- }
- });
- }
-
- @Override
- public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<LedgerMetadata> cb) {
- executor.submit(() -> {
- try {
- LedgerMetadata oldMetadata = readMetadata(ledgerId);
- if (oldMetadata == null) {
- executeCallback(
- () -> cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
- } else if (!oldMetadata.getVersion().equals(metadata.getVersion())) {
- executeCallback(
- () -> cb.operationComplete(BKException.Code.MetadataVersionException, null));
- } else {
- LongVersion oldVersion = (LongVersion) oldMetadata.getVersion();
- metadataMap.put(ledgerId, Pair.of(new LongVersion(oldVersion.getLongVersion() + 1),
- metadata.serialize()));
- LedgerMetadata readBack = readMetadata(ledgerId);
- executeCallback(() -> cb.operationComplete(BKException.Code.OK, readBack));
- }
- } catch (Exception e) {
- LOG.error("Error writing metadata", e);
- executeCallback(
- () -> cb.operationComplete(BKException.Code.MetaStoreException, null));
- }
- });
-
- }
-
- @Override
- public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {}
-
- @Override
- public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {}
-
- @Override
- public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
- Object context, int successRc, int failureRc) {
- }
-
- @Override
- public LedgerRangeIterator getLedgerRanges() {
- return null;
- }
-
- @Override
- public void close() {
- executor.shutdownNow();
- }
- }
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
new file mode 100644
index 0000000..4586e09
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.meta;
+
+import com.google.common.base.Optional;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.AsyncCallback;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock implementation of Ledger Manager.
+ */
+public class MockLedgerManager implements LedgerManager {
+ static final Logger LOG = LoggerFactory.getLogger(MockLedgerManager.class);
+
+ boolean stallingWrites = false;
+ final List<Consumer<Integer>> stalledWrites = new ArrayList<>();
+
+ final Map<Long, Pair<LongVersion, byte[]>> metadataMap;
+ final ExecutorService executor;
+ final boolean ownsExecutor;
+
+ public MockLedgerManager() {
+ this(new HashMap<>(),
+ Executors.newSingleThreadExecutor((r) -> new Thread(r, "MockLedgerManager")), true);
+ }
+
+ private MockLedgerManager(Map<Long, Pair<LongVersion, byte[]>> metadataMap,
+ ExecutorService executor, boolean ownsExecutor) {
+ this.metadataMap = metadataMap;
+ this.executor = executor;
+ this.ownsExecutor = ownsExecutor;
+ }
+
+ public MockLedgerManager newClient() {
+ return new MockLedgerManager(metadataMap, executor, false);
+ }
+
+ private LedgerMetadata readMetadata(long ledgerId) throws Exception {
+ Pair<LongVersion, byte[]> pair = metadataMap.get(ledgerId);
+ if (pair == null) {
+ return null;
+ } else {
+ return LedgerMetadata.parseConfig(pair.getRight(), pair.getLeft(), Optional.absent());
+ }
+ }
+
+ public void stallWrites() throws Exception {
+ synchronized (this) {
+ stallingWrites = true;
+ }
+ }
+
+ public void releaseStalledWrites(int rc) {
+ List<Consumer<Integer>> toRelease;
+ synchronized (this) {
+ stallingWrites = false;
+ toRelease = new ArrayList<>(stalledWrites);
+ stalledWrites.clear();
+ }
+
+ executor.execute(() -> {
+ toRelease.forEach(w -> w.accept(rc));
+ });
+ }
+
+ public void executeCallback(Runnable r) {
+ r.run();
+ }
+
+ @Override
+ public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<LedgerMetadata> cb) {
+ executor.submit(() -> {
+ if (metadataMap.containsKey(ledgerId)) {
+ executeCallback(() -> cb.operationComplete(BKException.Code.LedgerExistException, null));
+ } else {
+ metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), metadata.serialize()));
+ try {
+ LedgerMetadata readBack = readMetadata(ledgerId);
+ executeCallback(() -> cb.operationComplete(BKException.Code.OK, readBack));
+ } catch (Exception e) {
+ LOG.error("Error reading back written metadata", e);
+ executeCallback(() -> cb.operationComplete(BKException.Code.MetaStoreException, null));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> cb) {}
+
+ @Override
+ public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> cb) {
+ executor.submit(() -> {
+ try {
+ LedgerMetadata metadata = readMetadata(ledgerId);
+ if (metadata == null) {
+ executeCallback(
+ () -> cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
+ } else {
+ executeCallback(() -> cb.operationComplete(BKException.Code.OK, metadata));
+ }
+ } catch (Exception e) {
+ LOG.error("Error reading metadata", e);
+ executeCallback(() -> cb.operationComplete(BKException.Code.MetaStoreException, null));
+ }
+ });
+ }
+
+ @Override
+ public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<LedgerMetadata> cb) {
+ Runnable write = () -> {
+ try {
+ LedgerMetadata oldMetadata = readMetadata(ledgerId);
+ if (oldMetadata == null) {
+ executeCallback(() -> cb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null));
+ } else if (!oldMetadata.getVersion().equals(metadata.getVersion())) {
+ executeCallback(() -> cb.operationComplete(BKException.Code.MetadataVersionException, null));
+ } else {
+ LongVersion oldVersion = (LongVersion) oldMetadata.getVersion();
+ metadataMap.put(ledgerId, Pair.of(new LongVersion(oldVersion.getLongVersion() + 1),
+ metadata.serialize()));
+ LedgerMetadata readBack = readMetadata(ledgerId);
+ executeCallback(() -> cb.operationComplete(BKException.Code.OK, readBack));
+ }
+ } catch (Exception e) {
+ LOG.error("Error writing metadata", e);
+ executeCallback(() -> cb.operationComplete(BKException.Code.MetaStoreException, null));
+ }
+ };
+
+ synchronized (this) {
+ if (stallingWrites) {
+ LOG.info("[L{}, stallId={}] Stalling write of metadata", ledgerId, System.identityHashCode(write));
+ stalledWrites.add((rc) -> {
+ LOG.info("[L{}, stallid={}] Unstalled write", ledgerId, System.identityHashCode(write));
+
+ if (rc == BKException.Code.OK) {
+ write.run();
+ } else {
+ executeCallback(() -> cb.operationComplete(rc, null));
+ }
+ });
+ } else {
+ executor.execute(write);
+ }
+ }
+ }
+
+ @Override
+ public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {}
+
+ @Override
+ public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {}
+
+ @Override
+ public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
+ Object context, int successRc, int failureRc) {
+ }
+
+ @Override
+ public LedgerRangeIterator getLedgerRanges() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ if (ownsExecutor) {
+ executor.shutdownNow();
+ }
+ }
+
+}