You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/27 12:59:32 UTC
[pulsar] branch master updated: Add properties for managed-ledger.
(#7054)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d68829c Add properties for managed-ledger. (#7054)
d68829c is described below
commit d68829c00101649f41ab78cb808ce7da02bbd9ec
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed May 27 20:59:16 2020 +0800
Add properties for managed-ledger. (#7054)
### Motivation
Add properties to the managed ledger. Properties can be used to store arbitrary metadata, for example state that a protocol handler needs to persist.
### Verifying this change
New unit tests added
---
.../apache/bookkeeper/mledger/AsyncCallbacks.java | 7 ++
.../apache/bookkeeper/mledger/ManagedLedger.java | 24 ++++++
.../bookkeeper/mledger/ManagedLedgerInfo.java | 2 +
.../mledger/impl/ManagedLedgerFactoryImpl.java | 8 ++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 73 +++++++++++++++++
.../bookkeeper/mledger/impl/MetaStoreImpl.java | 1 +
managed-ledger/src/main/proto/MLDataFormats.proto | 2 +
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 93 ++++++++++++++++++++++
8 files changed, 210 insertions(+)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 8a21385..0add10f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
import com.google.common.annotations.Beta;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -137,4 +138,10 @@ public interface AsyncCallbacks {
void offloadFailed(ManagedLedgerException exception, Object ctx);
}
+ /**
+ * Callback notified of the outcome of an asynchronous update of a managed ledger's properties.
+ * {@code setPropertiesComplete} receives the properties that were applied; {@code setPropertiesFailed}
+ * receives the exception that caused the update to fail. In both cases {@code ctx} is the context
+ * object the caller supplied when requesting the update.
+ */
+ interface SetPropertiesCallback {
+ void setPropertiesComplete(Map<String, String> properties, Object ctx);
+
+ void setPropertiesFailed(ManagedLedgerException exception, Object ctx);
+ }
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index d1174f6..714dd0e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -444,6 +444,30 @@ public interface ManagedLedger {
void readyToCreateNewLedger();
/**
+ * Returns managed-ledger's properties.
+ *
+ * @return key-values of properties; {@code ManagedLedgerImpl} returns an empty map when no properties
+ *         have been stored
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Update managed-ledger's properties.
+ *
+ * <p>The given map replaces the full set of properties (it is not merged with the existing ones), and
+ * the call blocks until the update has finished.
+ *
+ * @param properties key-values of properties
+ * @throws InterruptedException if the calling thread is interrupted while waiting for the update
+ */
+ void setProperties(Map<String, String> properties) throws InterruptedException;
+
+ /**
+ * Async update managed-ledger's properties.
+ *
+ * <p>The given map replaces the full set of properties; it is not merged with the existing ones.
+ *
+ * @param properties key-values of properties.
+ * @param callback a callback which will be supplied with the newest properties in managedLedger on
+ *        success, or with the exception on failure.
+ * @param ctx a context object which will be passed to the callback on completion.
+ */
+ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
+ Object ctx);
+
+ /**
* Trim consumed ledgers in background
* @param promise
*/
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
index c03da89..2afd119 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java
@@ -33,6 +33,8 @@ public class ManagedLedgerInfo {
public Map<String, CursorInfo> cursors;
+ public Map<String, String> properties;
+
public static class LedgerInfo {
public long ledgerId;
public Long entries;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 66ad3ba..bf581e7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -519,6 +519,14 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
info.terminatedPosition.entryId = pbInfo.getTerminatedPosition().getEntryId();
}
+ if (pbInfo.getPropertiesCount() > 0) {
+ info.properties = Maps.newTreeMap();
+ for (int i = 0; i < pbInfo.getPropertiesCount(); i++) {
+ MLDataFormats.KeyValue property = pbInfo.getProperties(i);
+ info.properties.put(property.getKey(), property.getValue());
+ }
+ }
+
for (int i = 0; i < pbInfo.getLedgerInfoCount(); i++) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo pbLedgerInfo = pbInfo.getLedgerInfo(i);
LedgerInfo ledgerInfo = new LedgerInfo();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index c7dc19d..e70b60c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -109,6 +110,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
@@ -133,6 +135,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final BookKeeper.DigestType digestType;
protected ManagedLedgerConfig config;
+ protected Map<String, String> propertiesMap;
protected final MetaStore store;
private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
@@ -280,6 +283,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
+ this.propertiesMap = Maps.newHashMap();
}
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -300,6 +304,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
ledgers.put(ls.getLedgerId(), ls);
}
+ if (mlInfo.getPropertiesCount() > 0) {
+ propertiesMap = Maps.newHashMap();
+ for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+ MLDataFormats.KeyValue property = mlInfo.getProperties(i);
+ propertiesMap.put(property.getKey(), property.getValue());
+ }
+ }
+
// Last ledger stat may be zeroed, we must update it
if (ledgers.size() > 0) {
final long id = ledgers.lastKey();
@@ -3174,6 +3186,67 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return offloadedSize;
}
+    /**
+     * Returns the properties currently held in memory for this managed ledger.
+     *
+     * <p>The result is a read-only view of the internal map: handing out the map itself would let
+     * callers change the in-memory state without persisting it to the metadata store. Use
+     * {@link #setProperties(Map)} or {@link #asyncSetProperties(Map, SetPropertiesCallback, Object)}
+     * to change properties.
+     *
+     * @return unmodifiable key-value view of the properties; empty when none have been set
+     */
+    @Override
+    public Map<String, String> getProperties() {
+        return java.util.Collections.unmodifiableMap(propertiesMap);
+    }
+
+    /**
+     * Synchronously replaces this managed ledger's properties by delegating to
+     * {@link #asyncSetProperties(Map, SetPropertiesCallback, Object)} and waiting for its callback.
+     *
+     * @param properties the complete new set of properties
+     * @throws InterruptedException if the calling thread is interrupted while waiting for the update
+     * @throws IllegalStateException if the update failed; the cause is the reported
+     *         {@link ManagedLedgerException}. The interface signature declares no checked failure, so
+     *         the error is surfaced unchecked rather than being silently dropped.
+     */
+    @Override
+    public void setProperties(Map<String, String> properties) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        // Carries a failure from the callback thread back to the waiting caller.
+        final java.util.concurrent.atomic.AtomicReference<ManagedLedgerException> failure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        this.asyncSetProperties(properties, new SetPropertiesCallback() {
+            @Override
+            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
+                failure.set(exception);
+                latch.countDown();
+            }
+        }, null);
+
+        latch.await();
+
+        final ManagedLedgerException cause = failure.get();
+        if (cause != null) {
+            throw new IllegalStateException("Failed to update properties of managed ledger " + name, cause);
+        }
+    }
+
+    /**
+     * Asynchronously replaces this managed ledger's properties.
+     *
+     * <p>Reads the current {@code ManagedLedgerInfo} from the metadata store, swaps its properties for
+     * the given ones, writes it back, and only then updates the in-memory map.
+     *
+     * <p>The input is validated and copied before any asynchronous work starts, for three reasons:
+     * a null map, key or value would otherwise throw inside the metadata-store callback, so the user
+     * callback would never be invoked; the caller may keep mutating its map after this method returns;
+     * and the caller may pass the very map backing this ledger's properties, in which case clearing
+     * the in-memory map before copying would wipe the new values as well.
+     *
+     * @param properties the complete new set of properties; neither the map nor any key or value may
+     *        be null
+     * @param callback notified with the applied properties on success, or with the exception on failure
+     * @param ctx opaque context object handed back to the callback
+     * @throws NullPointerException if {@code properties} or any of its keys or values is null
+     */
+    @Override
+    public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
+        java.util.Objects.requireNonNull(properties, "properties");
+        final Map<String, String> snapshot = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            snapshot.put(
+                    java.util.Objects.requireNonNull(entry.getKey(), "property key"),
+                    java.util.Objects.requireNonNull(entry.getValue(), "property value"));
+        }
+
+        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
+            @Override
+            public void operationComplete(ManagedLedgerInfo result, Stat version) {
+                ledgersStat = version;
+                // Update manageLedger's properties.
+                ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
+                info.clearProperties();
+                for (Map.Entry<String, String> property : snapshot.entrySet()) {
+                    info.addProperties(MLDataFormats.KeyValue.newBuilder()
+                            .setKey(property.getKey())
+                            .setValue(property.getValue()));
+                }
+                store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
+                    @Override
+                    public void operationComplete(Void result, Stat version) {
+                        ledgersStat = version;
+                        // Safe even if the caller passed the internal map: snapshot is independent.
+                        propertiesMap.clear();
+                        propertiesMap.putAll(snapshot);
+                        callback.setPropertiesComplete(snapshot, ctx);
+                    }
+
+                    @Override
+                    public void operationFailed(MetaStoreException e) {
+                        log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
+                        callback.setPropertiesFailed(e, ctx);
+                    }
+                });
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
+                callback.setPropertiesFailed(e, ctx);
+            }
+        });
+    }
+
@VisibleForTesting
public void setEntriesAddedCounter(long count) {
ENTRIES_ADDED_COUNTER_UPDATER.set(this, count);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
index 50cf13a..caa21a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java
@@ -253,6 +253,7 @@ public class MetaStoreImpl implements MetaStore {
if (info.hasTerminatedPosition()) {
mlInfo.setTerminatedPosition(info.getTerminatedPosition());
}
+ mlInfo.addAllProperties(info.getPropertiesList());
return mlInfo.build();
}
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index 4dbd231..151cd69 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -56,6 +56,8 @@ message ManagedLedgerInfo {
// committed entry.
// No more entries can be written.
optional NestedPositionInfo terminatedPosition = 2;
+
+ repeated KeyValue properties = 3;
}
message PositionInfo {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a5e6408..2b02e0b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -47,6 +47,7 @@ import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -75,6 +76,7 @@ import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -97,6 +99,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -1170,6 +1173,52 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
@Test
+    // Verifies that each synchronous setProperties call replaces the whole property set.
+    public void testSetProperties() throws Exception {
+        ManagedLedger ml = factory.open("my_test_ledger");
+
+        // First update: key1..key3.
+        Map<String, String> initial = new HashMap<>();
+        for (int i = 1; i <= 3; i++) {
+            initial.put("key" + i, "value" + i);
+        }
+        ml.setProperties(initial);
+        assertEquals(ml.getProperties(), initial);
+
+        // Second update: key4..key6 must fully replace the first set.
+        Map<String, String> replacement = new HashMap<>();
+        for (int i = 4; i <= 6; i++) {
+            replacement.put("key" + i, "value" + i);
+        }
+        ml.setProperties(replacement);
+        assertEquals(ml.getProperties(), replacement);
+    }
+
+    /**
+     * Verifies that asyncSetProperties replaces previously stored properties and reports completion
+     * through the callback.
+     */
+    @Test
+    public void testAsyncSetProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        // fail() on the callback thread would not fail the test and would leave the latch closed,
+        // so the failure is recorded here and asserted on the test thread instead.
+        final java.util.concurrent.atomic.AtomicReference<ManagedLedgerException> failure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        ManagedLedger ledger = factory.open("my_test_ledger");
+        Map<String, String> properties = new HashMap<>();
+        properties.put("key1", "value1");
+        properties.put("key2", "value2");
+        properties.put("key3", "value3");
+        ledger.setProperties(properties);
+        Map<String, String> newProperties = new HashMap<>();
+        newProperties.put("key4", "value4");
+        newProperties.put("key5", "value5");
+        newProperties.put("key6", "value6");
+        ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() {
+            @Override
+            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+                latch.countDown();
+            }
+
+            @Override
+            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                failure.set(exception);
+                latch.countDown();
+            }
+        }, null);
+        latch.await();
+        if (failure.get() != null) {
+            fail("should have succeeded: " + failure.get());
+        }
+        assertEquals(ledger.getProperties(), newProperties);
+    }
+
+ @Test
public void ledgersList() throws Exception {
MetaStore store = factory.getMetaStore();
@@ -2522,6 +2571,50 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
return failed.getValue();
}
+
+ @Test
+ public void testPropertiesForMeta() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ final String mLName = "properties_test";
+ factory.open(mLName);
+ MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
+
+ ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
+ builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key1").setValue("value1").build());
+ builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key2").setValue("value2").build());
+
+ CountDownLatch l2 = new CountDownLatch(1);
+ store.asyncUpdateLedgerIds(mLName, builder.build(),
+ new Stat(1, 0, 0),
+ new MetaStoreCallback<Void>() {
+ @Override
+ public void operationComplete(Void result, Stat version) {
+ l2.countDown();
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ fail("on asyncUpdateLedgerIds");
+ }
+ });
+
+ // get ManagedLedgerInfo from meta store
+ org.apache.bookkeeper.mledger.ManagedLedgerInfo managedLedgerInfo = factory.getManagedLedgerInfo(mLName);
+ Map<String, String> properties = managedLedgerInfo.properties;
+ assertEquals(properties.get("key1"), "value1");
+ assertEquals(properties.get("key2"), "value2");
+
+ factory.shutdown();
+ factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+
+ // reopen managedLedger
+ ManagedLedger ml = factory.open(mLName);
+ properties = ml.getProperties();
+ assertEquals(properties.get("key1"), "value1");
+ assertEquals(properties.get("key2"), "value2");
+ }
+
private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject<ManagedLedger> ledger1,
MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception {
CountDownLatch latch = new CountDownLatch(1);