You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/28 18:19:55 UTC
[pulsar] branch master updated: Fix bug related to managedLedger
properties (#7357)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3a63a3 Fix bug related to managedLedger properties (#7357)
a3a63a3 is described below
commit a3a63a35c9ff406be05bcfd388f520d40580954e
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Mon Jun 29 02:19:37 2020 +0800
Fix bug related to managedLedger properties (#7357)
* Remove re-read from zk, and use the same mutex when update metadata.
* Add setProperty(), deleteProperty() API and test ledger changed when add metadata.
* Add AsyncSetProperty(), asyncDeleteProperty() API and add related unit tests.
* Fix unit test.
* Fix exception propagation.
---
.../apache/bookkeeper/mledger/AsyncCallbacks.java | 6 +-
.../apache/bookkeeper/mledger/ManagedLedger.java | 46 ++++++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 135 +++++++++++++--------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 110 +++++++++++++----
4 files changed, 221 insertions(+), 76 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 0add10f..1ced748 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -139,9 +139,9 @@ public interface AsyncCallbacks {
void offloadFailed(ManagedLedgerException exception, Object ctx);
}
- interface SetPropertiesCallback {
- void setPropertiesComplete(Map<String, String> properties, Object ctx);
+ interface UpdatePropertiesCallback {
+ void updatePropertiesComplete(Map<String, String> properties, Object ctx);
- void setPropertiesFailed(ManagedLedgerException exception, Object ctx);
+ void updatePropertiesFailed(ManagedLedgerException exception, Object ctx);
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4f4b8ca..a8a4509 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -451,11 +451,51 @@ public interface ManagedLedger {
Map<String, String> getProperties();
/**
+ * Add key-value to propertiesMap.
+ *
+ * @param key key of property to add
+ * @param value value of property to add
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException;
+
+ /**
+ * Async add key-value to propertiesMap.
+ *
+ * @param key key of property to add
+ * @param value value of property to add
+ * @param callback a callback which will be supplied with the newest properties in managedLedger.
+ * @param ctx a context object which will be passed to the callback on completion.
+ **/
+ void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+ /**
+ * Delete the property by key.
+ *
+ * @param key key of property to delete
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
+ */
+ void deleteProperty(String key) throws InterruptedException, ManagedLedgerException;
+
+ /**
+ * Async delete the property by key.
+ *
+ * @param key key of property to delete
+ * @param callback a callback which will be supplied with the newest properties in managedLedger.
+ * @param ctx a context object which will be passed to the callback on completion.
+ */
+ void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+ /**
* Update managed-ledger's properties.
*
* @param properties key-values of properties
+ * @throws InterruptedException
+ * @throws ManagedLedgerException
*/
- void setProperties(Map<String, String> properties) throws InterruptedException;
+ void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException;
/**
* Async update managed-ledger's properties.
@@ -464,9 +504,9 @@ public interface ManagedLedger {
* @param callback a callback which will be supplied with the newest properties in managedLedger.
* @param ctx a context object which will be passed to the callback on completion.
*/
- void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
+ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
Object ctx);
-
+
/**
* Trim consumed ledgers in background
* @param promise
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 27d8848..f45b341 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
@@ -31,12 +30,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-
import java.time.Clock;
import java.util.Collections;
import java.util.HashMap;
@@ -67,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -90,8 +86,8 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -176,10 +172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private ScheduledFuture<?> timeoutTask;
/**
- * This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
+ * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
* version, we cannot have multiple concurrent updates.
*/
- private final CallbackMutex ledgersListMutex = new CallbackMutex();
+ private final CallbackMutex metadataMutex = new CallbackMutex();
private final CallbackMutex trimmerMutex = new CallbackMutex();
private final CallbackMutex offloadMutex = new CallbackMutex();
@@ -1230,7 +1226,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
- ledgersListMutex.unlock();
+ metadataMutex.unlock();
updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) {
mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
@@ -1267,7 +1263,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}, null);
- ledgersListMutex.unlock();
+ metadataMutex.unlock();
synchronized (ManagedLedgerImpl.this) {
lastLedgerCreationFailureTimestamp = clock.millis();
@@ -1282,7 +1278,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
- if (!ledgersListMutex.tryLock()) {
+ if (!metadataMutex.tryLock()) {
// Defer update for later
scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
return;
@@ -2062,7 +2058,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
- || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
+ || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(promise);
trimmerMutex.unlock();
return;
@@ -2101,7 +2097,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
- ledgersListMutex.unlock();
+ metadataMutex.unlock();
trimmerMutex.unlock();
for (LedgerInfo ls : ledgersToDelete) {
@@ -2119,7 +2115,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
- ledgersListMutex.unlock();
+ metadataMutex.unlock();
trimmerMutex.unlock();
promise.completeExceptionally(e);
@@ -2531,7 +2527,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
CompletableFuture<Void> finalPromise) {
synchronized (this) {
- if (!ledgersListMutex.tryLock()) {
+ if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
@@ -2539,7 +2535,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} else { // lock acquired
CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
unlockingPromise.whenComplete((res, ex) -> {
- ledgersListMutex.unlock();
+ metadataMutex.unlock();
if (ex != null) {
finalPromise.completeExceptionally(ex);
} else {
@@ -3020,6 +3016,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
.setEntryId(lastConfirmedEntry.getEntryId()));
}
+ for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+ mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+ .setKey(property.getKey()).setValue(property.getValue()));
+ }
return mlInfo.build();
}
@@ -3271,57 +3271,96 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
@Override
- public void setProperties(Map<String, String> properties) throws InterruptedException {
+ public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
+ Map<String, String> map = new HashMap<>();
+ map.put(key, value);
+ updateProperties(map, false, null);
+ }
+
+ @Override
+ public void asyncSetProperty(String key, String value, final UpdatePropertiesCallback callback, Object ctx) {
+ Map<String, String> map = new HashMap<>();
+ map.put(key, value);
+ asyncUpdateProperties(map, false, null, callback, ctx);
+ }
+
+ @Override
+ public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
+ updateProperties(null, true, key);
+ }
+
+ @Override
+ public void asyncDeleteProperty(String key, final UpdatePropertiesCallback callback, Object ctx) {
+ asyncUpdateProperties(null, true, key, callback, ctx);
+ }
+
+ @Override
+ public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
+ updateProperties(properties, false, null);
+ }
+
+ @Override
+ public void asyncSetProperties(Map<String, String> properties, final UpdatePropertiesCallback callback,
+ Object ctx) {
+ asyncUpdateProperties(properties, false, null, callback, ctx);
+ }
+
+ private void updateProperties(Map<String, String> properties, boolean isDelete,
+ String deleteKey) throws InterruptedException, ManagedLedgerException {
final CountDownLatch latch = new CountDownLatch(1);
- this.asyncSetProperties(properties, new SetPropertiesCallback() {
+ class Result {
+ ManagedLedgerException exception = null;
+ }
+ final Result result = new Result();
+ this.asyncUpdateProperties(properties, isDelete, deleteKey, new UpdatePropertiesCallback() {
@Override
- public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+ public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
latch.countDown();
}
@Override
- public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
- log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
+ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+ result.exception = exception;
latch.countDown();
}
}, null);
- latch.await();
+ if (!latch.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+ throw new ManagedLedgerException("Timeout during update managedLedger's properties");
+ }
+
+ if (result.exception != null) {
+ log.error("[{}] Update managedLedger's properties failed", name, result.exception);
+ throw result.exception;
+ }
}
- @Override
- public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
- store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
+ private void asyncUpdateProperties(Map<String, String> properties, boolean isDelete,
+ String deleteKey, final UpdatePropertiesCallback callback, Object ctx) {
+ if (!metadataMutex.tryLock()) {
+ // Defer update for later
+ scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
+ callback, ctx), 100, TimeUnit.MILLISECONDS);
+ return;
+ }
+ if (isDelete) {
+ propertiesMap.remove(deleteKey);
+ } else {
+ propertiesMap.putAll(properties);
+ }
+ store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
- public void operationComplete(ManagedLedgerInfo result, Stat version) {
+ public void operationComplete(Void result, Stat version) {
ledgersStat = version;
- // Update manageLedger's properties.
- ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
- info.clearProperties();
- for (Map.Entry<String, String> property : properties.entrySet()) {
- info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue()));
- }
- store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
- @Override
- public void operationComplete(Void result, Stat version) {
- ledgersStat = version;
- propertiesMap.clear();
- propertiesMap.putAll(properties);
- callback.setPropertiesComplete(properties, ctx);
- }
-
- @Override
- public void operationFailed(MetaStoreException e) {
- log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
- callback.setPropertiesFailed(e, ctx);
- }
- });
+ callback.updatePropertiesComplete(propertiesMap, ctx);
+ metadataMutex.unlock();
}
@Override
public void operationFailed(MetaStoreException e) {
- log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
- callback.setPropertiesFailed(e, ctx);
+ log.error("[{}] Update managedLedger's properties failed", name, e);
+ callback.updatePropertiesFailed(e, ctx);
+ metadataMutex.unlock();
}
});
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e507c99..2806a10 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -63,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -95,7 +95,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -1168,7 +1167,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
@Test
- public void testSetProperties() throws Exception {
+ public void testUpdateProperties() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Map<String, String> properties = new HashMap<>();
properties.put("key1", "value1");
@@ -1177,40 +1176,107 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.setProperties(properties);
assertEquals(ledger.getProperties(), properties);
+ properties.put("key4", "value4");
+ ledger.setProperty("key4", "value4");
+ assertEquals(ledger.getProperties(), properties);
+
+ ledger.deleteProperty("key4");
+ properties.remove("key4");
+ assertEquals(ledger.getProperties(), properties);
+
Map<String, String> newProperties = new HashMap<>();
- newProperties.put("key4", "value4");
newProperties.put("key5", "value5");
- newProperties.put("key6", "value6");
+ newProperties.put("key1", "value6");
+ newProperties.putAll(properties);
ledger.setProperties(newProperties);
assertEquals(ledger.getProperties(), newProperties);
}
@Test
- public void testAsyncSetProperties() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
+ public void testAsyncUpdateProperties() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(3);
ManagedLedger ledger = factory.open("my_test_ledger");
- Map<String, String> properties = new HashMap<>();
- properties.put("key1", "value1");
- properties.put("key2", "value2");
- properties.put("key3", "value3");
- ledger.setProperties(properties);
- Map<String, String> newProperties = new HashMap<>();
- newProperties.put("key4", "value4");
- newProperties.put("key5", "value5");
- newProperties.put("key6", "value6");
- ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() {
+ Map<String, String> prop = new HashMap<>();
+ prop.put("key1", "value1");
+ prop.put("key2", "value2");
+ prop.put("key3", "value3");
+ ledger.asyncSetProperties(prop, new AsyncCallbacks.UpdatePropertiesCallback() {
@Override
- public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+ public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+ assertEquals(prop, properties);
latch.countDown();
}
@Override
- public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
- fail("should have succeeded");
+ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
- latch.await();
- assertEquals(ledger.getProperties(), newProperties);
+
+ ledger.asyncSetProperty("key4", "value4", new AsyncCallbacks.UpdatePropertiesCallback() {
+ @Override
+ public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+ assertNotNull(properties.get("key4"));
+ assertEquals("value4", properties.get("key4"));
+ latch.countDown();
+ }
+
+ @Override
+ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+ }
+ }, null);
+
+ prop.remove("key1");
+ ledger.asyncDeleteProperty("key1", new AsyncCallbacks.UpdatePropertiesCallback() {
+ @Override
+ public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+ assertNull(properties.get("key1"));
+ latch.countDown();
+ }
+
+ @Override
+ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+ }
+ }, null);
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testConcurrentAsyncSetProperties() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1000);
+ ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+ ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i = 0; i < 1000; i++) {
+ final int finalI = i;
+ executor.execute(() -> {
+ Map<String, String> newProperties = new HashMap<>();
+ newProperties.put("key0", String.valueOf(finalI));
+ newProperties.put("key1", "value1");
+ newProperties.put("key2", "value2");
+ newProperties.put("key3", "value3");
+ ledger.asyncSetProperties(newProperties, new AsyncCallbacks.UpdatePropertiesCallback() {
+ @Override
+ public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+ assertEquals(properties, newProperties);
+ latch.countDown();
+ }
+
+ @Override
+ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+ }
+ }, null);
+ });
+ }
+ try {
+ for (int i = 0; i < 100; i++) {
+ ledger.addEntry("data".getBytes(Encoding));
+ Thread.sleep(300);
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ assertTrue(latch.await(300, TimeUnit.SECONDS));
+ executor.shutdown();
+ factory.shutdown();
}
@Test