You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/06/07 01:54:42 UTC

[pulsar] branch branch-2.8 updated (d38407652f6 -> 7377a594762)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from d38407652f6 Configure DLog Bookie, Pulsar, and Admin clients via pass through config (#15818)
     new 72629be0d9a [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
     new 8973baff6f6 [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
     new 155d60ca14c fix bug in getNumberOfEntriesInStorage (#15627)
     new 532aa85e0d8 Fix NPE in MessageDeduplication. (#15820)
     new 7377a594762 Fix avro conversion order of registration (#15863)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 29 ++++++++++++
 .../bookkeeper/mledger/util/RangeCacheTest.java    | 29 +++++++++++-
 .../service/persistent/MessageDeduplication.java   |  2 +-
 .../service/persistent/MessageDuplicationTest.java |  7 +++
 .../pulsar/client/impl/schema/AvroSchema.java      |  3 +-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 21 +++++++++
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 54 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 13 ++++++
 .../provider/JCloudBlobStoreProviderTests.java     | 31 ++++++++++++-
 .../provider/TieredStorageConfigurationTests.java  | 17 +++++++
 12 files changed, 194 insertions(+), 26 deletions(-)


[pulsar] 02/05: [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8973baff6f67f60e739977bc8894c026dc7fc170
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue May 24 12:15:25 2022 +0800

    [fix][ML]Fix NPE when put value to `RangeCache`. (#15707)
    
    ### Motivation
    
    When `ReferenceCounted` object overrides the method `deallocate` to make the `getLength` value equal null will cause NPE because the `RangeCache#put` method is not thread-safe. (The process of describing this abstraction is not very clear, please refer to the code modification :)
    
    Pulsar implementation may throw an exception to make `OpAddEntry` fail abnormal and fence the topic. relative code as below:
    
    https://github.com/apache/pulsar/blob/defeec0e84a63ea865f3a2790bc61b66a02254c5/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L211-L217
    
    **Exception screenshot:**
    
    ```java
    java.lang.NullPointerException: Cannot invoke "String.length()" because "value.s" is null
    
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.lambda$testInParallel$6(RangeCacheTest.java:279)
            at org.apache.bookkeeper.mledger.util.RangeCache.put(RangeCache.java:77)
            at org.apache.bookkeeper.mledger.util.RangeCacheTest.testInParallel(RangeCacheTest.java:283)
            at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
            at java.base/java.lang.reflect.Method.invoke(Method.java:577)
            at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
            at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:599)
            at org.testng.internal.TestInvoker.invokeTestMethod(TestInvoker.java:174)
            at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:46)
            at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
            at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
            at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
            at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
            at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
            at org.testng.TestRunner.privateRun(TestRunner.java:764)
            at org.testng.TestRunner.run(TestRunner.java:585)
            at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
            at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
            at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
            at org.testng.SuiteRunner.run(SuiteRunner.java:286)
            at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
            at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
            at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
            at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
            at org.testng.TestNG.runSuites(TestNG.java:1069)
            at org.testng.TestNG.run(TestNG.java:1037)
            at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
            at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
    ```
    
    ### Modifications
    
    - Make the `RangeCache#put` method to thread-safe.
    
    (cherry picked from commit b155d84c2ee397fe8003f452f04ae6cedf229b5c)
---
 .../apache/bookkeeper/mledger/util/RangeCache.java | 12 +++++----
 .../bookkeeper/mledger/util/RangeCacheTest.java    | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
index a5786ad8670..4a77ac91dca 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 
 /**
@@ -74,12 +75,13 @@ public class RangeCache<Key extends Comparable<Key>, Value extends ReferenceCoun
      * @return whether the entry was inserted in the cache
      */
     public boolean put(Key key, Value value) {
-        if (entries.putIfAbsent(key, value) == null) {
+        MutableBoolean flag = new MutableBoolean();
+        entries.computeIfAbsent(key, (k) -> {
             size.addAndGet(weighter.getSize(value));
-            return true;
-        } else {
-            return false;
-        }
+            flag.setValue(true);
+            return value;
+        });
+        return flag.booleanValue();
     }
 
     public Value get(Key key) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
index 95896d24f35..f31aa4a74f9 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java
@@ -29,11 +29,15 @@ import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.ReferenceCounted;
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.annotations.Test;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class RangeCacheTest {
 
     class RefString extends AbstractReferenceCounted implements ReferenceCounted {
-        final String s;
+        String s;
 
         RefString(String s) {
             super();
@@ -43,7 +47,7 @@ public class RangeCacheTest {
 
         @Override
         protected void deallocate() {
-            // no-op
+            s = null;
         }
 
         @Override
@@ -122,6 +126,7 @@ public class RangeCacheTest {
         assertEquals(cache.getNumberOfEntries(), 2);
     }
 
+
     @Test
     public void customTimeExtraction() {
         RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> x.s.length());
@@ -268,4 +273,24 @@ public class RangeCacheTest {
         assertEquals((long) res.getRight(), 10);
         assertEquals(cache.getSize(), 90);
     }
+
+    @Test
+    public void testInParallel() {
+        RangeCache<String, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleWithFixedDelay(cache::clear, 10, 10, TimeUnit.MILLISECONDS);
+        for (int i = 0; i < 1000; i++) {
+            cache.put(UUID.randomUUID().toString(), new RefString("zero"));
+        }
+        executor.shutdown();
+    }
+
+    @Test
+    public void testPutSameObj() {
+        RangeCache<Integer, RefString> cache = new RangeCache<>(value -> value.s.length(), x -> 0);
+        RefString s0 = new RefString("zero");
+        assertEquals(s0.refCnt(), 1);
+        assertTrue(cache.put(0, s0));
+        assertFalse(cache.put(0, s0));
+    }
 }


[pulsar] 05/05: Fix avro conversion order of registration (#15863)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7377a59476285b490ab07e12fbdef5df532c19f6
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Jun 1 16:58:48 2022 +0800

    Fix avro conversion order of registration (#15863)
    
    ### Motivation
    
    Fixes #15858
    
    The conversion that is registered first is a higher priority than the registered later, so `TimestampMillisConversion` should not be registered after `TimestampMicrosConversion`.
    
    ### Modifications
    
    Improve `avro` conversion order of registration.
    
    (cherry picked from commit 311fdb5dad09217c1706409feb3387d59285c51f)
---
 .../pulsar/client/impl/schema/AvroSchema.java       |  3 ++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java   | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index b34017e20aa..cff3ccdf8f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -117,8 +117,8 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
         reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
         reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
-        reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         if (jsr310ConversionEnabled) {
+            // The conversion that is registered first is higher priority than the registered later.
             reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
         } else {
             try {
@@ -127,6 +127,7 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
             } catch (ClassNotFoundException e) {
                 // Skip if have not provide joda-time dependency.
             }
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
         }
         reflectData.addLogicalTypeConversion(new Conversions.UUIDConversion());
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 9e707af8367..d69f8bf66ba 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -42,6 +42,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.BufferedBinaryEncoder;
 import org.apache.avro.reflect.AvroDefault;
@@ -459,4 +460,24 @@ public class AvroSchemaTest {
         assertEquals(pojo2.value1, myBigDecimalPojo.value1);
         assertEquals(pojo2.value2, myBigDecimalPojo.value2);
     }
+
+
+    @Data
+    private static class TimestampStruct {
+        Instant value;
+    }
+
+    @Test
+    public void testTimestampWithJsr310Conversion() {
+        AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+        Assert.assertEquals(
+                schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
+
+        AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
+                .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+        Assert.assertEquals(
+                schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
+                new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
+    }
 }


[pulsar] 01/05: [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72629be0d9aac9e850030b18b8642ea591119245
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed May 25 15:37:33 2022 +0800

    [feat] [tiered-storage] Add pure S3 provider for the offloader (#15710)
    
    * [improve] [tiered-storage] Add pure S3 provider for the offloader
    ---
    
    *Motivation*
    
    There have some cloud storages are compatible with S3
    APIs, such as aliyun-oss. Some other storages also use
    the S3 APIs and want to offload the data into them, but
    we only support the AWS or the Aliyun.
    The PR https://github.com/apache/pulsar/pull/8985 provides
    the Aliyun offload provider, but it has a force limitation of
    the `S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS`. That
    is not a limitation on other storage service which compatible
    with S3 APIs.
    This PR provides  a more general offload provider `S3` which uses
    pure JClouds S3 metadata and allows people to override the
    default JClouds properties through system properties.
    
    *Modifications*
    
    - Add the pure S3 offload provider
    
    (cherry picked from commit 047cb0e3117d55a79c0082c0dc3d2ab3c9bcd6f9)
---
 .../jcloud/provider/JCloudBlobStoreProvider.java   | 54 ++++++++++++++++------
 .../provider/TieredStorageConfiguration.java       | 13 ++++++
 .../provider/JCloudBlobStoreProviderTests.java     | 31 ++++++++++++-
 .../provider/TieredStorageConfigurationTests.java  | 17 +++++++
 4 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
index 44aa92ce924..fc28c0291ce 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProvider.java
@@ -181,17 +181,34 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
     ALIYUN_OSS("aliyun-oss", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
         @Override
         public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
-            ALIYUN_OSS_VALIDATION.validate(config);
+            S3_VALIDATION.validate(config);
         }
 
         @Override
         public BlobStore getBlobStore(TieredStorageConfiguration config) {
-            return ALIYUN_OSS_BLOB_STORE_BUILDER.getBlobStore(config);
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
         }
 
         @Override
         public void buildCredentials(TieredStorageConfiguration config) {
-            ALIYUN_OSS_CREDENTIAL_BUILDER.buildCredentials(config);
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+    },
+
+    S3("S3", new AnonymousProviderMetadata(new S3ApiMetadata(), "")) {
+        @Override
+        public BlobStore getBlobStore(TieredStorageConfiguration config) {
+            return S3_BLOB_STORE_BUILDER.getBlobStore(config);
+        }
+
+        @Override
+        public void buildCredentials(TieredStorageConfiguration config) {
+            S3_CREDENTIAL_BUILDER.buildCredentials(config);
+        }
+
+        @Override
+        public void validate(TieredStorageConfiguration config) throws IllegalArgumentException {
+            S3_VALIDATION.validate(config);
         }
     },
 
@@ -374,12 +391,14 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final BlobStoreBuilder ALIYUN_OSS_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
+    static final BlobStoreBuilder S3_BLOB_STORE_BUILDER = (TieredStorageConfiguration config) -> {
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(config.getProviderMetadata());
         contextBuilder.modules(Arrays.asList(new SLF4JLoggingModule()));
         Properties overrides = config.getOverrides();
-        // For security reasons, OSS supports only virtual hosted style access.
-        overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        if (ALIYUN_OSS.getDriver().equals(config.getDriver())) {
+            // For security reasons, OSS supports only virtual hosted style access.
+            overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "true");
+        }
         contextBuilder.overrides(overrides);
         contextBuilder.endpoint(config.getServiceEndpoint());
 
@@ -396,7 +415,7 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final ConfigValidation ALIYUN_OSS_VALIDATION = (TieredStorageConfiguration config) -> {
+    static final ConfigValidation S3_VALIDATION = (TieredStorageConfiguration config) -> {
         if (Strings.isNullOrEmpty(config.getServiceEndpoint())) {
             throw new IllegalArgumentException(
                     "ServiceEndpoint must specified for " + config.getDriver() + " offload");
@@ -414,14 +433,21 @@ public enum JCloudBlobStoreProvider implements Serializable, ConfigValidation, B
         }
     };
 
-    static final CredentialBuilder ALIYUN_OSS_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
-        String accountName = System.getenv("ALIYUN_OSS_ACCESS_KEY_ID");
-        if (StringUtils.isEmpty(accountName)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key id.");
+    static final CredentialBuilder S3_CREDENTIAL_BUILDER = (TieredStorageConfiguration config) -> {
+        String accountName = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        // For forward compatibility
+        if (StringUtils.isEmpty(accountName.trim())) {
+            accountName = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_ID", "");
+        }
+        if (StringUtils.isEmpty(accountName.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key id.");
+        }
+        String accountKey = System.getenv().getOrDefault("ACCESS_KEY_ID", "");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            accountKey = System.getenv().getOrDefault("ALIYUN_OSS_ACCESS_KEY_SECRET", "");
         }
-        String accountKey = System.getenv("ALIYUN_OSS_ACCESS_KEY_SECRET");
-        if (StringUtils.isEmpty(accountKey)) {
-            throw new IllegalArgumentException("Couldn't get the aliyun oss access key secret.");
+        if (StringUtils.isEmpty(accountKey.trim())) {
+            throw new IllegalArgumentException("Couldn't get the access key secret.");
         }
         Credentials credentials = new Credentials(
                 accountName, accountKey);
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
index 442980ad336..dfca32eff11 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfiguration.java
@@ -329,6 +329,19 @@ public class TieredStorageConfiguration {
             overrides.setProperty(S3Constants.PROPERTY_S3_VIRTUAL_HOST_BUCKETS, "false");
         }
 
+        // load more jclouds properties into the overrides
+        System.getProperties().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
+        System.getenv().entrySet().stream()
+            .filter(p -> p.getKey().toString().startsWith("jclouds"))
+            .forEach(jcloudsProp -> {
+                overrides.setProperty(jcloudsProp.getKey().toString(), jcloudsProp.getValue().toString());
+            });
+
         log.info("getOverrides: {}", overrides.toString());
         return overrides;
     }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
index 28e5829ba2a..4f0c60bc007 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/JCloudBlobStoreProviderTests.java
@@ -23,8 +23,6 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
-import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.testng.annotations.Test;
 
 public class JCloudBlobStoreProviderTests {
@@ -105,4 +103,33 @@ public class JCloudBlobStoreProviderTests {
         config = new TieredStorageConfiguration(map);
         JCloudBlobStoreProvider.TRANSIENT.validate(config);
     }
+
+    @Test()
+    public void s3ValidationTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        map.put("managedLedgerOffloadBucket", "test-s3-bucket");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "ServiceEndpoint must specified for S3 offload")
+    public void s3ValidationServiceEndpointMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Bucket cannot be empty for S3 offload")
+    public void s3ValidationBucketMissed() {
+        Map<String, String> map = new HashMap<>();
+        map.put("managedLedgerOffloadDriver", "S3");
+        map.put("managedLedgerOffloadServiceEndpoint", "http://s3.service");
+        TieredStorageConfiguration configuration = new TieredStorageConfiguration(map);
+        configuration.getProvider().validate(configuration);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index f80f3ceaa1a..bf5e046bf70 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.jclouds.domain.Credentials;
 import org.testng.annotations.Test;
 
@@ -205,4 +207,19 @@ public class TieredStorageConfigurationTests {
         assertEquals(config.getMaxBlockSizeInBytes(), new Integer(12));
         assertEquals(config.getReadBufferSizeInBytes(), new Integer(500));
     }
+
+    @Test
+    public void overridePropertiesTest() {
+        Map<String, String> map = new HashMap<>();
+        map.put("s3ManagedLedgerOffloadServiceEndpoint", "http://localhost");
+        map.put("s3ManagedLedgerOffloadRegion", "my-region");
+        System.setProperty("jclouds.SystemPropertyA", "A");
+        System.setProperty("jclouds.region", "jclouds-region");
+        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+        Properties properties = config.getOverrides();
+        System.out.println(properties.toString());
+        assertEquals(properties.get("jclouds.region"), "jclouds-region");
+        assertEquals(config.getServiceEndpoint(), "http://localhost");
+        assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+    }
 }


[pulsar] 03/05: fix bug in getNumberOfEntriesInStorage (#15627)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 155d60ca14cb0cb2a93ee0f97f971215acdb918a
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat May 28 10:58:35 2022 +0800

    fix bug in getNumberOfEntriesInStorage (#15627)
    
    (cherry picked from commit a43981109a9322d94082ae0d87d0de53b8f237e8)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 29 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ef0a5a4b95f..f3a865bd784 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -963,7 +963,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     public long getNumberOfEntriesInStorage() {
-        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition().getNext()));
+        return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index aadf9b4c893..d2f3be9ef0e 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2188,6 +2188,35 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(targetPosition.getEntryId(), 4);
     }
 
+    @Test
+    public void testGetNumberOfEntriesInStorage() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(5);
+        ManagedLedgerImpl managedLedger =
+                (ManagedLedgerImpl) factory.open("testGetNumberOfEntriesInStorage", managedLedgerConfig);
+        // open cursor to prevent ledger to be deleted when ledger rollover
+        ManagedCursorImpl managedCursor = (ManagedCursorImpl) managedLedger.openCursor("cursor");
+        int numberOfEntries = 10;
+        for (int i = 0; i < numberOfEntries; i++) {
+            managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
+        }
+
+        //trigger ledger rollover and wait for the new ledger created
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+        managedLedger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
+        assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
+        log.info("### ledgers {}", managedLedger.getLedgersInfo());
+        long length = managedCursor.getNumberOfEntriesInStorage();
+        assertEquals(length, numberOfEntries);
+    }
+
     @Test
     public void testEstimatedBacklogSize() throws Exception {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testEstimatedBacklogSize");


[pulsar] 04/05: Fix NPE in MessageDeduplication. (#15820)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 532aa85e0d8ff78b1c99485d54dd7de224a7a41f
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Jun 1 11:09:01 2022 +0800

    Fix NPE in MessageDeduplication. (#15820)
    
    (cherry picked from commit 01d7bfa681b23d1a236b1411b83e854c9ad9323f)
---
 .../pulsar/broker/service/persistent/MessageDeduplication.java     | 2 +-
 .../pulsar/broker/service/persistent/MessageDuplicationTest.java   | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 5d0d8f8b1ea..7dd2ca4ba99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -493,7 +493,7 @@ public class MessageDeduplication {
                 hasInactive = true;
             }
         }
-        if (hasInactive) {
+        if (hasInactive && isEnabled()) {
             takeSnapshot(getManagedCursor().getMarkDeletedPosition());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
index 5c2598ceac2..c324e13da91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java
@@ -199,6 +199,13 @@ public class MessageDuplicationTest {
         messageDeduplication.purgeInactiveProducers();
         assertEquals(inactiveProducers.size(), 3);
 
+        doReturn(false).when(messageDeduplication).isEnabled();
+        inactiveProducers.put(producerName2, System.currentTimeMillis() - 80000);
+        inactiveProducers.put(producerName3, System.currentTimeMillis() - 80000);
+        messageDeduplication.purgeInactiveProducers();
+        assertFalse(inactiveProducers.containsKey(producerName2));
+        assertFalse(inactiveProducers.containsKey(producerName3));
+        doReturn(true).when(messageDeduplication).isEnabled();
         // Modify the inactive time of produce2 and produce3
         // messageDeduplication.purgeInactiveProducers() will remove producer2 and producer3
         inactiveProducers.put(producerName2, System.currentTimeMillis() - 70000);