You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/28 18:19:55 UTC

[pulsar] branch master updated: Fix bug related to managedLedger properties (#7357)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a3a63a3  Fix bug related to managedLedger properties (#7357)
a3a63a3 is described below

commit a3a63a35c9ff406be05bcfd388f520d40580954e
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Mon Jun 29 02:19:37 2020 +0800

    Fix bug related to managedLedger properties (#7357)
    
    * Remove re-read from zk, and use the same mutex when update metadata.
    
    * Add setProperty(), deleteProperty() API and test ledger changed when add metadata.
    
    * Add AsyncSetProperty(), asyncDeleteProperty() API and add related unit tests.
    
    * Fix unit test.
    
    * Fix exception propagation.
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   6 +-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  46 ++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 135 +++++++++++++--------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 110 +++++++++++++----
 4 files changed, 221 insertions(+), 76 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 0add10f..1ced748 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -139,9 +139,9 @@ public interface AsyncCallbacks {
         void offloadFailed(ManagedLedgerException exception, Object ctx);
     }
 
-    interface SetPropertiesCallback {
-        void setPropertiesComplete(Map<String, String> properties, Object ctx);
+    interface UpdatePropertiesCallback {
+        void updatePropertiesComplete(Map<String, String> properties, Object ctx);
 
-        void setPropertiesFailed(ManagedLedgerException exception, Object ctx);
+        void updatePropertiesFailed(ManagedLedgerException exception, Object ctx);
     }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4f4b8ca..a8a4509 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -451,11 +451,51 @@ public interface ManagedLedger {
     Map<String, String> getProperties();
 
     /**
+     * Add key-value to propertiesMap.
+     *
+     * @param key key of property to add
+     * @param value value of property to add
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException;
+
+    /**
+     * Async add key-value to propertiesMap.
+     *
+     * @param key      key of property to add
+     * @param value    value of property to add
+     * @param callback a callback which will be supplied with the newest properties in managedLedger.
+     * @param ctx      a context object which will be passed to the callback on completion.
+     **/
+    void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+    /**
+     * Delete the property by key.
+     *
+     * @param key key of property to delete
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void deleteProperty(String key) throws InterruptedException, ManagedLedgerException;
+
+    /**
+     * Async delete the property by key.
+     *
+     * @param key      key of property to delete
+     * @param callback a callback which will be supplied with the newest properties in managedLedger.
+     * @param ctx      a context object which will be passed to the callback on completion.
+     */
+    void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+    /**
      * Update managed-ledger's properties.
      *
      * @param properties key-values of properties
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
      */
-    void setProperties(Map<String, String> properties) throws InterruptedException;
+    void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException;
 
     /**
      * Async update managed-ledger's properties.
@@ -464,9 +504,9 @@ public interface ManagedLedger {
      * @param callback   a callback which will be supplied with the newest properties in managedLedger.
      * @param ctx        a context object which will be passed to the callback on completion.
      */
-    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
+    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
         Object ctx);
-  
+
     /**
      * Trim consumed ledgers in background
      * @param promise
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 27d8848..f45b341 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
@@ -31,12 +30,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Range;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -90,8 +86,8 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -176,10 +172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private ScheduledFuture<?> timeoutTask;
 
     /**
-     * This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
+     * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
      * version, we cannot have multiple concurrent updates.
      */
-    private final CallbackMutex ledgersListMutex = new CallbackMutex();
+    private final CallbackMutex metadataMutex = new CallbackMutex();
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
     private final CallbackMutex offloadMutex = new CallbackMutex();
@@ -1230,7 +1226,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
                         mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
@@ -1267,7 +1263,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         }
                     }, null);
 
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
 
                     synchronized (ManagedLedgerImpl.this) {
                         lastLedgerCreationFailureTimestamp = clock.millis();
@@ -1282,7 +1278,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
-        if (!ledgersListMutex.tryLock()) {
+        if (!metadataMutex.tryLock()) {
             // Defer update for later
             scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
             return;
@@ -2062,7 +2058,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
 
             if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
-                    || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
+                    || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
                 scheduleDeferredTrimming(promise);
                 trimmerMutex.unlock();
                 return;
@@ -2101,7 +2097,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                             TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                     ledgersStat = stat;
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     trimmerMutex.unlock();
 
                     for (LedgerInfo ls : ledgersToDelete) {
@@ -2119,7 +2115,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 @Override
                 public void operationFailed(MetaStoreException e) {
                     log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     trimmerMutex.unlock();
 
                     promise.completeExceptionally(e);
@@ -2531,7 +2527,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
             CompletableFuture<Void> finalPromise) {
         synchronized (this) {
-            if (!ledgersListMutex.tryLock()) {
+            if (!metadataMutex.tryLock()) {
                 // retry in 100 milliseconds
                 scheduledExecutor.schedule(
                         safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
@@ -2539,7 +2535,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             } else { // lock acquired
                 CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
                 unlockingPromise.whenComplete((res, ex) -> {
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     if (ex != null) {
                         finalPromise.completeExceptionally(ex);
                     } else {
@@ -3020,6 +3016,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
                     .setEntryId(lastConfirmedEntry.getEntryId()));
         }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+                    .setKey(property.getKey()).setValue(property.getValue()));
+        }
 
         return mlInfo.build();
     }
@@ -3271,57 +3271,96 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     @Override
-    public void setProperties(Map<String, String> properties) throws InterruptedException {
+    public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        updateProperties(map, false, null);
+    }
+
+    @Override
+    public void asyncSetProperty(String key, String value, final UpdatePropertiesCallback callback, Object ctx) {
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        asyncUpdateProperties(map, false, null, callback, ctx);
+    }
+
+    @Override
+    public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
+        updateProperties(null, true, key);
+    }
+
+    @Override
+    public void asyncDeleteProperty(String key, final UpdatePropertiesCallback callback, Object ctx) {
+        asyncUpdateProperties(null, true, key, callback, ctx);
+    }
+
+    @Override
+    public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
+        updateProperties(properties, false, null);
+    }
+
+    @Override
+    public void asyncSetProperties(Map<String, String> properties, final UpdatePropertiesCallback callback,
+        Object ctx) {
+        asyncUpdateProperties(properties, false, null, callback, ctx);
+    }
+
+    private void updateProperties(Map<String, String> properties, boolean isDelete,
+        String deleteKey) throws InterruptedException, ManagedLedgerException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.asyncSetProperties(properties, new SetPropertiesCallback() {
+        class Result {
+            ManagedLedgerException exception = null;
+        }
+        final Result result = new Result();
+        this.asyncUpdateProperties(properties, isDelete, deleteKey, new UpdatePropertiesCallback() {
             @Override
-            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                 latch.countDown();
             }
 
             @Override
-            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
-                log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                result.exception = exception;
                 latch.countDown();
             }
         }, null);
 
-        latch.await();
+        if (!latch.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+            throw new ManagedLedgerException("Timeout during update managedLedger's properties");
+        }
+
+        if (result.exception != null) {
+            log.error("[{}] Update managedLedger's properties failed", name, result.exception);
+            throw result.exception;
+        }
     }
 
-    @Override
-    public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
-        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
+    private void asyncUpdateProperties(Map<String, String> properties, boolean isDelete,
+        String deleteKey, final UpdatePropertiesCallback callback, Object ctx) {
+        if (!metadataMutex.tryLock()) {
+            // Defer update for later
+            scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
+                callback, ctx), 100, TimeUnit.MILLISECONDS);
+            return;
+        }
+        if (isDelete) {
+            propertiesMap.remove(deleteKey);
+        } else {
+            propertiesMap.putAll(properties);
+        }
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
             @Override
-            public void operationComplete(ManagedLedgerInfo result, Stat version) {
+            public void operationComplete(Void result, Stat version) {
                 ledgersStat = version;
-                // Update manageLedger's  properties.
-                ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
-                info.clearProperties();
-                for (Map.Entry<String, String> property : properties.entrySet()) {
-                    info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue()));
-                }
-                store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
-                    @Override
-                    public void operationComplete(Void result, Stat version) {
-                        ledgersStat = version;
-                        propertiesMap.clear();
-                        propertiesMap.putAll(properties);
-                        callback.setPropertiesComplete(properties, ctx);
-                    }
-
-                    @Override
-                    public void operationFailed(MetaStoreException e) {
-                        log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
-                        callback.setPropertiesFailed(e, ctx);
-                    }
-                });
+                callback.updatePropertiesComplete(propertiesMap, ctx);
+                metadataMutex.unlock();
             }
 
             @Override
             public void operationFailed(MetaStoreException e) {
-                log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
-                callback.setPropertiesFailed(e, ctx);
+                log.error("[{}] Update managedLedger's properties failed", name, e);
+                callback.updatePropertiesFailed(e, ctx);
+                metadataMutex.unlock();
             }
         });
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e507c99..2806a10 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -63,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -95,7 +95,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -1168,7 +1167,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testSetProperties() throws Exception {
+    public void testUpdateProperties() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
         Map<String, String> properties = new HashMap<>();
         properties.put("key1", "value1");
@@ -1177,40 +1176,107 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.setProperties(properties);
         assertEquals(ledger.getProperties(), properties);
 
+        properties.put("key4", "value4");
+        ledger.setProperty("key4", "value4");
+        assertEquals(ledger.getProperties(), properties);
+
+        ledger.deleteProperty("key4");
+        properties.remove("key4");
+        assertEquals(ledger.getProperties(), properties);
+
         Map<String, String> newProperties = new HashMap<>();
-        newProperties.put("key4", "value4");
         newProperties.put("key5", "value5");
-        newProperties.put("key6", "value6");
+        newProperties.put("key1", "value6");
+        newProperties.putAll(properties);
         ledger.setProperties(newProperties);
         assertEquals(ledger.getProperties(), newProperties);
     }
 
     @Test
