Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/26 03:17:15 UTC

[pulsar] branch branch-2.10 updated (f53c9d5f7eb -> 917f997bef9)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from f53c9d5f7eb [fix][build] duplicate entry when merging services (#17659)
     new d17079a0134 [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)
     new 917f997bef9 [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++-
 .../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)


[pulsar] 02/02: [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512)

Posted by ji...@apache.org.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 917f997bef9fe78b722ec85b7cf7ae8643295121
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Sep 14 10:41:13 2022 +0800

    [fix][tiered-storage] Don't cleanup data when offload met Metastore exception (#17512)
    
    * [fix][tiered-storage] Don't cleanup data when offload met BadVersion
    ---
    
    *Motivation*
    
    There are two ways that offloaded data can end up being cleaned up. One is
    hitting an offload conflict exception; the other is
    completeLedgerInfoForOffloaded reaching the max retry time and throwing a
    ZooKeeper exception.
    
    We retry the ZooKeeper operation on a connection loss exception. We should
    be careful with this exception, because we may lose data if the metadata
    update actually succeeded.
    
    When a MetaStore exception happens, we cannot be sure whether the metadata
    update failed. Because we retry on connection loss, it is possible to get
    a BadVersion or another exception after retrying.
    
    So we don't clean up the data if this happens.
    
    *Modification*
    
    - Don't delete the data if a MetaStore exception occurs
    
    * log error when skipping deletion
    
    * improve logs
    
    (cherry picked from commit c2588ba6a07c05b2c1ce9cc8a4cf33e5b4a2755d)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 438539bfdd5..73b204f4749 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2996,8 +2996,21 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                            scheduledExecutor, name)
                             .whenComplete((ignore2, exception) -> {
                                     if (exception != null) {
-                                        log.error("[{}] Failed to offload data for the ledgerId {}",
+                                        Throwable e = FutureUtil.unwrapCompletionException(exception);
+                                        if (e instanceof MetaStoreException) {
+                                            // When a MetaStore exception happens, we can not make sure the metadata
+                                            // update is failed or not. Because we have a retry on the connection loss,
+                                            // it is possible to get a BadVersion or other exception after retrying.
+                                            // So we don't clean up the data if it has metadata operation exception.
+                                            log.error("[{}] Failed to update offloaded metadata for the ledgerId {}, "
+                                                    + "the offloaded data will not be cleaned up",
                                                 name, ledgerId, exception);
+                                            return;
+                                        } else {
+                                            log.error("[{}] Failed to offload data for the ledgerId {}, "
+                                                    + "clean up the offloaded data",
+                                                name, ledgerId, exception);
+                                        }
                                         cleanupOffloaded(
                                             ledgerId, uuid,
                                             driverName, driverMetadata,
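
The decision this patch adds to the whenComplete callback can be sketched in
isolation. The following is a minimal, self-contained illustration, not the
ManagedLedgerImpl code: MetaStoreFailure is a hypothetical stand-in for
MetaStoreException, and unwrap only approximates the role that
FutureUtil.unwrapCompletionException plays in the diff above.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for the managed-ledger MetaStoreException (assumption, not the real class).
class MetaStoreFailure extends Exception {
    MetaStoreFailure(String message) {
        super(message);
    }
}

final class OffloadCleanupSketch {
    // Peel CompletionException wrappers so the caller sees the underlying cause.
    static Throwable unwrap(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    // Clean up offloaded data only when the failure is not a metadata error:
    // after a metadata error the update may in fact have been applied.
    static boolean shouldCleanup(Throwable failure) {
        return !(unwrap(failure) instanceof MetaStoreFailure);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> metadataUpdate = new CompletableFuture<>();
        metadataUpdate.completeExceptionally(new MetaStoreFailure("bad version after retry"));

        // A dependent stage receives the failure wrapped in a CompletionException.
        metadataUpdate.thenRun(() -> { })
                .whenComplete((ignore, ex) ->
                        System.out.println("metadata failure, cleanup = " + shouldCleanup(ex)));

        System.out.println("upload failure, cleanup = " + shouldCleanup(new IOException("upload failed")));
    }
}
```

Running main prints cleanup = false for the metadata failure and cleanup = true
for the upload failure. Skipping cleanup in the ambiguous case trades possible
orphaned offloaded data for not deleting data that the metadata may already
reference.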


[pulsar] 01/02: [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)

Posted by ji...@apache.org.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d17079a0134acf4f3feaebd207ac50d2a75f79df
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Thu Sep 8 11:08:49 2022 +0800

    [fix][storage]fix OpAddEntry release error when exception in ManagedLedgerInterceptor (#17394)
    
    (cherry picked from commit c6a1875db2f7a9e5cab187dc98e90487cf08a805)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 -
 .../intercept/MangedLedgerInterceptorImplTest.java | 64 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 2 deletions(-)
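
The test added in this commit checks that the caller's buffer still reports
refCnt() == 1 after the interceptor throws. As a rough illustration of why an
extra release on a failure path breaks that expectation, here is a minimal
sketch. CountedBuffer, failAddWithExtraRelease and failAdd are hypothetical
names; this is neither Netty's ByteBuf nor the OpAddEntry code, whose buffer
handling is not shown in this email.

```java
// Hypothetical minimal reference-counted buffer (a stand-in, not Netty's ByteBuf API).
final class CountedBuffer {
    private int refCnt = 1; // the creator holds the initial reference

    int refCnt() {
        return refCnt;
    }

    // Drops one reference; returns true when the last reference is gone.
    boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt--;
        return refCnt == 0;
    }
}

final class FailurePathSketch {
    // Failure path that also releases the data, even though it never took its own reference.
    static void failAddWithExtraRelease(CountedBuffer data) {
        data.release();
    }

    // Failure path that only reports the error and leaves the caller's reference alone.
    static void failAdd(CountedBuffer data) {
        // nothing to release here: the caller still owns its reference
    }

    public static void main(String[] args) {
        CountedBuffer a = new CountedBuffer();
        failAddWithExtraRelease(a);
        System.out.println("with extra release: refCnt = " + a.refCnt());

        CountedBuffer b = new CountedBuffer();
        failAdd(b);
        System.out.println("without extra release: refCnt = " + b.refCnt());
    }
}
```

In the sketch, the first buffer ends at refCnt 0 although its owner never
released it, so any later release by the owner would throw; the second stays
at 1, which is the state the new test asserts.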

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6d3a628a367..438539bfdd5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -34,7 +34,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
@@ -825,7 +824,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         } catch (Exception e) {
             addOperation.failed(
                     new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
-            ReferenceCountUtil.release(addOperation.data);
             log.error("[{}] Failed to intercept adding an entry to bookie.", name, e);
             return false;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 60067a7016c..830e1894832 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,13 +20,18 @@ package org.apache.pulsar.broker.intercept;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.concurrent.CountDownLatch;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
@@ -256,6 +261,65 @@ public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testBeforeAddEntryWithException() throws Exception {
+        final int MOCK_BATCH_SIZE = 2;
+        final String ledgerAndCursorName = "testBeforeAddEntryWithException";
+
+        ManagedLedgerInterceptor interceptor =
+                new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setManagedLedgerInterceptor(interceptor);
+
+        ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
+        ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        try {
+            ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    countDownLatch.countDown();
+                }
+            }, null);
+            countDownLatch.await();
+            assertEquals(buffer.refCnt(), 1);
+        } finally {
+            ledger.close();
+            factory.shutdown();
+        }
+    }
+
+    private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl {
+        private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
+
+        public MockManagedLedgerInterceptorImpl(
+                Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors,
+                Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors) {
+            super(brokerEntryMetadataInterceptors, brokerEntryPayloadProcessors);
+            this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
+        }
+
+        @Override
+        public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
+            if (op == null || numberOfMessages <= 0) {
+                return op;
+            }
+            op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors,
+                    numberOfMessages));
+            if (op != null) {
+                throw new RuntimeException("throw exception before add entry for test");
+            }
+            return op;
+        }
+    }
+
     public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
         Set<String> interceptorNames = new HashSet<>();
         interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");