You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:27:07 UTC

[pulsar] 01/02: Fix fake complete issue in offloading (#9306)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc21282348c45228226722e5e458aab49ed5b072
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Jan 26 17:23:26 2021 +0800

    Fix fake complete issue in offloading (#9306)
    
    ### Motivation
    In our current code, complete in offloading context may set true even sync metadata to Zookeeper failed, which may lead to more fatal error like data in Bookkeeper will be deleted but other managed ledger will see data not offloaded and try to read from Bookkeeper.
    
    ### Modification
    This PR make sure local ledger info will be updated after Zookeeper updated.
    * prevent ledgers info change without write to zk succeed
    * add unit test to prevent fake positive when offload failed
    
    (cherry picked from commit 3c22b473ddb124941d4bc9044ed6caaad97fab53)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 23 +++++++++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 49 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1e6b898..10c54bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2637,12 +2637,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 } else {
                     try {
                         LedgerInfo newInfo = transformation.transform(oldInfo);
-                        ledgers.put(ledgerId, newInfo);
-                        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat,
+                        final HashMap<Long, LedgerInfo> newLedgers = new HashMap<>(ledgers);
+                        newLedgers.put(ledgerId, newInfo);
+                        store.asyncUpdateLedgerIds(name, buildManagedLedgerInfo(newLedgers), ledgersStat,
                                 new MetaStoreCallback<Void>() {
                                     @Override
                                     public void operationComplete(Void result, Stat stat) {
                                         ledgersStat = stat;
+                                        ledgers.put(ledgerId, newInfo);
                                         unlockingPromise.complete(null);
                                     }
 
@@ -3111,6 +3113,23 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return mlInfo.build();
     }
 
+    private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) {
+        ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
+        if (state == State.Terminated) {
+            mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
+                    .setEntryId(lastConfirmedEntry.getEntryId()));
+        }
+        if (managedLedgerInterceptor != null) {
+            managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
+        }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+                    .setKey(property.getKey()).setValue(property.getValue()));
+        }
+
+        return mlInfo.build();
+    }
+
     /**
      * Throws an exception if the managed ledger has been previously fenced.
      *
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 807876e..81080db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -47,11 +47,13 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.zookeeper.MockZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -979,8 +981,13 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
+        interface InjectAfterOffload {
+            void call();
+        }
+
         ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>();
         ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>();
+        InjectAfterOffload inject = null;
 
         Set<Long> offloadedLedgers() {
             return offloads.keySet();
@@ -1012,6 +1019,10 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
             } else {
                 promise.completeExceptionally(new Exception("Already exists exception"));
             }
+
+            if (inject != null) {
+                inject.call();
+            }
             return promise;
         }
 
@@ -1047,6 +1058,44 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    public void testFailByZk() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+        int i = 0;
+        for (; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        offloader.inject = () -> {
+            try {
+                stopZooKeeper();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        };
+
+        try {
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        } catch (Exception e) {
+
+        }
+        final LedgerInfo ledgerInfo = ledger.getLedgersInfoAsList().get(0);
+        final MLDataFormats.OffloadContext offloadContext = ledgerInfo.getOffloadContext();
+        //should not set complete when
+        assertEquals(offloadContext.getComplete(), false);
+        zkc = MockZooKeeper.newInstance();
+    }
+
     static class ErroringMockLedgerOffloader extends MockLedgerOffloader {
         CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();