You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/27 12:59:32 UTC

[pulsar] branch master updated: Add properties for managed-ledger. (#7054)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d68829c  Add properties for managed-ledger. (#7054)
d68829c is described below

commit d68829c00101649f41ab78cb808ce7da02bbd9ec
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed May 27 20:59:16 2020 +0800

    Add properties for managed-ledger. (#7054)
    
    ### Motivation
    
    Add properties for managed-ledger. We can use properties to store metadata, such as the metadata that a protocol handler needs to persist.
    
    ### Verifying this change
    
    New unit tests added
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |  7 ++
 .../apache/bookkeeper/mledger/ManagedLedger.java   | 24 ++++++
 .../bookkeeper/mledger/ManagedLedgerInfo.java      |  2 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  8 ++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++
 .../bookkeeper/mledger/impl/MetaStoreImpl.java     |  1 +
 managed-ledger/src/main/proto/MLDataFormats.proto  |  2 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 93 ++++++++++++++++++++++
 8 files changed, 210 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 8a21385..0add10f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import com.google.common.annotations.Beta;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -137,4 +138,10 @@ public interface AsyncCallbacks {
 
         void offloadFailed(ManagedLedgerException exception, Object ctx);
     }
+
+    interface SetPropertiesCallback { // result callback for ManagedLedger#asyncSetProperties
+        void setPropertiesComplete(Map<String, String> properties, Object ctx); // update was stored; receives the new properties
+
+        void setPropertiesFailed(ManagedLedgerException exception, Object ctx); // reading or writing the metadata failed
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d1174f6..714dd0e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -444,6 +444,30 @@ public interface ManagedLedger {
     void readyToCreateNewLedger();
 
     /**
+     * Returns managed-ledger's properties.
+     *
+     * @return key-values of properties
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * Update managed-ledger's properties and wait until the update has finished.
+     *
+     * @param properties key-values of properties; they replace the properties stored before
+     */
+    void setProperties(Map<String, String> properties) throws InterruptedException;
+
+    /**
+     * Asynchronously update managed-ledger's properties.
+     *
+     * @param properties key-values of properties; they replace the properties stored before.
+     * @param callback   a callback which receives the newly stored properties on success, or the exception on failure.
+     * @param ctx        a context object which will be passed back to the callback on success and on failure.
+     */
+    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
+        Object ctx);
+  
+    /**
      * Trim consumed ledgers in background
      * @param promise
      */
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
index c03da89..2afd119 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
@@ -33,6 +33,8 @@ public class ManagedLedgerInfo {
 
     public Map<String, CursorInfo> cursors;
 
+    public Map<String, String> properties;
+
     public static class LedgerInfo {
         public long ledgerId;
         public Long entries;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 66ad3ba..bf581e7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -519,6 +519,14 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                     info.terminatedPosition.entryId = pbInfo.getTerminatedPosition().getEntryId();
                 }
 
+                if (pbInfo.getPropertiesCount() > 0) {
+                    info.properties = Maps.newTreeMap();
+                    for (int i = 0; i < pbInfo.getPropertiesCount(); i++) {
+                        MLDataFormats.KeyValue property = pbInfo.getProperties(i);
+                        info.properties.put(property.getKey(), property.getValue());
+                    }
+                }
+
                 for (int i = 0; i < pbInfo.getLedgerInfoCount(); i++) {
                     MLDataFormats.ManagedLedgerInfo.LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfo(i);
                     LedgerInfo ledgerInfo = new LedgerInfo();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c7dc19d..e70b60c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -109,6 +110,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
@@ -133,6 +135,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final BookKeeper.DigestType digestType;
 
     protected ManagedLedgerConfig config;
+    protected Map<String, String> propertiesMap;
     protected final MetaStore store;
 
     private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
@@ -280,6 +283,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
         this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
         this.mlOwnershipChecker = mlOwnershipChecker;
+        this.propertiesMap = Maps.newHashMap();
     }
 
     synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -300,6 +304,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     ledgers.put(ls.getLedgerId(), ls);
                 }
 
+                if (mlInfo.getPropertiesCount() > 0) {
+                    propertiesMap = Maps.newHashMap();
+                    for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+                        MLDataFormats.KeyValue property = mlInfo.getProperties(i);
+                        propertiesMap.put(property.getKey(), property.getValue());
+                    }
+                }
+
                 // Last ledger stat may be zeroed, we must update it
                 if (ledgers.size() > 0) {
                     final long id = ledgers.lastKey();
@@ -3174,6 +3186,67 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return offloadedSize;
     }
 
+    @Override
+    public Map<String, String> getProperties() {
+        return propertiesMap; // NOTE(review): hands out the internal mutable map itself, not a copy
+    }
+
+    @Override
+    public void setProperties(Map<String, String> properties) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1); // released by either callback below
+        this.asyncSetProperties(properties, new SetPropertiesCallback() {
+            @Override
+            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
+                latch.countDown(); // NOTE(review): the failure is only logged; the caller returns normally
+            }
+        }, null);
+
+        latch.await(); // block the calling thread until the async update reports back
+    }
+
+    @Override
+    public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
+        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() { // read current info first
+            @Override
+            public void operationComplete(ManagedLedgerInfo result, Stat version) {
+                ledgersStat = version;
+                // Replace the managed ledger's stored properties with the supplied ones.
+                ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
+                info.clearProperties(); // drop the old entries so the new map replaces them rather than merging
+                for (Map.Entry<String, String> property : properties.entrySet()) {
+                    info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue()));
+                }
+                store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat version) {
+                        ledgersStat = version;
+                        propertiesMap.clear(); // in-memory map is refreshed only after the store write succeeded
+                        propertiesMap.putAll(properties);
+                        callback.setPropertiesComplete(properties, ctx);
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
+                        callback.setPropertiesFailed(e, ctx);
+                    }
+                });
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
+                callback.setPropertiesFailed(e, ctx);
+            }
+        });
+    }
+
     @VisibleForTesting
     public void setEntriesAddedCounter(long count) {
         ENTRIES_ADDED_COUNTER_UPDATER.set(this, count);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 50cf13a..caa21a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -253,6 +253,7 @@ public class MetaStoreImpl implements MetaStore {
         if (info.hasTerminatedPosition()) {
             mlInfo.setTerminatedPosition(info.getTerminatedPosition());
         }
+        mlInfo.addAllProperties(info.getPropertiesList());
         return mlInfo.build();
     }
 
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 4dbd231..151cd69 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -56,6 +56,8 @@ message ManagedLedgerInfo {
     // committed entry.
     // No more entries can be written.
     optional NestedPositionInfo terminatedPosition = 2;
+
+    repeated KeyValue properties = 3;
 }
 
 message PositionInfo {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a5e6408..2b02e0b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -47,6 +47,7 @@ import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,7 @@ import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -97,6 +99,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -1170,6 +1173,52 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testSetProperties() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("key1", "value1");
+        properties.put("key2", "value2");
+        properties.put("key3", "value3");
+        ledger.setProperties(properties);
+        assertEquals(ledger.getProperties(), properties);
+
+        Map<String, String> newProperties = new HashMap<>();
+        newProperties.put("key4", "value4");
+        newProperties.put("key5", "value5");
+        newProperties.put("key6", "value6");
+        ledger.setProperties(newProperties); // second call must replace the first set, not merge with it
+        assertEquals(ledger.getProperties(), newProperties);
+    }
+
+    @Test
+    public void testAsyncSetProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("key1", "value1");
+        properties.put("key2", "value2");
+        properties.put("key3", "value3");
+        ledger.setProperties(properties); // seed an initial set so the async call has something to replace
+        Map<String, String> newProperties = new HashMap<>();
+        newProperties.put("key4", "value4");
+        newProperties.put("key5", "value5");
+        newProperties.put("key6", "value6");
+        ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() {
+            @Override
+            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                fail("should have succeeded"); // NOTE(review): latch is not released here, so latch.await() below would keep waiting
+            }
+        }, null);
+        latch.await();
+        assertEquals(ledger.getProperties(), newProperties);
+    }
+
+    @Test
     public void ledgersList() throws Exception {
         MetaStore store = factory.getMetaStore();
 
@@ -2522,6 +2571,50 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         return failed.getValue();
     }
 