-    public void testAsyncSetProperties() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
+    public void testAsyncUpdateProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(3);
         ManagedLedger ledger = factory.open("my_test_ledger");
-        Map<String, String> properties = new HashMap<>();
-        properties.put("key1", "value1");
-        properties.put("key2", "value2");
-        properties.put("key3", "value3");
-        ledger.setProperties(properties);
-        Map<String, String> newProperties = new HashMap<>();
-        newProperties.put("key4", "value4");
-        newProperties.put("key5", "value5");
-        newProperties.put("key6", "value6");
-        ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() {
+        Map<String, String> prop = new HashMap<>();
+        prop.put("key1", "value1");
+        prop.put("key2", "value2");
+        prop.put("key3", "value3");
+        ledger.asyncSetProperties(prop, new AsyncCallbacks.UpdatePropertiesCallback() {
             @Override
-            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertEquals(prop, properties);
                 latch.countDown();
             }
 
             @Override
-            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
-                fail("should have succeeded");
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
             }
         }, null);
-        latch.await();
-        assertEquals(ledger.getProperties(), newProperties);
+
+        ledger.asyncSetProperty("key4", "value4", new AsyncCallbacks.UpdatePropertiesCallback() {
+            @Override
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertNotNull(properties.get("key4"));
+                assertEquals("value4", properties.get("key4"));
+                latch.countDown();
+            }
+
+            @Override
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+            }
+        }, null);
+
+        prop.remove("key1");
+        ledger.asyncDeleteProperty("key1", new AsyncCallbacks.UpdatePropertiesCallback() {
+            @Override
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertNull(properties.get("key1"));
+                latch.countDown();
+            }
+
+            @Override
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+            }
+        }, null);
+        assertTrue(latch.await(60, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testConcurrentAsyncSetProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1000);
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i = 0; i < 1000; i++) {
+            final int finalI = i;
+            executor.execute(() -> {
+                Map<String, String> newProperties = new HashMap<>();
+                newProperties.put("key0", String.valueOf(finalI));
+                newProperties.put("key1", "value1");
+                newProperties.put("key2", "value2");
+                newProperties.put("key3", "value3");
+                ledger.asyncSetProperties(newProperties, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        assertEquals(properties, newProperties);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                    }
+                }, null);
+            });
+        }
+        try {
+            for (int i = 0; i < 100; i++) {
+                ledger.addEntry("data".getBytes(Encoding));
+                Thread.sleep(300);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+        assertTrue(latch.await(300, TimeUnit.SECONDS));
+        executor.shutdown();
+        factory.shutdown();
     }
 
     @Test