You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/05 16:40:49 UTC

[incubator-pulsar] branch master updated: Fixed managed ledger missing callback issue when unloading a topic (#1171)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5310f74  Fixed managed ledger missing callback issue when unloading a topic (#1171)
5310f74 is described below

commit 5310f74887f62f271213eb7f5191682b05a209c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Feb 5 08:40:45 2018 -0800

    Fixed managed ledger missing callback issue when unloading a topic (#1171)
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  8 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++--
 .../mledger/impl/ManagedLedgerBkTest.java          | 43 ++++++++++++++++++++++
 .../broker/service/BrokerServiceException.java     |  6 +++
 .../org/apache/pulsar/broker/service/Producer.java |  9 ++++-
 .../broker/service/persistent/PersistentTopic.java | 15 +++++++-
 6 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index f5c4243..9ad8284 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -37,7 +37,7 @@ public class ManagedLedgerException extends Exception {
         }
         return new ManagedLedgerException(e);
     }
-    
+
     public static class MetaStoreException extends ManagedLedgerException {
         public MetaStoreException(Exception e) {
             super(e);
@@ -84,6 +84,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class ManagedLedgerAlreadyClosedException extends ManagedLedgerException {
+        public ManagedLedgerAlreadyClosedException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class CursorAlreadyClosedException extends ManagedLedgerException {
         public CursorAlreadyClosedException(String msg) {
             super(msg);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 9bbea72..68a7645 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,8 +20,7 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
+import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.util.Iterator;
@@ -59,6 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
@@ -92,7 +92,6 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
 
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
@@ -1203,6 +1202,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         final State state = STATE_UPDATER.get(this);
         if (state == State.ClosingLedger || state == State.LedgerOpened) {
             STATE_UPDATER.set(this, State.ClosedLedger);
+        } else if (state == State.Closed) {
+            // The managed ledger was closed during the write operation
+            clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
+            return;
         } else {
             // In case we get multiple write errors for different outstanding write request, we should close the ledger
             // just once
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index 5c27ffb..a945759 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -30,10 +30,13 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -461,4 +464,44 @@ public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
         factory2.shutdown();
         factory.shutdown();
     }
+
+    @Test(timeOut = 30000)
+    public void managedLedgerClosed() throws Exception {
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
+        ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+        int N = 100;
+
+        AtomicReference<ManagedLedgerException> res = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(N);
+
+        for (int i = 0; i < N; i++) {
+            ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AddEntryCallback() {
+
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    res.compareAndSet(null, exception);
+                    latch.countDown();
+                }
+            }, null);
+
+            if (i == 1) {
+                ledger1.close();
+            }
+        }
+
+        // Ensure that all the callbacks have been invoked
+        latch.await();
+        assertNotNull(res.get());
+        assertEquals(res.get().getClass(), ManagedLedgerAlreadyClosedException.class);
+        factory.shutdown();
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 2a6a195..1127237 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -47,6 +47,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class TopicClosedException extends BrokerServiceException {
+        public TopicClosedException(Throwable t) {
+            super(t);
+        }
+    }
+
     public static class PersistenceException extends BrokerServiceException {
         public PersistenceException(Throwable t) {
             super(t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index e97f489..72b12d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.Topic.PublishContext;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -279,8 +280,12 @@ public class Producer {
                         ? ServerError.TopicTerminatedError : ServerError.PersistenceError;
 
                 producer.cnx.ctx().channel().eventLoop().execute(() -> {
-                    producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId, serverError,
-                            exception.getMessage()));
+                    if (!(exception instanceof TopicClosedException)) {
+                        // For a TopicClosedException there's no need to send an explicit error, since the client
+                        // was already notified
+                        producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId,
+                                serverError, exception.getMessage()));
+                    }
                     producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
                     producer.publishOperationCompleted();
                     recycle();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d38fa3b..0bc36ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
 import org.apache.bookkeeper.mledger.Position;
@@ -59,6 +60,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceExcept
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
@@ -253,7 +255,18 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     @Override
     public void addFailed(ManagedLedgerException exception, Object ctx) {
         PublishContext callback = (PublishContext) ctx;
-        log.error("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+
+        if (exception instanceof ManagedLedgerAlreadyClosedException) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+            }
+
+            callback.completed(new TopicClosedException(exception), -1, -1);
+            return;
+
+        } else {
+            log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
+        }
 
         if (exception instanceof ManagedLedgerTerminatedException) {
             // Signal the producer that this topic is no longer available

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.