+
+    @Test
+    public void testPropertiesForMeta() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+        final String mLName = "properties_test";
+        factory.open(mLName);
+        MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
+
+        ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
+        builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key1").setValue("value1").build());
+        builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key2").setValue("value2").build());
+
+        CountDownLatch l2 = new CountDownLatch(1); // NOTE(review): counted down in the callback, but never awaited in this test
+        store.asyncUpdateLedgerIds(mLName, builder.build(),
+                new Stat(1, 0, 0),
+                new MetaStoreCallback<Void>() {
+            @Override
+            public void operationComplete(Void result, Stat version) {
+                l2.countDown();
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                fail("on asyncUpdateLedgerIds");
+            }
+        });
+
+        // Read the ManagedLedgerInfo back through the factory and check the properties written above
+        org.apache.bookkeeper.mledger.ManagedLedgerInfo managedLedgerInfo = factory.getManagedLedgerInfo(mLName);
+        Map<String, String> properties = managedLedgerInfo.properties;
+        assertEquals(properties.get("key1"), "value1");
+        assertEquals(properties.get("key2"), "value2");
+
+        factory.shutdown();
+        factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+        // Reopen the managed ledger with a fresh factory and check the properties are loaded on open
+        ManagedLedger ml = factory.open(mLName);
+        properties = ml.getProperties();
+        assertEquals(properties.get("key1"), "value1");
+        assertEquals(properties.get("key2"), "value2");
+    }
+
     private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject<ManagedLedger> ledger1,
             MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);