You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2020/10/29 10:28:43 UTC
[bookkeeper] branch master updated: BP-42: New Client API - list ledgers
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 0c84186 BP-42: New Client API - list ledgers
0c84186 is described below
commit 0c841862b547374541815d9dd9dfb4488b49ab44
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu Oct 29 11:28:36 2020 +0100
BP-42: New Client API - list ledgers
Implementation of BP-42
### Motivation
Enhance the new client API by adding methods for:
- listing ledgers
- access to ledger metadata
### Changes
- Added new interfaces in "api" client package for ledgers listing
- Added new method `getLedgerMetadata(long ledgerId)` to BookKeeper interface
- Added `getLedgerId()` to LedgerMetadata type
-- Ledger metadata serialization/deserialization is unchanged, since ledgerId is not stored inside the ZK data node
Updated all tests using `LedgerMetadataBuilder` because ledgerId is now required in order to call the `build()` method
Master Issue: #2422
Reviewers: Jia Zhai <zh...@apache.org>, Enrico Olivelli <eo...@gmail.com>
This closes #2457 from nicoloboschi/fix/2422/bp-42-list-ledgers
---
.../org/apache/bookkeeper/client/BookKeeper.java | 119 +++++++++++++
.../apache/bookkeeper/client/LedgerCreateOp.java | 16 +-
.../bookkeeper/client/LedgerMetadataBuilder.java | 10 +-
.../bookkeeper/client/LedgerMetadataImpl.java | 15 +-
.../apache/bookkeeper/client/api/BookKeeper.java | 16 ++
.../bookkeeper/client/api/LedgerMetadata.java | 6 +
.../bookkeeper/client/api/LedgersIterator.java | 41 +++++
.../bookkeeper/client/api/ListLedgersResult.java | 40 +++++
.../client/api/ListLedgersResultBuilder.java | 33 ++++
.../bookkeeper/meta/AbstractZkLedgerManager.java | 6 +-
.../bookkeeper/meta/LedgerMetadataSerDe.java | 19 +-
.../bookkeeper/meta/MSLedgerManagerFactory.java | 4 +-
.../cli/commands/client/LedgerMetaDataCommand.java | 2 +-
.../bookkeeper/client/BookKeeperAdminTest.java | 3 +-
.../org/apache/bookkeeper/client/ClientUtil.java | 2 +-
.../bookkeeper/client/LedgerMetadataTest.java | 22 +--
.../bookkeeper/client/LedgerRecovery2Test.java | 1 +
.../bookkeeper/client/MetadataUpdateLoopTest.java | 17 +-
.../apache/bookkeeper/client/MockLedgerHandle.java | 2 +-
.../client/ReadLastConfirmedAndEntryOpTest.java | 2 +-
.../client/TestLedgerFragmentReplication.java | 4 +-
.../apache/bookkeeper/client/TestSequenceRead.java | 2 +-
.../bookkeeper/client/TestWatchEnsembleChange.java | 1 +
.../client/api/BookKeeperBuildersTest.java | 1 +
.../bookkeeper/client/api/LedgerMetadataTest.java | 192 +++++++++++++++++++++
.../meta/AbstractZkLedgerManagerTest.java | 1 +
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 1 +
.../bookkeeper/meta/LedgerManagerIteratorTest.java | 1 +
.../apache/bookkeeper/meta/MockLedgerManager.java | 2 +-
.../bookkeeper/meta/TestLedgerMetadataSerDe.java | 26 +--
.../replication/AuditorLedgerCheckerTest.java | 5 +-
.../AuditorPlacementPolicyCheckTest.java | 15 ++
.../replication/AuditorReplicasCheckTest.java | 4 +-
.../metadata/etcd/EtcdLedgerManager.java | 29 ++--
.../metadata/etcd/EtcdLedgerManagerTest.java | 6 +-
.../commands/client/LedgerMetaDataCommandTest.java | 4 +-
36 files changed, 585 insertions(+), 85 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index b7656d3..0a594d3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -35,6 +35,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -58,6 +59,10 @@ import org.apache.bookkeeper.client.SyncCallbackUtils.SyncOpenCallback;
import org.apache.bookkeeper.client.api.BookKeeperBuilder;
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.LedgersIterator;
+import org.apache.bookkeeper.client.api.ListLedgersResult;
+import org.apache.bookkeeper.client.api.ListLedgersResultBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
@@ -71,6 +76,7 @@ import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.meta.CleanupLedgerManager;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
@@ -86,6 +92,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -1473,6 +1480,118 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
return new LedgerDeleteOp.DeleteBuilderImpl(this);
}
+ private static final class SyncLedgerIterator implements LedgersIterator {
+
+ private final LedgerRangeIterator iterator;
+ private final ListLedgersResultImpl parent;
+ Iterator<Long> currentRange = null;
+
+ public SyncLedgerIterator(LedgerRangeIterator iterator, ListLedgersResultImpl parent) {
+ this.iterator = iterator;
+ this.parent = parent;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ parent.checkClosed();
+ if (currentRange != null) {
+ if (currentRange.hasNext()) {
+ return true;
+ }
+ } else if (iterator.hasNext()) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public long next() throws IOException {
+ parent.checkClosed();
+ if (currentRange == null || !currentRange.hasNext()) {
+ currentRange = iterator.next().getLedgers().iterator();
+ }
+ return currentRange.next();
+ }
+ }
+
+ private static final class ListLedgersResultImpl implements ListLedgersResult {
+
+ private final LedgerRangeIterator iterator;
+ private boolean closed = false;
+ private LedgersIterator ledgersIterator;
+
+ public ListLedgersResultImpl(LedgerRangeIterator iterator) {
+ this.iterator = iterator;
+ }
+
+ void checkClosed() {
+ if (closed) {
+ throw new IllegalStateException("ListLedgersResult is closed");
+ }
+ }
+
+ private void initLedgersIterator() {
+ if (ledgersIterator != null) {
+ throw new IllegalStateException("LedgersIterator must be requested once");
+ }
+ ledgersIterator = new SyncLedgerIterator(iterator, this);
+ }
+
+ @Override
+ public LedgersIterator iterator() {
+ checkClosed();
+ initLedgersIterator();
+ return ledgersIterator;
+ }
+
+ @Override
+ public Iterable<Long> toIterable() {
+ checkClosed();
+ initLedgersIterator();
+
+ return () -> new Iterator<Long>() {
+ @Override
+ public boolean hasNext() {
+ try {
+ return ledgersIterator.hasNext();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public Long next() {
+ try {
+ return ledgersIterator.next();
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ };
+ }
+
+ @Override
+ public void close() throws Exception {
+ closed = true;
+ }
+ }
+
+ @Override
+ public ListLedgersResultBuilder newListLedgersOp() {
+ return () -> {
+ final LedgerRangeIterator iterator = getLedgerManager().getLedgerRanges(0);
+ return CompletableFuture.completedFuture(new ListLedgersResultImpl(iterator));
+ };
+ }
+
+ @Override
+ public CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId) {
+ CompletableFuture<Versioned<LedgerMetadata>> versioned = getLedgerManager().readLedgerMetadata(ledgerId);
+ return versioned.thenApply(versionedLedgerMetadata -> {
+ return versionedLedgerMetadata.getValue();
+ });
+ }
+
private final ClientContext clientCtx = new ClientContext() {
@Override
public ClientInternalConf getConf() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 557d0c7..ac27c86 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -147,17 +147,17 @@ class LedgerCreateOp {
}
- this.metadata = metadataBuilder.build();
if (this.generateLedgerId) {
- generateLedgerIdAndCreateLedger();
+ generateLedgerIdAndCreateLedger(metadataBuilder);
} else {
+ this.metadata = metadataBuilder.withId(ledgerId).build();
// Create ledger with supplied ledgerId
bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata)
- .whenComplete((written, exception) -> metadataCallback(written, exception));
+ .whenComplete((written, exception) -> metadataCallback(written, exception, metadataBuilder));
}
}
- void generateLedgerIdAndCreateLedger() {
+ void generateLedgerIdAndCreateLedger(LedgerMetadataBuilder metadataBuilder) {
// generate a ledgerId
final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
@@ -168,9 +168,10 @@ class LedgerCreateOp {
return;
}
LedgerCreateOp.this.ledgerId = ledgerId;
+ LedgerCreateOp.this.metadata = metadataBuilder.withId(ledgerId).build();
// create a ledger with metadata
bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata)
- .whenComplete((written, exception) -> metadataCallback(written, exception));
+ .whenComplete((written, exception) -> metadataCallback(written, exception, metadataBuilder));
}
});
}
@@ -190,12 +191,13 @@ class LedgerCreateOp {
/**
* Callback when metadata store has responded.
*/
- private void metadataCallback(Versioned<LedgerMetadata> writtenMetadata, Throwable exception) {
+ private void metadataCallback(Versioned<LedgerMetadata> writtenMetadata,
+ Throwable exception, LedgerMetadataBuilder metadataBuilder) {
if (exception != null) {
if (this.generateLedgerId
&& (BKException.getExceptionCode(exception) == BKException.Code.LedgerExistException)) {
// retry to generate a new ledger id
- generateLedgerIdAndCreateLedger();
+ generateLedgerIdAndCreateLedger(metadataBuilder);
} else {
createComplete(BKException.getExceptionCode(exception), null);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index a962fe7..e75d051 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.net.BookieId;
@Unstable
@VisibleForTesting
public class LedgerMetadataBuilder {
+ private long ledgerId = -1L;
private int metadataFormatVersion = CURRENT_METADATA_FORMAT_VERSION;
private int ensembleSize = 3;
private int writeQuorumSize = 3;
@@ -72,6 +73,7 @@ public class LedgerMetadataBuilder {
public static LedgerMetadataBuilder from(LedgerMetadata other) {
LedgerMetadataBuilder builder = new LedgerMetadataBuilder();
+ builder.ledgerId = other.getLedgerId();
builder.metadataFormatVersion = other.getMetadataFormatVersion();
builder.ensembleSize = other.getEnsembleSize();
builder.writeQuorumSize = other.getWriteQuorumSize();
@@ -100,6 +102,11 @@ public class LedgerMetadataBuilder {
return builder;
}
+ public LedgerMetadataBuilder withId(long ledgerId) {
+ this.ledgerId = ledgerId;
+ return this;
+ }
+
public LedgerMetadataBuilder withMetadataFormatVersion(int version) {
this.metadataFormatVersion = version;
return this;
@@ -190,10 +197,11 @@ public class LedgerMetadataBuilder {
}
public LedgerMetadata build() {
+ checkArgument(ledgerId >= 0, "Ledger id must be set");
checkArgument(ensembleSize >= writeQuorumSize, "Write quorum must be less or equal to ensemble size");
checkArgument(writeQuorumSize >= ackQuorumSize, "Write quorum must be greater or equal to ack quorum");
- return new LedgerMetadataImpl(metadataFormatVersion,
+ return new LedgerMetadataImpl(ledgerId, metadataFormatVersion,
ensembleSize, writeQuorumSize, ackQuorumSize,
state, lastEntryId, length, ensembles,
digestType, password, ctime, storeCtime,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
index 81fb4b5..22f7c04 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java
@@ -45,10 +45,14 @@ import org.slf4j.LoggerFactory;
*
* <p>It provides parsing and serialization methods of such metadata.
*/
-@EqualsAndHashCode
+@EqualsAndHashCode(exclude =
+ "ledgerId" // ledgerId is not serialized inside ZK node data
+)
class LedgerMetadataImpl implements LedgerMetadata {
static final Logger LOG = LoggerFactory.getLogger(LedgerMetadataImpl.class);
+ private final long ledgerId;
+
private final int metadataFormatVersion;
private final int ensembleSize;
private final int writeQuorumSize;
@@ -71,7 +75,8 @@ class LedgerMetadataImpl implements LedgerMetadata {
private long cToken;
- LedgerMetadataImpl(int metadataFormatVersion,
+ LedgerMetadataImpl(long ledgerId,
+ int metadataFormatVersion,
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
@@ -97,6 +102,7 @@ class LedgerMetadataImpl implements LedgerMetadata {
|| (!digestType.isPresent() && !password.isPresent()),
"Either both password and digest type must be set, or neither");
+ this.ledgerId = ledgerId;
this.metadataFormatVersion = metadataFormatVersion;
this.ensembleSize = ensembleSize;
this.writeQuorumSize = writeQuorumSize;
@@ -136,6 +142,11 @@ class LedgerMetadataImpl implements LedgerMetadata {
}
@Override
+ public long getLedgerId() {
+ return ledgerId;
+ }
+
+ @Override
public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
return ensembles;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java
index 7c4b678..84d8672 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BookKeeper.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.client.api;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.impl.BookKeeperBuilderImpl;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
@@ -66,6 +67,21 @@ public interface BookKeeper extends AutoCloseable {
DeleteBuilder newDeleteLedgerOp();
/**
+ * List ledgers.
+ *
+ * @return a builder useful to list ledgers.
+ */
+ ListLedgersResultBuilder newListLedgersOp();
+
+ /**
+ * Get ledger metadata of a given ledger id.
+ *
+ * @param ledgerId id of the ledger.
+ * @return a <code>CompletableFuture</code> instance containing ledger metadata.
+ */
+ CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId);
+
+ /**
* Close the client and release every resource.
*
* @throws BKException
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
index 5c703a7..bc18c6e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
@@ -34,6 +34,12 @@ import org.apache.bookkeeper.net.BookieId;
@LimitedPrivate
@Unstable
public interface LedgerMetadata {
+ /**
+ * Returns the id of this ledger.
+ *
+ * @return the id of this ledger.
+ */
+ long getLedgerId();
/**
* Returns the ensemble size of this ledger.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgersIterator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgersIterator.java
new file mode 100644
index 0000000..98c99e1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgersIterator.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2020 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+import java.io.IOException;
+
+/**
+ * Iterator for ledgers.
+ */
+public interface LedgersIterator {
+
+ /**
+ * Return true if there is at least one ledger to visit.
+ *
+ * @return true if there is at least one ledger to visit.
+ * @throws IOException thrown when there is a problem accessing the ledger metadata store.
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return next ledger id.
+ *
+ * @return next ledger id.
+ * @throws IOException thrown when there is a problem accessing the ledger metadata store.
+ */
+ long next() throws IOException;
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResult.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResult.java
new file mode 100644
index 0000000..e5d080b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResult.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2020 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+/**
+ * Utility container for listing ledgers.
+ */
+public interface ListLedgersResult extends AutoCloseable {
+
+ /**
+ * Creates a <code>LedgersIterator</code>.
+ * This method must be called once per <code>ListLedgersResult</code> instance.
+ * @return a <code>LedgersIterator</code> instance.
+ */
+ LedgersIterator iterator();
+
+ /**
+ * Creates a <code>Iterable</code>, which wraps a <code>LedgersIterator</code>.
+ * This method must be called once per <code>ListLedgersResult</code> instance.
+ * <br>
+ * Metadata store access exceptions (<code>IOException</code>) are wrapped within a RuntimeException.
+ * If you want to take care of these cases, it is better to use <code>LedgersIterator</code>.
+ * @return a <code>Iterable</code> instance, containing ledger ids.
+ */
+ Iterable<Long> toIterable();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResultBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResultBuilder.java
new file mode 100644
index 0000000..788029a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ListLedgersResultBuilder.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client.api;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
+
+/**
+ * Builder-style interface to list existing ledgers.
+ */
+@Public
+@Unstable
+public interface ListLedgersResultBuilder extends OpBuilder<ListLedgersResult> {
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 8ecb9dd..0e16d69 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -259,7 +259,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
final long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
final LedgerMetadata metadata;
if (inputMetadata.getMetadataFormatVersion() > LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2) {
- metadata = LedgerMetadataBuilder.from(inputMetadata).withCToken(cToken).build();
+ metadata = LedgerMetadataBuilder.from(inputMetadata).withId(ledgerId).withCToken(cToken).build();
} else {
metadata = inputMetadata;
}
@@ -430,7 +430,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
return readLedgerMetadata(ledgerId, null);
}
- protected CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId, Watcher watcher) {
+ protected CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(final long ledgerId, Watcher watcher) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
@Override
@@ -460,7 +460,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
try {
LongVersion version = new LongVersion(stat.getVersion());
- LedgerMetadata metadata = serDe.parseConfig(data, Optional.of(stat.getCtime()));
+ LedgerMetadata metadata = serDe.parseConfig(data, ledgerId, Optional.of(stat.getCtime()));
promise.complete(new Versioned<>(metadata, version));
} catch (Throwable t) {
LOG.error("Could not parse ledger metadata for ledger: {}", ledgerId, t);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
index 1e46b7e..eb2b91c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -340,6 +340,7 @@ public class LedgerMetadataSerDe {
* if the given byte[] cannot be parsed
*/
public LedgerMetadata parseConfig(byte[] bytes,
+ long ledgerId,
Optional<Long> metadataStoreCtime) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Deserializing {}", Base64.getEncoder().encodeToString(bytes));
@@ -356,11 +357,11 @@ public class LedgerMetadataSerDe {
switch (metadataFormatVersion) {
case METADATA_FORMAT_VERSION_3:
- return parseVersion3Config(is, metadataStoreCtime);
+ return parseVersion3Config(ledgerId, is, metadataStoreCtime);
case METADATA_FORMAT_VERSION_2:
- return parseVersion2Config(is, metadataStoreCtime);
+ return parseVersion2Config(ledgerId, is, metadataStoreCtime);
case METADATA_FORMAT_VERSION_1:
- return parseVersion1Config(is);
+ return parseVersion1Config(ledgerId, is);
default:
throw new IOException(
String.format("Metadata version not compatible. Expected between %d and %d, but got %d",
@@ -370,9 +371,10 @@ public class LedgerMetadataSerDe {
}
}
- private static LedgerMetadata parseVersion3Config(InputStream is, Optional<Long> metadataStoreCtime)
+ private static LedgerMetadata parseVersion3Config(long ledgerId, InputStream is, Optional<Long> metadataStoreCtime)
throws IOException {
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withMetadataFormatVersion(METADATA_FORMAT_VERSION_3);
LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
formatBuilder.mergeDelimitedFrom(is);
@@ -386,9 +388,10 @@ public class LedgerMetadataSerDe {
return builder.build();
}
- private static LedgerMetadata parseVersion2Config(InputStream is, Optional<Long> metadataStoreCtime)
+ private static LedgerMetadata parseVersion2Config(long ledgerId, InputStream is, Optional<Long> metadataStoreCtime)
throws IOException {
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withMetadataFormatVersion(METADATA_FORMAT_VERSION_2);
LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
@@ -449,9 +452,11 @@ public class LedgerMetadataSerDe {
}
}
- private static LedgerMetadata parseVersion1Config(InputStream is) throws IOException {
+ private static LedgerMetadata parseVersion1Config(long ledgerId, InputStream is) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, UTF_8.name()))) {
- LedgerMetadataBuilder builder = LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
+ LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
+ .withMetadataFormatVersion(1);
int quorumSize = Integer.parseInt(reader.readLine());
int ensembleSize = Integer.parseInt(reader.readLine());
long length = Long.parseLong(reader.readLine());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 08a33f8..e15e0a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -430,7 +430,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
}
@Override
- public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
+ public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(final long ledgerId) {
final String key = ledgerId2Key(ledgerId);
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
MetastoreCallback<Versioned<Value>> msCallback = new MetastoreCallback<Versioned<Value>>() {
@@ -450,7 +450,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
}
try {
LedgerMetadata metadata = serDe.parseConfig(
- value.getValue().getField(META_FIELD), Optional.empty());
+ value.getValue().getField(META_FIELD), ledgerId, Optional.empty());
promise.complete(new Versioned<>(metadata, value.getVersion()));
} catch (IOException e) {
LOG.error("Could not parse ledger metadata for ledger " + ledgerId + " : ", e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommand.java
index e7cd20c..3c9a394 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommand.java
@@ -121,7 +121,7 @@ public class LedgerMetaDataCommand extends BookieCommand<LedgerMetaDataCommand.L
} else if (!flag.restoreFromFile.equals(DEFAULT)) {
byte[] serialized = Files.readAllBytes(
FileSystems.getDefault().getPath(flag.restoreFromFile));
- LedgerMetadata md = serDe.parseConfig(serialized, Optional.empty());
+ LedgerMetadata md = serDe.parseConfig(serialized, flag.ledgerId, Optional.empty());
m.createLedgerMetadata(flag.ledgerId, md).join();
} else {
printLedgerMetadata(flag.ledgerId, m.readLedgerMetadata(flag.ledgerId).get().getValue(), true);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index f6477d4..a824b4a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -612,7 +612,8 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
ensembleOfSegment2.add(bookie2);
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create();
- builder.withEnsembleSize(3)
+ builder.withId(ledgerId)
+ .withEnsembleSize(3)
.withWriteQuorumSize(3)
.withAckQuorumSize(2)
.withDigestType(digestType.toApiDigestType())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index a6b873e..1d15891 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -68,7 +68,7 @@ public class ClientUtil {
public static Versioned<LedgerMetadata> setupLedger(LedgerManager ledgerManager, long ledgerId,
LedgerMetadataBuilder builder) throws Exception {
- LedgerMetadata md = builder.withPassword(PASSWD).withDigestType(DIGEST_TYPE).build();
+ LedgerMetadata md = builder.withPassword(PASSWD).withDigestType(DIGEST_TYPE).withId(ledgerId).build();
return ledgerManager.createLedgerMetadata(ledgerId, md).get();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
index 25a4fd8..4444dd5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.bookkeeper.client;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -47,11 +46,13 @@ public class LedgerMetadataTest {
new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
org.apache.bookkeeper.client.api.LedgerMetadata metadata = LedgerMetadataBuilder.create()
- .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
- .withDigestType(DigestType.CRC32.toApiDigestType()).withPassword(passwd)
- .newEnsembleEntry(0L, ensemble)
- .build();
+ .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+ .withDigestType(DigestType.CRC32.toApiDigestType()).withPassword(passwd)
+ .newEnsembleEntry(0L, ensemble)
+ .withId(100L)
+ .build();
+ assertEquals(100L, metadata.getLedgerId());
assertEquals(3, metadata.getEnsembleSize());
assertEquals(2, metadata.getWriteQuorumSize());
assertEquals(1, metadata.getAckQuorumSize());
@@ -73,13 +74,14 @@ public class LedgerMetadataTest {
new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
LedgerMetadata lm1 = LedgerMetadataBuilder.create()
- .withDigestType(DigestType.CRC32.toApiDigestType())
- .withPassword(passwd)
- .newEnsembleEntry(0L, ensemble)
- .build();
+ .withDigestType(DigestType.CRC32.toApiDigestType())
+ .withPassword(passwd)
+ .newEnsembleEntry(0L, ensemble)
+ .withId(100L)
+ .build();
assertTrue("toString should contain password value",
- lm1.toString().contains(Base64.getEncoder().encodeToString(passwd)));
+ lm1.toString().contains(Base64.getEncoder().encodeToString(passwd)));
assertTrue("toSafeString should not contain password value", lm1.toSafeString().contains("OMITTED"));
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
index 4e2ca42..af6d980 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecovery2Test.java
@@ -54,6 +54,7 @@ public class LedgerRecovery2Test {
private static Versioned<LedgerMetadata> setupLedger(ClientContext clientCtx, long ledgerId,
List<BookieId> bookies) throws Exception {
LedgerMetadata md = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withPassword(PASSWD).withDigestType(DigestType.CRC32C)
.newEnsembleEntry(0, bookies).build();
return clientCtx.getLedgerManager().createLedgerMetadata(1L, md).get();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index 1f43cb4..43aa18c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -70,14 +70,17 @@ public class MetadataUpdateLoopTest {
@Test
public void testBasicUpdate() throws Exception {
try (LedgerManager lm = new MockLedgerManager()) {
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(5)
+ long ledgerId = 1234L;
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
+ .withEnsembleSize(5)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.newEnsembleEntry(0L, Lists.newArrayList(BookieId.parse("0.0.0.0:3181"),
BookieId.parse("0.0.0.1:3181"),
BookieId.parse("0.0.0.2:3181"),
BookieId.parse("0.0.0.3:3181"),
BookieId.parse("0.0.0.4:3181"))).build();
- long ledgerId = 1234L;
+
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
@@ -116,7 +119,7 @@ public class MetadataUpdateLoopTest {
BookieId b2 = BookieId.parse("0.0.0.2:3181");
BookieId b3 = BookieId.parse("0.0.0.3:3181");
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata =
@@ -183,7 +186,7 @@ public class MetadataUpdateLoopTest {
BookieId b1 = BookieId.parse("0.0.0.1:3181");
BookieId b2 = BookieId.parse("0.0.0.2:3181");
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
@@ -237,7 +240,7 @@ public class MetadataUpdateLoopTest {
BookieId b2 = BookieId.parse("0.0.0.2:3181");
BookieId b3 = BookieId.parse("0.0.0.3:3181");
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2).withId(ledgerId)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
@@ -303,7 +306,7 @@ public class MetadataUpdateLoopTest {
.mapToObj((i) -> address(String.format("0.0.0.%d:3181", i)))
.collect(Collectors.toList());
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize).withId(ledgerId)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.newEnsembleEntry(0L, initialEnsemble).build();
Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
@@ -347,7 +350,7 @@ public class MetadataUpdateLoopTest {
BookieId b0 = new BookieSocketAddress("0.0.0.0:3181").toBookieId();
BookieId b1 = new BookieSocketAddress("0.0.0.1:3181").toBookieId();
- LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1)
+ LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1).withId(ledgerId)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withWriteQuorumSize(1).withAckQuorumSize(1)
.newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 5792fcf..e2b897d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -274,7 +274,7 @@ public class MockLedgerHandle extends LedgerHandle {
new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
return LedgerMetadataBuilder.create()
- .withDigestType(digest.toApiDigestType())
+ .withId(124L).withDigestType(digest.toApiDigestType())
.withPassword(passwd)
.newEnsembleEntry(0L, ensemble)
.build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index b1f385a..2b1537e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -108,7 +108,7 @@ public class ReadLastConfirmedAndEntryOpTest {
ensemble.add(new BookieSocketAddress("127.0.0.1", 3181 + i).toBookieId());
}
this.ledgerMetadata = LedgerMetadataBuilder.create()
- .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
+ .withId(124L).withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
.withPassword(new byte[0])
.withDigestType(DigestType.CRC32.toApiDigestType())
.newEnsembleEntry(0L, ensemble).build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 25a3de8..90e4fd1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -246,7 +246,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
LedgerMetadata metadata = LedgerMetadataBuilder.create()
- .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(3)
+ .withId(124L).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(3)
.withPassword(TEST_PSSWD).withDigestType(TEST_DIGEST_TYPE.toApiDigestType())
.withClosedState().withLastEntryId(-1).withLength(0)
.newEnsembleEntry(0L, ensemble)
@@ -363,7 +363,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
ensemble.add(BookieId.parse("bookie6:3181"));
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create();
- builder.withEnsembleSize(7).withWriteQuorumSize(3).withAckQuorumSize(2)
+ builder.withId(124L).withEnsembleSize(7).withWriteQuorumSize(3).withAckQuorumSize(2)
.withDigestType(TEST_DIGEST_TYPE.toApiDigestType()).withPassword(TEST_PSSWD)
.newEnsembleEntry(0, ensemble).withLastEntryId(lastEntryId).withLength(512).withClosedState();
LedgerMetadata met = builder.build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
index e51368f..4d2ac8b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceRead.java
@@ -45,7 +45,7 @@ public class TestSequenceRead extends BookKeeperClusterTestCase {
long ledgerId = 12345L;
// introduce duplicated bookies in an ensemble.
LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
- .withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(3)
+ .withId(ledgerId).withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(3)
.newEnsembleEntry(0L, Lists.newArrayList(getBookie(0), getBookie(0), getBookie(0)));
ClientUtil.setupLedger(bkc.getLedgerManager(), ledgerId, builder);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index 2120df1..18c7833 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -141,6 +141,7 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
@Override
public void operationComplete(int rc, final Long lid) {
LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ .withId(lid)
.withDigestType(digestType.toApiDigestType()).withPassword(new byte[0])
.withEnsembleSize(4).withWriteQuorumSize(2)
.withAckQuorumSize(2)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
index f0e07e2..ae3ed6a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/BookKeeperBuildersTest.java
@@ -406,6 +406,7 @@ public class BookKeeperBuildersTest extends MockBookKeeperTestCase {
int writeQuorumSize, int ackQuorumSize, byte[] password,
Map<String, byte[]> customMetadata) {
return LedgerMetadataBuilder.create()
+ .withId(12L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/LedgerMetadataTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/LedgerMetadataTest.java
new file mode 100644
index 0000000..59a36a5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/api/LedgerMetadataTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client.api;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import java.util.Iterator;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+
+/**
+ * Tests for the BookKeeper client API: ledger metadata lookup (getLedgerMetadata) and ledgers listing (newListLedgersOp).
+ */
+public class LedgerMetadataTest extends BookKeeperClusterTestCase {
+ // NOTE(review): 3 is handed to the base class constructor - presumably the number of bookies to start; confirm.
+ public LedgerMetadataTest() {
+ super(3);
+ }
+
+ @Test
+ public void testGetLedgerMetadata()
+ throws Exception {
+ // Creates a ledger, closes its write handle, then checks the metadata returned by getLedgerMetadata(ledgerId).
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ try (BookKeeper bkc = BookKeeper.newBuilder(conf).build();) {
+ long ledgerId;
+ try (WriteHandle l = bkc
+ .newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword("testPasswd".getBytes())
+ .execute()
+ .get();) {
+ ledgerId = l.getId();
+ }
+ // The create op above sets no ensemble/quorum sizes, so the 3/2/2 expected below are presumably client defaults - TODO confirm.
+ LedgerMetadata metadata = FutureUtils.result(bkc.getLedgerMetadata(ledgerId));
+ assertEquals(ledgerId, metadata.getLedgerId());
+ assertEquals(3, metadata.getEnsembleSize());
+ assertEquals(2, metadata.getAckQuorumSize());
+ assertEquals(2, metadata.getWriteQuorumSize());
+ assertArrayEquals("testPasswd".getBytes(), metadata.getPassword());
+ }
+
+ }
+
+ @Test
+ public void testListLedgers()
+ throws Exception {
+ int numOfLedgers = 10;
+ // Creates numOfLedgers ledgers, then checks the listing content and the single-use / closed-state rules of ListLedgersResult.
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+
+ try (BookKeeper bkc = BookKeeper.newBuilder(conf).build();) {
+ long[] ledgerIds = new long[numOfLedgers];
+ for (int i = 0; i < numOfLedgers; i++) {
+
+ try (WriteHandle l = bkc
+ .newCreateLedgerOp()
+ .withDigestType(DigestType.CRC32)
+ .withPassword("testPasswd".getBytes())
+ .execute()
+ .get();) {
+ ledgerIds[i] = l.getId();
+ }
+ }
+ // First pass: consume the listing through toIterable(); ids are asserted in the order the ledgers were created.
+ try (ListLedgersResult result = FutureUtils.result(bkc.newListLedgersOp().execute());) {
+ int count = 0;
+
+ for (long ledgerId : result.toIterable()) {
+ assertEquals(ledgerIds[count++], ledgerId);
+ }
+ // The result has already been consumed above: asking it for another iterator or iterable is expected to fail.
+ assertEquals("Unexpected ledgers count", numOfLedgers, count);
+ try {
+ result.iterator();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ try {
+ result.toIterable();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ }
+ // Second pass: same checks, this time consuming the listing through a LedgersIterator.
+ try (ListLedgersResult result = FutureUtils.result(bkc.newListLedgersOp().execute());) {
+ int count = 0;
+
+ for (LedgersIterator iterator = result.iterator(); iterator.hasNext();) {
+ long ledgerId = iterator.next();
+ assertEquals(ledgerIds[count++], ledgerId);
+
+ }
+ assertEquals("Unexpected ledgers count", numOfLedgers, count);
+ try {
+ result.iterator();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ try {
+ result.toIterable();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ }
+ }
+ // NOTE(review): by brace count the try-with-resources client ends above; bkc below is presumably a field inherited from the base test class - confirm.
+ // check closed: a result closed before being consumed must reject both toIterable() and iterator()
+ {
+ ListLedgersResult result = FutureUtils.result(bkc.newListLedgersOp().execute());
+ result.close();
+ try {
+ result.toIterable();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+
+ try {
+ result.iterator();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ }
+
+ { // iterator: obtained before close(), it must fail on hasNext() and next() once the result is closed
+ ListLedgersResult result = FutureUtils.result(bkc.newListLedgersOp().execute());
+ LedgersIterator it = result.iterator();
+ result.close();
+ try {
+ it.hasNext();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+
+ try {
+ it.next();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ }
+
+ { // iterable: the java.util.Iterator obtained before close() must fail the same way after the result is closed
+ ListLedgersResult result = FutureUtils.result(bkc.newListLedgersOp().execute());
+ Iterator<Long> it = result.toIterable().iterator();
+ result.close();
+ try {
+ it.hasNext();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+
+ try {
+ it.next();
+ fail("Should thrown error");
+ } catch (IllegalStateException e) {
+ // ok
+ }
+ }
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 2086a3f..f9de13d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -122,6 +122,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
new BookieSocketAddress("192.0.2.4", 3181).toBookieId(),
new BookieSocketAddress("192.0.2.5", 3181).toBookieId());
this.metadata = LedgerMetadataBuilder.create()
+ .withId(123L)
.withDigestType(DigestType.CRC32C).withPassword(new byte[0])
.withEnsembleSize(5)
.withWriteQuorumSize(3)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index b467d86..cfb17ec 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -123,6 +123,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
LedgerMetadata md = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withDigestType(DigestType.CRC32C)
.withPassword(new byte[0])
.withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index f1e9161..7712ddd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -92,6 +92,7 @@ public class LedgerManagerIteratorTest extends LedgerManagerTestCase {
new BookieSocketAddress("192.0.2.2", 1234).toBookieId(),
new BookieSocketAddress("192.0.2.3", 1234).toBookieId());
LedgerMetadata meta = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withPassword("passwd".getBytes())
.withDigestType(BookKeeper.DigestType.CRC32.toApiDigestType())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index 952837b..279fb71 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -84,7 +84,7 @@ public class MockLedgerManager implements LedgerManager {
if (pair == null) {
return null;
} else {
- return new Versioned<>(serDe.parseConfig(pair.getRight(), Optional.empty()), pair.getLeft());
+ return new Versioned<>(serDe.parseConfig(pair.getRight(), ledgerId, Optional.empty()), pair.getLeft());
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
index 07e4bb3..6f4d414 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
@@ -66,7 +66,7 @@ public class TestLedgerMetadataSerDe {
private static void testDecodeEncode(String encoded) throws Exception {
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- LedgerMetadata md = serDe.parseConfig(Base64.getDecoder().decode(encoded), Optional.empty());
+ LedgerMetadata md = serDe.parseConfig(Base64.getDecoder().decode(encoded), 59L, Optional.empty());
String reserialized = Base64.getEncoder().encodeToString(serDe.serialize(md));
Assert.assertEquals(encoded, reserialized);
@@ -96,7 +96,7 @@ public class TestLedgerMetadataSerDe {
public void testJunkSerDe() throws Exception {
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
String junk = "";
- serDe.parseConfig(junk.getBytes(UTF_8), Optional.empty());
+ serDe.parseConfig(junk.getBytes(UTF_8), 59L, Optional.empty());
}
@Test(expected = IOException.class)
@@ -104,27 +104,27 @@ public class TestLedgerMetadataSerDe {
byte[] randomBytes = new byte[1000];
new Random().nextBytes(randomBytes);
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- serDe.parseConfig(randomBytes, Optional.empty());
+ serDe.parseConfig(randomBytes, 59L, Optional.empty());
}
@Test(expected = IOException.class)
public void testJunkVersionSerDe() throws Exception {
byte[] junkVersion = "BookieMetadataFormatVersion\tfoobar\nblahblah".getBytes(UTF_8);
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- serDe.parseConfig(junkVersion, Optional.empty());
+ serDe.parseConfig(junkVersion, 59L, Optional.empty());
}
@Test(expected = IOException.class)
public void testVeryLongVersionSerDe() throws Exception {
byte[] veryLongVersion = "BookieMetadataFormatVersion\t123456789123456789\nblahblah".getBytes(UTF_8);
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- serDe.parseConfig(veryLongVersion, Optional.empty());
+ serDe.parseConfig(veryLongVersion, 59L, Optional.empty());
}
@Test
public void testPeggedToV3SerDe() throws Exception {
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ LedgerMetadata metadata = LedgerMetadataBuilder.create().withId(13L)
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
.withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
.newEnsembleEntry(0L, Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 3181).toBookieId(),
@@ -133,14 +133,14 @@ public class TestLedgerMetadataSerDe {
.build();
byte[] encoded = serDe.serialize(metadata);
- LedgerMetadata decoded = serDe.parseConfig(encoded, Optional.empty());
+ LedgerMetadata decoded = serDe.parseConfig(encoded, 59L, Optional.empty());
Assert.assertEquals(LedgerMetadataSerDe.METADATA_FORMAT_VERSION_3, decoded.getMetadataFormatVersion());
}
@Test
public void testStoreSystemtimeAsLedgerCtimeEnabledWithNewerVersion()
throws Exception {
- LedgerMetadata lm = LedgerMetadataBuilder.create()
+ LedgerMetadata lm = LedgerMetadataBuilder.create().withId(13L)
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
.withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
.newEnsembleEntry(0L, Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
@@ -151,18 +151,18 @@ public class TestLedgerMetadataSerDe {
.build();
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
byte[] serialized = serDe.serialize(lm);
- LedgerMetadata deserialized = serDe.parseConfig(serialized, Optional.of(654321L));
+ LedgerMetadata deserialized = serDe.parseConfig(serialized, 59L, Optional.of(654321L));
Assert.assertEquals(deserialized.getCtime(), 123456L);
// give it another round
- LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), Optional.of(98765L));
+ LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), 59L, Optional.of(98765L));
Assert.assertEquals(deserialized2.getCtime(), 123456L);
}
@Test
public void testStoreSystemtimeAsLedgerCtimeDisabledWithNewerVersion()
throws Exception {
- LedgerMetadata lm = LedgerMetadataBuilder.create()
+ LedgerMetadata lm = LedgerMetadataBuilder.create().withId(13L)
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
.withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
.newEnsembleEntry(0L, Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234).toBookieId(),
@@ -171,11 +171,11 @@ public class TestLedgerMetadataSerDe {
.build();
LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
byte[] serialized = serDe.serialize(lm);
- LedgerMetadata deserialized = serDe.parseConfig(serialized, Optional.of(654321L));
+ LedgerMetadata deserialized = serDe.parseConfig(serialized, 59L, Optional.of(654321L));
Assert.assertEquals(654321L, deserialized.getCtime());
// give it another round
- LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), Optional.of(98765L));
+ LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), 59L, Optional.of(98765L));
Assert.assertEquals(98765L, deserialized2.getCtime());
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index fba2246..ea5b33f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -552,14 +552,15 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
ensemble.add(new BookieSocketAddress("11.11.11.11:1111").toBookieId());
ensemble.add(new BookieSocketAddress("88.88.88.88:8888").toBookieId());
+ long ledgerId = (Math.abs(rand.nextLong())) % 100000000;
+
LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ .withId(ledgerId)
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(2)
.withPassword("passwd".getBytes())
.withDigestType(DigestType.CRC32.toApiDigestType())
.newEnsembleEntry(0L, ensemble).build();
- long ledgerId = (Math.abs(rand.nextLong())) % 100000000;
-
try (LedgerManager lm = driver.getLedgerManagerFactory().newLedgerManager()) {
lm.createLedgerMetadata(ledgerId, metadata).get(2000, TimeUnit.MILLISECONDS);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 3c8d0c4..21ca25e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -123,6 +123,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
// closed ledger
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -139,6 +140,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
ensembleSize = 4;
// closed ledger with multiple segments
initMeta = LedgerMetadataBuilder.create()
+ .withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -156,6 +158,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
Collections.shuffle(bookieAddresses);
// non-closed ledger
initMeta = LedgerMetadataBuilder.create()
+ .withId(3L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -168,6 +171,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
Collections.shuffle(bookieAddresses);
// non-closed ledger with multiple segments
initMeta = LedgerMetadataBuilder.create()
+ .withId(4L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -241,6 +245,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* 3 racks, and the ensembleSize is 5.
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -259,6 +264,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* adhering to placement policy
*/
initMeta = LedgerMetadataBuilder.create()
+ .withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -326,6 +332,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
long ledgerId1 = 1L;
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(ledgerId1)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -349,6 +356,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
ensembleSize = 3;
long ledgerId2 = 21234561L;
initMeta = LedgerMetadataBuilder.create()
+ .withId(ledgerId2)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -370,6 +378,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
*/
long ledgerId3 = 31234561L;
initMeta = LedgerMetadataBuilder.create()
+ .withId(ledgerId3)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -455,6 +464,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* counted as ledgers not adhering to placement policy.
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -481,6 +491,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* policy, it should be counted as single ledger.
*/
initMeta = LedgerMetadataBuilder.create()
+ .withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
@@ -556,6 +567,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* ensemble is spread across 3 zones and 2 UDs
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+ .withId(1L)
.withEnsembleSize(6)
.withWriteQuorumSize(6)
.withAckQuorumSize(2)
@@ -574,6 +586,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* this shouldn't be reported
*/
initMeta = LedgerMetadataBuilder.create()
+ .withId(2L)
.withEnsembleSize(6)
.withWriteQuorumSize(5)
.withAckQuorumSize(2)
@@ -588,6 +601,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
* this ledger is not adhering to placement policy.
*/
initMeta = LedgerMetadataBuilder.create()
+ .withId(3L)
.withEnsembleSize(6)
.withWriteQuorumSize(5)
.withAckQuorumSize(2)
@@ -612,6 +626,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
newEnsemble.add(bookieAddresses.get(3));
newEnsemble.add(bookieAddresses.get(4));
initMeta = LedgerMetadataBuilder.create()
+ .withId(4L)
.withEnsembleSize(4)
.withWriteQuorumSize(4)
.withAckQuorumSize(2)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index 8d54113..a17ecfa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -196,7 +196,7 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
int ackQuorumSize, Map<Long, List<BookieId>> segmentEnsembles, long lastEntryId, int length,
DigestType digestType, byte[] password) throws InterruptedException, ExecutionException {
LedgerMetadataBuilder ledgerMetadataBuilder = LedgerMetadataBuilder.create();
- ledgerMetadataBuilder.withEnsembleSize(ensembleSize).withWriteQuorumSize(writeQuorumSize)
+ ledgerMetadataBuilder.withId(ledgerId).withEnsembleSize(ensembleSize).withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize).withClosedState().withLastEntryId(lastEntryId).withLength(length)
.withDigestType(digestType).withPassword(password);
for (Map.Entry<Long, List<BookieId>> mapEntry : segmentEnsembles.entrySet()) {
@@ -210,7 +210,7 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
int ackQuorumSize, Map<Long, List<BookieId>> segmentEnsembles, DigestType digestType,
byte[] password) throws InterruptedException, ExecutionException {
LedgerMetadataBuilder ledgerMetadataBuilder = LedgerMetadataBuilder.create();
- ledgerMetadataBuilder.withEnsembleSize(ensembleSize).withWriteQuorumSize(writeQuorumSize)
+ ledgerMetadataBuilder.withId(ledgerId).withEnsembleSize(ensembleSize).withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize).withDigestType(digestType).withPassword(password);
for (Map.Entry<Long, List<BookieId>> mapEntry : segmentEnsembles.entrySet()) {
ledgerMetadataBuilder.newEnsembleEntry(mapEntry.getKey(), mapEntry.getValue());
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index f91f2f7..676b6cc 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
-import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
@@ -65,18 +64,6 @@ import org.apache.zookeeper.AsyncCallback.VoidCallback;
class EtcdLedgerManager implements LedgerManager {
private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
- private final Function<ByteSequence, LedgerMetadata> ledgerMetadataFunction = bs -> {
- try {
- return serDe.parseConfig(
- bs.getBytes(),
- Optional.empty()
- );
- } catch (IOException ioe) {
- log.error("Could not parse ledger metadata : {}", bs.toStringUtf8(), ioe);
- throw new RuntimeException(
- "Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
- }
- };
private final String scope;
private final Client client;
@@ -234,7 +221,7 @@ class EtcdLedgerManager implements LedgerManager {
KeyValue kv = getResp.getKvs().get(0);
byte[] data = kv.getValue().getBytes();
try {
- LedgerMetadata metadata = serDe.parseConfig(data, Optional.empty());
+ LedgerMetadata metadata = serDe.parseConfig(data, ledgerId, Optional.empty());
promise.complete(new Versioned<>(metadata, new LongVersion(kv.getModRevision())));
} catch (IOException ioe) {
log.error("Could not parse ledger metadata for ledger : {}", ledgerId, ioe);
@@ -327,7 +314,19 @@ class EtcdLedgerManager implements LedgerManager {
ledgerId, (lid) -> new ValueStream<>(
client,
watchClient,
- ledgerMetadataFunction,
+ bs -> {
+ try {
+ return serDe.parseConfig(
+ bs.getBytes(),
+ lid,
+ Optional.empty()
+ );
+ } catch (IOException ioe) {
+ log.error("Could not parse ledger metadata : {}", bs.toStringUtf8(), ioe);
+ throw new RuntimeException(
+ "Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
+ }
+ },
ByteSequence.fromString(EtcdUtils.getLedgerKey(scope, ledgerId)))
);
LedgerMetadataConsumer lmConsumer = listenerToConsumer(ledgerId, listener,
diff --git a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
index c30ef27..f206817 100644
--- a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
+++ b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
@@ -100,7 +100,7 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
BookieId.parse("192.0.2.1:1234"),
BookieId.parse("192.0.2.2:1234"),
BookieId.parse("192.0.2.3:1234"));
- LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ LedgerMetadata metadata = LedgerMetadataBuilder.create().withId(ledgerId)
.withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withPassword("test-password".getBytes(UTF_8))
.withDigestType(DigestType.CRC32C.toApiDigestType())
@@ -239,7 +239,7 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
private void createNumLedgers(int numLedgers) throws Exception {
List<CompletableFuture<Versioned<LedgerMetadata>>> createFutures = new ArrayList<>(numLedgers);
for (int i = 0; i < numLedgers; i++) {
- LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ LedgerMetadata metadata = LedgerMetadataBuilder.create().withId(i)
.withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withPassword("test-password".getBytes(UTF_8))
.withDigestType(DigestType.CRC32C.toApiDigestType())
@@ -254,7 +254,7 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
long ledgerId = System.currentTimeMillis();
// create a ledger metadata
- LedgerMetadata metadata = LedgerMetadataBuilder.create()
+ LedgerMetadata metadata = LedgerMetadataBuilder.create().withId(ledgerId)
.withEnsembleSize(3).withWriteQuorumSize(3).withAckQuorumSize(2)
.withPassword("test-password".getBytes(UTF_8))
.withDigestType(DigestType.CRC32C.toApiDigestType())
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommandTest.java
index d4d89c1..d363668 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/client/LedgerMetaDataCommandTest.java
@@ -100,7 +100,7 @@ public class LedgerMetaDataCommandTest extends BookieCommandTestBase {
serDe = mock(LedgerMetadataSerDe.class);
whenNew(LedgerMetadataSerDe.class).withNoArguments().thenReturn(serDe);
when(serDe.serialize(eq(ledgerMetadata))).thenReturn(new byte[0]);
- when(serDe.parseConfig(eq(new byte[0]), eq(Optional.empty()))).thenReturn(ledgerMetadata);
+ when(serDe.parseConfig(eq(new byte[0]), anyLong(), eq(Optional.empty()))).thenReturn(ledgerMetadata);
when(ledgerManager.createLedgerMetadata(anyLong(), eq(ledgerMetadata))).thenReturn(future);
}
@@ -120,7 +120,7 @@ public class LedgerMetaDataCommandTest extends BookieCommandTestBase {
LedgerMetaDataCommand cmd = new LedgerMetaDataCommand();
Assert.assertTrue(cmd.apply(bkFlags, new String[] { "-l", "1", "-r", file.getAbsolutePath() }));
- verify(serDe, times(1)).parseConfig(eq(new byte[0]), eq(Optional.empty()));
+ verify(serDe, times(1)).parseConfig(eq(new byte[0]), anyLong(), eq(Optional.empty()));
verify(ledgerManager, times(1)).createLedgerMetadata(anyLong(), any(LedgerMetadata.class));
}