You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/05 16:40:49 UTC
[incubator-pulsar] branch master updated: Fixed managed ledger missing callback issue when unloading a topic (#1171)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5310f74 Fixed managed ledger missing callback issue when unloading a topic (#1171)
5310f74 is described below
commit 5310f74887f62f271213eb7f5191682b05a209c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 5 08:40:45 2018 -0800
Fixed managed ledger missing callback issue when unloading a topic (#1171)
---
.../bookkeeper/mledger/ManagedLedgerException.java | 8 +++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++--
.../mledger/impl/ManagedLedgerBkTest.java | 43 ++++++++++++++++++++++
.../broker/service/BrokerServiceException.java | 6 +++
.../org/apache/pulsar/broker/service/Producer.java | 9 ++++-
.../broker/service/persistent/PersistentTopic.java | 15 +++++++-
6 files changed, 83 insertions(+), 7 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index f5c4243..9ad8284 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -37,7 +37,7 @@ public class ManagedLedgerException extends Exception {
}
return new ManagedLedgerException(e);
}
-
+
public static class MetaStoreException extends ManagedLedgerException {
public MetaStoreException(Exception e) {
super(e);
@@ -84,6 +84,12 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException {
+ public ManagedLedgerAlreadyClosedException(String msg) {
+ super(msg);
+ }
+ }
+
public static class CursorAlreadyClosedException extends ManagedLedgerException {
public CursorAlreadyClosedException(String msg) {
super(msg);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9bbea72..68a7645 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,8 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.util.Iterator;
@@ -59,6 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -92,7 +92,6 @@ import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
@@ -1203,6 +1202,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final State state = STATE_UPDATER.get(this);
if (state == State.ClosingLedger || state == State.LedgerOpened) {
STATE_UPDATER.set(this, State.ClosedLedger);
+ } else if (state == State.Closed) {
+ // The managed ledger was closed during the write operation
+ clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
+ return;
} else {
// In case we get multiple write errors for different outstanding write request, we should close the ledger
// just once
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 5c27ffb..a945759 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -30,10 +30,13 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -461,4 +464,44 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
factory2.shutdown();
factory.shutdown();
}
+
+ @Test(timeOut = 30000)
+ public void managedLedgerClosed() throws Exception {
+ ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
+ ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+ int N = 100;
+
+ AtomicReference<ManagedLedgerException> res = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(N);
+
+ for (int i = 0; i < N; i++) {
+ ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AddEntryCallback() {
+
+ @Override
+ public void addComplete(Position position, Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object ctx) {
+ res.compareAndSet(null, exception);
+ latch.countDown();
+ }
+ }, null);
+
+ if (i == 1) {
+ ledger1.close();
+ }
+ }
+
+ // Ensure that all the callbacks have been invoked
+ latch.await();
+ assertNotNull(res.get());
+ assertEquals(res.get().getClass(), ManagedLedgerAlreadyClosedException.class);
+ factory.shutdown();
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 2a6a195..1127237 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -47,6 +47,12 @@ public class BrokerServiceException extends Exception {
}
}
+ public static class TopicClosedException extends BrokerServiceException {
+ public TopicClosedException(Throwable t) {
+ super(t);
+ }
+ }
+
public static class PersistenceException extends BrokerServiceException {
public PersistenceException(Throwable t) {
super(t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e97f489..72b12d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -279,8 +280,12 @@ public class Producer {
? ServerError.TopicTerminatedError : ServerError.PersistenceError;
producer.cnx.ctx().channel().eventLoop().execute(() -> {
- producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, serverError,
- exception.getMessage()));
+ if (!(exception instanceof TopicClosedException)) {
+ // For TopicClosed exception there's no need to send explicit error, since the client was
+ // already notified
+ producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId,
+ serverError, exception.getMessage()));
+ }
producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
producer.publishOperationCompleted();
recycle();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d38fa3b..0bc36ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.Position;
@@ -59,6 +60,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
@@ -253,7 +255,18 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
PublishContext callback = (PublishContext) ctx;
- log.error("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+
+ if (exception instanceof ManagedLedgerAlreadyClosedException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+ }
+
+ callback.completed(new TopicClosedException(exception), -1, -1);
+ return;
+
+ } else {
+ log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+ }
if (exception instanceof ManagedLedgerTerminatedException) {
// Signal the producer that this topic is no longer available
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.