You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/10 15:01:44 UTC

[pulsar] branch branch-2.9 updated (43234c63bab -> 3411bf21176)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 43234c63bab Allow PULSAR_MEM & PULSAR_GC to be Overridden in pulsar_tool_env.sh (#15868)
     new 55e4a1d66a7 [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
     new 960b80f59e8 [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)
     new b2de04362b3 [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
     new c1e66066820 fix bug in getNumberOfEntriesInStorage (#15627)
     new 89e005f939f Fix NPE in MessageDeduplication. (#15820)
     new d6565a5d3da Fix avro conversion order of registration (#15863)
     new 4a471d06ce1 [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)
     new dac1c6f39d1 [fix][broker]Fast return if ack cumulative illegal (#15695)
     new faac51937d2 [fix][txn]Fix transasction ack batch message (#15875)
     new 43ab20b735f [fix][auth] Generate correct well-known OpenID configuration URL (#15928)
     new 53615e05cff [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
     new 3411bf21176 [fix][txn] fix NPE of TransactionMetaStoreHandler (#15840)

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   9 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  10 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  57 ++++++
 .../MultiRolesTokenAuthorizationProvider.java      |  37 +---
 .../org/apache/pulsar/broker/service/Consumer.java |   9 +-
 .../service/persistent/MessageDeduplication.java   |   2 +-
 .../broker/service/MessageCumulativeAckTest.java   | 194 +++++++++++++++++++++
 .../service/persistent/MessageDuplicationTest.java |   7 +
 .../pendingack/PendingAckPersistentTest.java       |  71 ++++++++
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           |   9 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |   1 +
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  20 +++
 .../client/impl/TransactionMetaStoreHandler.java   |   8 +-
 .../client/impl/conf/ClientConfigurationData.java  |   7 -
 .../pulsar/client/impl/schema/AvroSchema.java      |   3 +-
 .../impl/conf/ClientConfigurationDataTest.java     |   1 -
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  21 +++
 .../org/apache/pulsar/common/util/FutureUtil.java  |  47 ++++-
 .../apache/pulsar/common/util/FutureUtilTest.java  |  45 +++++
 .../jcloud/provider/JCloudBlobStoreProvider.java   |  54 ++++--
 .../provider/TieredStorageConfiguration.java       |  13 ++
 .../provider/JCloudBlobStoreProviderTests.java     |  31 +++-
 .../provider/TieredStorageConfigurationTests.java  |  17 ++
 24 files changed, 597 insertions(+), 78 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java


[pulsar] 02/12: [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 960b80f59e8b8700e6a5d30aca7ceb784d267ced
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 9 09:11:19 2022 +0800

    [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454)
    
    (cherry picked from commit 19f61d53b88bb195fabb367be722694902c79d22)
---
 .../MultiRolesTokenAuthorizationProvider.java      | 37 ++++-------------
 .../org/apache/pulsar/common/util/FutureUtil.java  | 47 +++++++++++++++++++++-
 .../apache/pulsar/common/util/FutureUtilTest.java  | 45 +++++++++++++++++++++
 3 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index c508ccbd5b4..b8f46a52483 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -23,10 +23,15 @@ import io.jsonwebtoken.Jwt;
 import io.jsonwebtoken.JwtParser;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.RequiredTypeException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -39,14 +44,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-
 public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationProvider {
     private static final Logger log = LoggerFactory.getLogger(MultiRolesTokenAuthorizationProvider.class);
 
@@ -137,27 +134,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
         }
         List<CompletableFuture<Boolean>> futures = new ArrayList<>(roles.size());
         roles.forEach(r -> futures.add(authorizeFunc.apply(r)));
-        return CompletableFuture.supplyAsync(() -> {
-            do {
-                try {
-                    List<CompletableFuture<Boolean>> doneFutures = new ArrayList<>();
-                    FutureUtil.waitForAny(futures).get();
-                    for (CompletableFuture<Boolean> future : futures) {
-                        if (!future.isDone()) continue;
-                        doneFutures.add(future);
-                        if (future.get()) {
-                            futures.forEach(f -> {
-                                if (!f.isDone()) f.cancel(false);
-                            });
-                            return true;
-                        }
-                    }
-                    futures.removeAll(doneFutures);
-                } catch (InterruptedException | ExecutionException ignored) {
-                }
-            } while (!futures.isEmpty());
-            return false;
-        });
+        return FutureUtil.waitForAny(futures, ret -> (boolean) ret).thenApply(v -> v.isPresent());
     }
 
     /**
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 2cdd9fce995..dac204db98e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.util;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -28,7 +29,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -51,10 +54,52 @@ public class FutureUtil {
      * @param futures futures to wait any
      * @return a new CompletableFuture that is completed when any of the given CompletableFutures complete
      */
-    public static CompletableFuture<Object> waitForAny(List<? extends CompletableFuture<?>> futures) {
+    public static CompletableFuture<Object> waitForAny(Collection<? extends CompletableFuture<?>> futures) {
         return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    /**
+     * Return a future that completes when the result of any future in the provided Collection matches the predicate.
+     *
+     * @param futures futures to wait any
+     * @param tester predicate applied to the result of each completed future
+     * @return a new CompletableFuture completed with a matching result, or an empty Optional if no result matches
+     */
+    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
+                                                                 Predicate<Object> tester) {
+        return waitForAny(futures).thenCompose(v -> {
+            if (tester.test(v)) {
+                futures.forEach(f -> {
+                    if (!f.isDone()) {
+                        f.cancel(true);
+                    }
+                });
+                return CompletableFuture.completedFuture(Optional.of(v));
+            }
+            Collection<CompletableFuture<?>> doneFutures = futures.stream()
+                    .filter(f -> f.isDone())
+                    .collect(Collectors.toList());
+            futures.removeAll(doneFutures);
+            Optional<?> value = doneFutures.stream()
+                    .filter(f -> !f.isCompletedExceptionally())
+                    .map(CompletableFuture::join)
+                    .filter(tester)
+                    .findFirst();
+            if (!value.isPresent()) {
+                if (futures.size() == 0) {
+                    return CompletableFuture.completedFuture(Optional.empty());
+                }
+                return waitForAny(futures, tester);
+            }
+            futures.forEach(f -> {
+                if (!f.isDone()) {
+                    f.cancel(true);
+                }
+            });
+            return CompletableFuture.completedFuture(Optional.of(value.get()));
+        });
+    }
+
 
     /**
      * Return a future that represents the completion of the futures in the provided list.
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
index b9458bf8e1e..0de40767656 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -25,13 +25,18 @@ import static org.testng.Assert.fail;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.time.Duration;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import lombok.Cleanup;
+import org.assertj.core.util.Lists;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 public class FutureUtilTest {
 
@@ -91,4 +96,44 @@ public class FutureUtilTest {
             assertEquals(executionException.getCause(), e);
         }
     }
+
+    @Test
+    public void testWaitForAny() {
+        CompletableFuture<String> f1 = new CompletableFuture<>();
+        CompletableFuture<String> f2 = new CompletableFuture<>();
+        CompletableFuture<String> f3 = new CompletableFuture<>();
+        CompletableFuture<String> f4 = new CompletableFuture<>();
+        f1.complete("1");
+        f2.complete("2");
+        f3.complete("3");
+        f4.complete("4");
+        CompletableFuture<Optional<Object>> ret = FutureUtil.waitForAny(Lists.newArrayList(f1, f2, f3, f4), p -> p.equals("3"));
+        assertEquals(ret.join().get(), "3");
+        // test the case where no result matches the predicate
+        CompletableFuture<String> f5 = new CompletableFuture<>();
+        CompletableFuture<String> f6 = new CompletableFuture<>();
+        f5.complete("5");
+        f6.complete("6");
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f5, f6), p -> p.equals("3"));
+        assertFalse(ret.join().isPresent());
+        // test one complete, others are cancelled.
+        CompletableFuture<String> f55 = new CompletableFuture<>();
+        CompletableFuture<String> f66 = new CompletableFuture<>();
+        f55.complete("55");
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f55, f66), p -> p.equals("55"));
+        assertTrue(ret.join().isPresent());
+        assertTrue(f66.isCancelled());
+        // test with exception
+        CompletableFuture<String> f7 = new CompletableFuture<>();
+        CompletableFuture<String> f8 = new CompletableFuture<>();
+        f8.completeExceptionally(new RuntimeException("f7 exception"));
+        f8.completeExceptionally(new RuntimeException("f8 exception"));
+        ret = FutureUtil.waitForAny(Lists.newArrayList(f7, f8), p -> p.equals("3"));
+        try {
+            ret.join();
+            fail("Should have failed");
+        } catch (CompletionException ex) {
+            assertTrue(ex.getCause() instanceof RuntimeException);
+        }
+    }
 }
\ No newline at end of file


[pulsar] 11/12: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 53615e05cff4010ef0834f15f057b78c1eb6fc77
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jun 7 21:52:05 2022 +0800

    [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
    
    (cherry picked from commit 7a3ad611f51511afca4bcaa1de299517a1907e8e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 ++----
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 +--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 ++++++++++++++++++++--
 .../broker/service/MessageCumulativeAckTest.java   | 15 ++++------
 4 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d03d98a3f06..c79b8a9f8f7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2153,14 +2153,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+    PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
-        if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
-                    "least key greater than or equal to the given key, or null if there is no such key"), null);
-        }
-
-        if (ledgerId != position.getLedgerId()) {
+        if (ledgerId != null && ledgerId != position.getLedgerId()) {
             // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
             // to skip on the next available ledger
             position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index d7eb0467f56..27e99169e31 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
         OpReadEntry op = RECYCLER.get();
-        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
@@ -140,7 +140,7 @@ class OpReadEntry implements ReadEntriesCallback {
 
             // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
-                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8826f0d99fc..317fb7e2b30 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -408,6 +409,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger_1");
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+        LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+        ledgers.clear();
+        ManagedCursor c1 = ledger.openCursor("c1");
+        PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+        PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99);
+        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+                new ReadEntriesCallback() {
+
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+                    }
+                }, null, maxPosition);
+        Assert.assertEquals(opReadEntry.readPosition, position);
+    }
+
+
     @Test(timeOut = 20000)
     public void spanningMultipleLedgersWithSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);
@@ -2262,8 +2290,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 2);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+            assertEquals(managedLedger.getLedgersInfo().size(), 3);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 86754efc0c2..d45054fab79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclus
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
 import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
-import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -39,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
-import java.util.Optional;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -79,7 +77,7 @@ public class MessageCumulativeAckTest {
         executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
         ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
         svcConfig.setBrokerShutdownTimeoutMs(0L);
-        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d);
         svcConfig.setClusterName("pulsar-cluster");
         pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
         doReturn(svcConfig).when(pulsar).getConfiguration();
@@ -89,7 +87,7 @@ public class MessageCumulativeAckTest {
         doReturn(TransactionTestBase.createMockBookKeeper(executor))
             .when(pulsar).getBookKeeperClient();
 
-        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
+        store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build());
         doReturn(store).when(pulsar).getLocalMetadataStore();
         doReturn(store).when(pulsar).getConfigurationMetadataStore();
 
@@ -154,8 +152,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000, dataProvider = "individualAckModes")
     public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+            "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -169,8 +166,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
     public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
         Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);
@@ -184,8 +180,7 @@ public class MessageCumulativeAckTest {
     @Test(timeOut = 5000)
     public void testAckWithMoreThanNoneMessageIds() throws Exception {
         Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
-            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
-            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest);
 
         CommandAck commandAck = new CommandAck();
         commandAck.setAckType(Cumulative);


[pulsar] 08/12: [fix][broker]Fast return if ack cumulative illegal (#15695)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dac1c6f39d17b4f64c5dcc8ccd342fc88ecbc2af
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 6 10:59:55 2022 +0800

    [fix][broker]Fast return if ack cumulative illegal (#15695)
    
    (cherry picked from commit 02dca31a78523a7d061ac1d6702ff6600a7f4ec4)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +
 .../broker/service/MessageCumulativeAckTest.java   | 199 +++++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f7ed211fb2c..031574975d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -358,11 +358,13 @@ public class Consumer {
         if (ack.getAckType() == AckType.Cumulative) {
             if (ack.getMessageIdsCount() != 1) {
                 log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
 
             if (Subscription.isIndividualAckMode(subType)) {
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring",
                         subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
             PositionImpl position = PositionImpl.earliest;
             if (ack.getMessageIdsCount() == 1) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
new file mode 100644
index 00000000000..86754efc0c2
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
+import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MessageCumulativeAckTest {
+    private final int consumerId = 1;
+    private BrokerService brokerService;
+    private ServerCnx serverCnx;
+    private MetadataStore store;
+    protected PulsarService pulsar;
+    private OrderedExecutor executor;
+    private EventLoopGroup eventLoopGroup;
+    private PersistentSubscription sub;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
+        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setClusterName("pulsar-cluster");
+        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        doReturn(TransactionTestBase.createMockBookKeeper(executor))
+            .when(pulsar).getBookKeeperClient();
+
+        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        PulsarResources pulsarResources = new PulsarResources(store, store);
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+
+        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        doReturn(true).when(serverCnx).isActive();
+        doReturn(true).when(serverCnx).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+        when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
+        when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+            .when(serverCnx).getCommandSender();
+
+        eventLoopGroup = new NioEventLoopGroup();
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+        String topicName = TopicName.get("MessageCumulativeAckTest").toString();
+        PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
+        sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
+            mock(ManagedCursorImpl.class), false));
+        doNothing().when(sub).acknowledgeMessage(any(), any(), any());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() throws Exception {
+        if (brokerService != null) {
+            brokerService.close();
+            brokerService = null;
+        }
+        if (pulsar != null) {
+            pulsar.close();
+            pulsar = null;
+        }
+
+        executor.shutdown();
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully().get();
+        }
+        store.close();
+        sub = null;
+    }
+
+    @DataProvider(name = "individualAckModes")
+    public static Object[][] individualAckModes() {
+        return new Object[][]{
+            {Shared},
+            {Key_Shared},
+        };
+    }
+
+    @DataProvider(name = "notIndividualAckModes")
+    public static Object[][] notIndividualAckModes() {
+        return new Object[][]{
+            {Exclusive},
+            {Failover},
+        };
+    }
+
+    @Test(timeOut = 5000, dataProvider = "individualAckModes")
+    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
+    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, times(1)).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000)
+    public void testAckWithMoreThanNoneMessageIds() throws Exception {
+        Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(2L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+}


[pulsar] 07/12: [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a471d06ce173a99850866c095c54c88810e2ee4
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Sun Jun 5 09:44:47 2022 +0800

    [Revert] [#15483] Remove sensitive msg from consumer/producer stats log (#15817)
    
    ### Motivation
    See #15483
    The `@Secret` annotation works well and was introduced in #8910.
    
    ### Modifications
    - Revert the unneeded `@JsonIgnore`
    - Remove `Assert.assertFalse(s.contains("Password"));` because `Password` is still printed as part of a key name; only the sensitive field's value is masked as `****`.
    
    (cherry picked from commit 67361e8db632b0cd4c23198c5c569f3f2193fc70)
---
 .../apache/pulsar/client/impl/conf/ClientConfigurationData.java    | 7 -------
 .../pulsar/client/impl/conf/ClientConfigurationDataTest.java       | 1 -
 2 files changed, 8 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 093e3e19883..3044b2a4c3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -30,7 +30,6 @@ import java.util.Optional;
 import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ProxyProtocol;
@@ -61,7 +60,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
             value = "The implementation class of ServiceUrlProvider used to generate ServiceUrl."
     )
     @JsonIgnore
-    @Getter(onMethod_ = @JsonIgnore)
     private transient ServiceUrlProvider serviceUrlProvider;
 
     @ApiModelProperty(
@@ -257,8 +255,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
             value = "Password of TLS TrustStore."
     )
     @Secret
-    @JsonIgnore
-    @Getter(onMethod_ = @JsonIgnore)
     private String tlsTrustStorePassword = null;
 
     @ApiModelProperty(
@@ -318,10 +314,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
             value = "Password of SOCKS5 proxy."
     )
     @Secret
-    @JsonIgnore
     private String socks5ProxyPassword;
 
-    @JsonIgnore
     public Authentication getAuthentication() {
         if (authentication == null) {
             this.authentication = AuthenticationDisabled.INSTANCE;
@@ -377,7 +371,6 @@ public class ClientConfigurationData implements Serializable, Cloneable {
         return Objects.nonNull(socks5ProxyUsername) ? socks5ProxyUsername : System.getProperty("socks5Proxy.username");
     }
 
-    @JsonIgnore
     public String getSocks5ProxyPassword() {
         return Objects.nonNull(socks5ProxyPassword) ? socks5ProxyPassword : System.getProperty("socks5Proxy.password");
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
index b5c30c9a7c6..c817ec996d4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
@@ -48,7 +48,6 @@ public class ClientConfigurationDataTest {
         clientConfigurationData.setSocks5ProxyPassword("yyyy");
         clientConfigurationData.setAuthentication(new AuthenticationToken("zzzz"));
         String s = w.writeValueAsString(clientConfigurationData);
-        Assert.assertFalse(s.contains("Password"));
         Assert.assertFalse(s.contains("xxxx"));
         Assert.assertFalse(s.contains("yyyy"));
         Assert.assertFalse(s.contains("zzzz"));


[pulsar] 03/12: [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b2de04362b302d8d4294e6e4073c9d046f9bb84d
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed May 25 15:37:33 2022 +0800

    [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
    
    * [improve] [tiered-storage] Add pure S3 provider for the offloader
    ---
    
    *Motivation*
    
    Some cloud storage services are compatible with the S3
    APIs, such as aliyun-oss. Users of other S3-compatible
    storage services also want to offload data into them, but
    we only support AWS and Aliyun.
    The PR https://github.com/apache/pulsar/pull/8985 provides
    the Aliyun offload provider, but it forcibly sets
    `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS` to `true`. That
    restriction does not apply to other storage services that are
    compatible with the S3 APIs.
    This PR provides a more general offload provider, `S3`, which uses
    pure JClouds S3 metadata and allows users to override the
    default JClouds properties through system properties.
    
    *Modifications*
    
    - Add the pure S3 offload provider
    
    (cherry picked from commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 54 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 13 ++++++
 .../provider/JCloudBlobStoreProviderTests.java     | 31 ++++++++++++-
 .../provider/TieredStorageConfigurationTests.java  | 17 +++++++
 4 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 44aa92ce924..fc28c0291ce 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -181,17 +181,34 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
     ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
-            ALIYUN_OSS_VALIDATION.validate(config);
+            S3_VALIDATION.validate(config);
         }
 
         @Override
         public BlobStore getBlobStore(TieredStorageConfiguration config) {
-            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
         }
 
         @Override
         public void buildCredentials(TieredStorageConfiguration config) {
-            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
+    S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            S3_VALIDATION.validate(config);
         }
     },
 
@@ -374,12 +391,14 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+    static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
         contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
         Properties overrides = config.getOverrides();
-        // For security reasons, OSS supports only virtual hosted style access.
-        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+            // For security reasons, OSS supports only virtual hosted style access.
+            overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        }
         contextBuilder.overrides(overrides);
         contextBuilder.endpoint(config.getServiceEndpoint());
 
@@ -396,7 +415,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+    static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
         if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
             throw new IllegalArgumentException(
                     "ServiceEndpoint must specified for " + config.getDriver() + " offload");
@@ -414,14 +433,21 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
-        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
-        if (StringUtils.isEmpty(accountName)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+    static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        // For backward compatibility, fall back to the legacy Aliyun OSS variable
+        if (StringUtils.isEmpty(accountName.trim())) {
+            accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+        }
+        if (StringUtils.isEmpty(accountName.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key id.");
+        }
+        String accountKey = System.getenv().getOrDefault("ACCESS_KEY_SECRET", "");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
         }
-        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
-        if (StringUtils.isEmpty(accountKey)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key secret.");
         }
         Credentials credentials = new Credentials(
                 accountName, accountKey);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index c1054969a42..18e3bbf0db8 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
             overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
 
+        // load more jclouds properties into the overrides
+        System.getProperties().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
+        System.getenv().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
         log.info("getOverrides: {}", overrides.toString());
         return overrides;
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.testng.annotations.Test;
 
 public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
+
+    @Test()
+    public void s3ValidationTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        map.put("managedLedgerOffloadBucket", "test-s3-bucket");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+    public void s3ValidationServiceEndpointMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+    public void s3ValidationBucketMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
+
+    @Test
+    public void overridePropertiesTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+        map.put("s3ManagedLedgerOffloadRegion", "my-region");
+        System.setProperty("jclouds.SystemPropertyA", "A");
+        System.setProperty("jclouds.region", "jclouds-region");
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+        Properties properties = config.getOverrides();
+        System.out.println(properties.toString());
+        assertEquals(properties.get("jclouds.region"), "jclouds-region");
+        assertEquals(config.getServiceEndpoint(), "http://localhost");
+        assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+    }
 }


[pulsar] 05/12: Fix NPE in MessageDeduplication. (#15820)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89e005f939fa4400bc1be5a9058264d9e071622a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 1 11:09:01 2022 +0800

    Fix NPE in MessageDeduplication. (#15820)
    
    (cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f)
---
 .../pulsar/broker/service/persistent/MessageDeduplication.java     | 2 +-
 .../pulsar/broker/service/persistent/MessageDuplicationTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 55d35201fe5..5b29a4f15c7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -490,7 +490,7 @@ public class MessageDeduplication {
                 hasInactive = true;
             }
         }
-        if (hasInactive) {
+        if (hasInactive && isEnabled()) {
             takeSnapshot(getManagedCursor().getMarkDeletedPosition());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 76caccadcc9..0e1b37b5160 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -202,6 +202,13 @@ public class MessageDuplicationTest {
         messageDeduplication.purgeInactiveProducers();
         assertEquals(inactiveProducers.size(), 3);
 
+        doReturn(false).when(messageDeduplication).isEnabled();
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
+        doReturn(true).when(messageDeduplication).isEnabled();
         // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
         inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);


[pulsar] 01/12: [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 55e4a1d66a7cd7853d37c76a47fcb965bf3e937c
Author: 赵延 <ho...@apache.org>
AuthorDate: Mon Apr 11 12:34:36 2022 +0800

    [cleanup] [broker] Remove useless code to avoid confusion in OpReadEntry#checkReadCompletion. (#15104)
    
    (cherry picked from commit 93761284b9f6875da0403f5fedb6ccbfbbcd7315)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java   | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index b9c82914a76..d7eb0467f56 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -137,12 +137,8 @@ class OpReadEntry implements ReadEntriesCallback {
         // op readPosition is smaller or equals maxPosition then can read again
         if (entries.size() < count && cursor.hasMoreEntries()
                 && maxPosition.compareTo(readPosition) > 0) {
-            // We still have more entries to read from the next ledger, schedule a new async operation
-            if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
-                cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
-            }
 
-            // Schedule next read in a different thread
+            // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
                 readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);


[pulsar] 06/12: Fix avro conversion order of registration (#15863)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d6565a5d3dac9910a8c361a4a08a3a7faf101331
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Jun 1 16:58:48 2022 +0800

    Fix avro conversion order of registration (#15863)
    
    ### Motivation
    
    Fixes #15858
    
    A conversion that is registered first has higher priority than one registered later, so `TimestampMillisConversion` should not be registered after `TimestampMicrosConversion`.
    
    ### Modifications
    
    Improve `avro` conversion order of registration.
    
    (cherry picked from commit 311fdb5dad09217c1706409feb3387d59285c51f)
---
 .../pulsar/client/impl/schema/AvroSchema.java       |  3 ++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java   | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index b34017e20aa..cff3ccdf8f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -117,8 +117,8 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
         reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-        reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         if (jsr310ConversionEnabled) {
+            // The conversion that is registered first is higher priority than the registered later.
             reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
         } else {
             try {
@@ -127,6 +127,7 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
             } catch (ClassNotFoundException e) {
                 // Skip if have not provide joda-time dependency.
             }
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         }
         reflectData.addLogicalTypeConversion(new Conversions.UUIDConversion());
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 9e707af8367..d69f8bf66ba 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -42,6 +42,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.BufferedBinaryEncoder;
 import org.apache.avro.reflect.AvroDefault;
@@ -459,4 +460,24 @@ public class AvroSchemaTest {
         assertEquals(pojo2.value1, myBigDecimalPojo.value1);
         assertEquals(pojo2.value2, myBigDecimalPojo.value2);
     }
+
+
+    @Data
+    private static class TimestampStruct {
+        Instant value;
+    }
+
+    @Test
+    public void testTimestampWithJsr310Conversion() {
+        AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+        Assert.assertEquals(
+                schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
+
+        AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
+                .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+        Assert.assertEquals(
+                schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
+    }
 }


[pulsar] 10/12: [fix][auth] Generate correct well-known OpenID configuration URL (#15928)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43ab20b735f3a7cd989369725206c79d82b4d4f0
Author: ran <ga...@126.com>
AuthorDate: Tue Jun 7 15:46:57 2022 +0800

    [fix][auth] Generate correct well-known OpenID configuration URL (#15928)
    
    (cherry picked from commit 304b03e7ff3eeff62c31f93738af488eb44abde0)
---
 pulsar-client-cpp/lib/auth/AuthOauth2.cc  |  9 ++++++++-
 pulsar-client-cpp/lib/auth/AuthOauth2.h   |  1 +
 pulsar-client-cpp/tests/AuthPluginTest.cc | 20 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index c3dfe550a0c..438239a46d6 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -143,6 +143,8 @@ ClientCredentialFlow::ClientCredentialFlow(ParamMap& params)
       audience_(params["audience"]),
       scope_(params["scope"]) {}
 
+std::string ClientCredentialFlow::getTokenEndPoint() const { return tokenEndPoint_; }
+
 static size_t curlWriteCallback(void* contents, size_t size, size_t nmemb, void* responseDataPtr) {
     ((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
     return size * nmemb;
@@ -168,7 +170,12 @@ void ClientCredentialFlow::initialize() {
     curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "GET");
 
     // set URL: well-know endpoint
-    curl_easy_setopt(handle, CURLOPT_URL, (issuerUrl_ + "/.well-known/openid-configuration").c_str());
+    std::string wellKnownUrl = issuerUrl_;
+    if (wellKnownUrl.back() == '/') {
+        wellKnownUrl.pop_back();
+    }
+    wellKnownUrl.append("/.well-known/openid-configuration");
+    curl_easy_setopt(handle, CURLOPT_URL, wellKnownUrl.c_str());
 
     // Write callback
     curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
index a3658b353ee..986919ddfcd 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.h
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -57,6 +57,7 @@ class ClientCredentialFlow : public Oauth2Flow {
     void close();
 
     ParamMap generateParamMap() const;
+    std::string getTokenEndPoint() const;
 
    private:
     std::string tokenEndPoint_;
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index be987e07c48..01c19ebbea4 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -412,6 +412,26 @@ TEST(AuthPluginTest, testOauth2RequestBody) {
     ASSERT_EQ(flow2.generateParamMap(), expectedResult2);
 }
 
+TEST(AuthPluginTest, testInitialize) {
+    std::string issuerUrl = "https://dev-kt-aa9ne.us.auth0.com";
+    std::string expectedTokenEndPoint = issuerUrl + "/oauth/token";
+
+    ParamMap params;
+    params["issuer_url"] = issuerUrl;
+    params["client_id"] = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x";
+    params["client_secret"] = "rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb";
+    params["audience"] = "https://dev-kt-aa9ne.us.auth0.com/api/v2/";
+
+    ClientCredentialFlow flow1(params);
+    flow1.initialize();
+    ASSERT_EQ(flow1.getTokenEndPoint(), expectedTokenEndPoint);
+
+    params["issuer_url"] = issuerUrl + "/";
+    ClientCredentialFlow flow2(params);
+    flow2.initialize();
+    ASSERT_EQ(flow2.getTokenEndPoint(), expectedTokenEndPoint);
+}
+
 TEST(AuthPluginTest, testOauth2Failure) {
     ParamMap params;
     auto addKeyValue = [&](const std::string& key, const std::string& value) {


[pulsar] 04/12: fix bug in getNumberOfEntriesInStorage (#15627)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c1e6606682053c2ad38d141faaab7314a910c692
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat May 28 10:58:35 2022 +0800

    fix bug in getNumberOfEntriesInStorage (#15627)
    
    (cherry picked from commit a43981109a9322d94082ae0d87d0de53b8f237e8)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 412b778a3d6..604e3a4f3d8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -964,7 +964,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     public long getNumberOfEntriesInStorage() {
-        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition().getNext()));
+        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index af5f6c807e3..8826f0d99fc 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2243,6 +2243,35 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(targetPosition.getEntryId(), 4);
     }
 
+    @Test
+    public void testGetNumberOfEntriesInStorage() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl managedLedger =
+                (ManagedLedgerImpl) factory.open("testGetNumberOfEntriesInStorage", managedLedgerConfig);
+        // open cursor to prevent ledger to be deleted when ledger rollover
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
+        int numberOfEntries = 10;
+        for (int i = 0; i < numberOfEntries; i++) {
+            managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        //trigger ledger rollover and wait for the new ledger created
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+        managedLedger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
+        log.info("### ledgers {}", managedLedger.getLedgersInfo());
+        long length = managedCursor.getNumberOfEntriesInStorage();
+        assertEquals(length, numberOfEntries);
+    }
+
     @Test
     public void testEstimatedBacklogSize() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");


[pulsar] 12/12: [fix][txn] fix NPE of TransactionMetaStoreHandler (#15840)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3411bf211761dbbdf9f78a5b768ca30a52363148
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Jun 9 11:55:18 2022 +0800

    [fix][txn] fix NPE of TransactionMetaStoreHandler (#15840)
    
    (cherry picked from commit f9b0912dc3b7768c604b3f1c039c4068bb0d5810)
---
 .../apache/pulsar/client/impl/TransactionMetaStoreHandler.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 5b91a1cd84b..82fc89ca0a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -129,9 +129,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 return;
             }
 
-            connectionHandler.setClientCnx(cnx);
-            cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
-
             // if broker protocol version < 19, don't send TcClientConnectRequest to broker.
             if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) {
                 long requestId = client.newRequestId();
@@ -145,6 +142,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                             cnx.channel().close();
                         }
 
+                        connectionHandler.setClientCnx(cnx);
+                        cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
                         if (!this.connectFuture.isDone()) {
                             this.connectFuture.complete(null);
                         }
@@ -168,6 +167,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             } else {
                 if (!changeToReadyState()) {
                     cnx.channel().close();
+                } else {
+                    connectionHandler.setClientCnx(cnx);
+                    cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
                 }
                 this.connectFuture.complete(null);
             }


[pulsar] 09/12: [fix][txn]Fix transaction ack batch message (#15875)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit faac51937d2f9c95811e85446b3319f877dc9fd4
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jun 7 12:45:00 2022 +0800

    [fix][txn]Fix transaction ack batch message (#15875)
    
    Fixes https://github.com/apache/pulsar/issues/15832
    
    ### Motivation
    The transaction needs batch size to help determine whether the batch message is in the pending ack state.
    
    ### Modifications
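    When the acknowledged message ID carries a batch size, record that batch size for the acked position directly; otherwise fall back to the previously computed batch size.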
    
    (cherry picked from commit f87b3708ae6f05a8d4d4d6cd0db1090724fbcf4b)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  7 ++-
 .../pendingack/PendingAckPersistentTest.java       | 71 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 031574975d1..f2a4677dbfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -478,8 +478,11 @@ public class Consumer {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
                 ackedCount = batchSize;
             }
-
-            positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            if (msgId.hasBatchSize()) {
+                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
+            } else {
+                positionsAcked.add(new MutablePair<>(position, (int) batchSize));
+            }
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 6683be138da..bd22ff423a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -45,8 +45,10 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -574,4 +576,73 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
 
     }
+
+    @Test
+    public void testTransactionConflictExceptionWhenAckBatchMessage() throws Exception {
+        String topic = TopicName.get(TopicDomain.persistent.toString(),
+                NamespaceName.get(NAMESPACE1), "test").toString();
+
+        String subscriptionName = "my-subscription-batch";
+        pulsarServiceList.get(0).getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topic)).get()
+                .setDeletionAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .topic(topic).create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .isAckReceiptEnabled(true)
+                .topic(topic)
+                .subscribe();
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+
+        Message<String> message1 = consumer.receive();
+        Message<String> message2 = consumer.receive();
+
+        BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        Transaction transaction2 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+        transaction.commit().get();
+
+        try {
+            consumer.acknowledgeAsync(messageId, transaction2).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
+        }
+    }
+
 }