You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/14 14:04:25 UTC
[pulsar] branch master updated: [pulsar-broker] Fix expiry monitor
to continue on non-recoverable error (#4818)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c5ba529 [pulsar-broker] Fix expiry monitor to continue on non-recoverable error (#4818)
c5ba529 is described below
commit c5ba52983fee994de61984aae7d1757e9b738caf
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Aug 14 07:04:20 2019 -0700
[pulsar-broker] Fix expiry monitor to continue on non-recoverable error (#4818)
### Motivation
In #1046, we have added a flag (`autoSkipNonRecoverableData`) and mechanism to recover cursor if ledger data is deleted. However, expiery-monitor doesn't use that flag and it gets stuck when it finds non-recoverable ml-error while cleaning up expired message.
### Modification
Expiry-monitor can skip non-recoverable managed-ledger exception (eg: data/ledger doesn't exist anymore) when `autoSkipNonRecoverableData` flag is enabled.
---
.../apache/bookkeeper/mledger/AsyncCallbacks.java | 3 +-
.../apache/bookkeeper/mledger/ManagedCursor.java | 7 ++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 15 +++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +
.../bookkeeper/mledger/impl/OpFindNewest.java | 5 +-
.../mledger/impl/ManagedCursorContainerTest.java | 7 ++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 4 +-
.../persistent/PersistentMessageExpiryMonitor.java | 14 +++-
.../persistent/PersistentMessageFinder.java | 7 +-
.../service/persistent/PersistentSubscription.java | 3 +-
.../service/PersistentMessageFinderTest.java | 89 +++++++++++++++++++---
11 files changed, 136 insertions(+), 22 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index e861fad..8a21385 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
import com.google.common.annotations.Beta;
import java.util.List;
+import java.util.Optional;
/**
* Definition of all the callbacks used for the ManagedLedger asynchronous API.
@@ -116,7 +117,7 @@ public interface AsyncCallbacks {
interface FindEntryCallback {
void findEntryComplete(Position position, Object ctx);
- void findEntryFailed(ManagedLedgerException exception, Object ctx);
+ void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
}
interface ResetCursorCallback {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index fc3f6a4..03a68ba 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -587,4 +587,11 @@ public interface ManagedCursor {
*/
void setThrottleMarkDelete(double throttleMarkDelete);
+ /**
+ * Get {@link ManagedLedger} attached with cursor
+ *
+ * @return ManagedLedger
+ */
+ ManagedLedger getManagedLedger();
+
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d185274..5292688 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -45,6 +45,7 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -71,6 +72,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
@@ -766,7 +768,8 @@ public class ManagedCursorImpl implements ManagedCursor {
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+ Object ctx) {
result.exception = exception;
counter.countDown();
}
@@ -796,11 +799,12 @@ public class ManagedCursorImpl implements ManagedCursor {
max = getNumberOfEntriesInStorage();
break;
default:
- callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), ctx);
+ callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
}
if (startPosition == null) {
- callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), ctx);
+ callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
+ Optional.empty(), ctx);
return;
}
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
@@ -2581,5 +2585,10 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
+ @Override
+ public ManagedLedger getManagedLedger() {
+ return this.ledger;
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 749e560..a1a5a15 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -49,6 +49,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -3023,6 +3024,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
public static ManagedLedgerException createManagedLedgerException(Throwable t) {
if (t instanceof org.apache.bookkeeper.client.api.BKException) {
return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
+ } else if (t instanceof CompletionException
+ && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
+ return createManagedLedgerException(t.getCause());
} else {
return new ManagedLedgerException("Unknown exception");
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 57e8044..4bce569 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -21,6 +21,9 @@ package org.apache.bookkeeper.mledger.impl;
import com.google.common.base.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
+
+import java.util.Optional;
+
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
@@ -107,7 +110,7 @@ class OpFindNewest implements ReadEntryCallback {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- callback.findEntryFailed(exception, OpFindNewest.this.ctx);
+ callback.findEntryFailed(exception, Optional.ofNullable(searchPosition), OpFindNewest.this.ctx);
}
public void find() {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 60d3dc9..c415320 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.testng.annotations.Test;
@@ -308,6 +309,12 @@ public class ManagedCursorContainerTest {
public double getThrottleMarkDelete() {
return -1;
}
+
+ @Override
+ public ManagedLedger getManagedLedger() {
+ return null;
+ }
+
}
@Test
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 4f2cce5..70db43c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -2077,7 +2078,8 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+ Object ctx) {
result.exception = exception;
counter.countDown();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 85d3785..deb2744 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -18,10 +18,12 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
@@ -37,6 +39,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final String subName;
private final String topicName;
private final Rate msgExpired;
+ private final boolean autoSkipNonRecoverableData;
private static final int FALSE = 0;
private static final int TRUE = 1;
@@ -50,6 +53,9 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
this.cursor = cursor;
this.subName = subscriptionName;
this.msgExpired = new Rate();
+ this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null // check to avoid test failures
+ ? cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
+ : false;
}
public void expireMessages(int messageTTLInSeconds) {
@@ -124,10 +130,16 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception);
}
+ if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
+ && (exception instanceof NonRecoverableLedgerException)) {
+ log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
+ exception.getMessage());
+ findEntryComplete(failedReadPosition.get(), ctx);
+ }
expirationCheckInProgress = FALSE;
updateRates();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index a088200..9e75149 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static com.google.common.base.Preconditions.checkArgument;
@@ -83,7 +84,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
}
callback.findEntryFailed(
new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"),
- null);
+ Optional.empty(), null);
}
}
@@ -106,7 +107,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback);
AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback) ctx;
if (log.isDebugEnabled()) {
@@ -114,6 +115,6 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback
timestamp, exception);
}
messageFindInProgress = FALSE;
- callback.findEntryFailed(exception, null);
+ callback.findEntryFailed(exception, failedReadPosition, null);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index d4adf45..34864a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -593,7 +594,7 @@ public class PersistentSubscription implements Subscription {
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
// todo - what can go wrong here that needs to be retried?
if (exception instanceof ConcurrentFindCursorPositionException) {
future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ae30b29..086c087 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -29,6 +29,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -41,7 +42,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -98,7 +103,8 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
}
@Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+ Object ctx) {
result.exception = exception;
future.completeExceptionally(exception);
}
@@ -167,20 +173,23 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
final AtomicBoolean ex = new AtomicBoolean(false);
- messageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() {
- @Override
- public void findEntryComplete(Position position, Object ctx) {
- }
+ messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(),
+ new AsyncCallbacks.FindEntryCallback() {
+ @Override
+ public void findEntryComplete(Position position, Object ctx) {
+ }
- @Override
- public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
- ex.set(true);
- }
- });
+ @Override
+ public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
+ Object ctx) {
+ ex.set(true);
+ }
+ });
assertTrue(ex.get());
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
- monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
+ monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
+ Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
field.setAccessible(true);
assertEquals(0, field.get(monitor));
@@ -190,4 +199,62 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
ledger.close();
factory.shutdown();
}
+
+ /**
+ * It tests that message expiry doesn't get stuck if it can't read deleted ledger's entry.
+ *
+ * @throws Exception
+ */
+ @Test
+ void testMessageExpiryWithNonRecoverableException() throws Exception {
+
+ final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
+ final int entriesPerLedger = 2;
+ final int totalEntries = 10;
+ final int ttlSeconds = 1;
+
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setRetentionSizeInMB(10);
+ config.setMaxEntriesPerLedger(entriesPerLedger);
+ config.setRetentionTime(1, TimeUnit.HOURS);
+ config.setAutoSkipNonRecoverableData(true);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+
+ for (int i = 0; i < totalEntries; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i));
+ }
+
+ List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
+ LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);
+
+ assertEquals(ledgers.size(), totalEntries / entriesPerLedger);
+
+ // this will make sure that all entries should be deleted
+ Thread.sleep(ttlSeconds);
+
+ bkc.deleteLedger(ledgers.get(0).getLedgerId());
+ bkc.deleteLedger(ledgers.get(1).getLedgerId());
+ bkc.deleteLedger(ledgers.get(2).getLedgerId());
+
+ PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
+ Position previousMarkDelete = null;
+ for (int i = 0; i < totalEntries; i++) {
+ monitor.expireMessages(1);
+ Position previousPos = previousMarkDelete;
+ retryStrategically(
+ (test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos),
+ 5, 100);
+ previousMarkDelete = c1.getMarkDeletedPosition();
+ }
+
+ PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
+ assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId());
+ assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId());
+
+ c1.close();
+ ledger.close();
+ factory.shutdown();
+
+ }
}