Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/23 04:54:23 UTC

[pulsar] branch branch-2.8 updated (702c8cb -> fe6158b)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 702c8cb  Pulsar IO: allow Sinks to use native AVRO and JSON (#11322)
     new 91a6e2e  fix flaky test testUpdateDynamicLocalConfiguration (#11115)
     new a3f179c  Avoid infinite waiting for consumer close (#11347)
     new fddac0d  [Tests] Reduce integration test memory usage in CI (#11414)
     new 3b40ace  Disable replicate system topic across clusters. (#11376)
     new beeefac  [Transaction] Fix direct memory leak related to commit and abort markers (#11407)
     new 0577259  Invalidate the read handle after all cursors consumed. (#11389)
     new 29b9031  [Python Client] Fix handle complex schema (#11400)
     new d1eb106  [Issue 11340] Fix concurrency issues in NarUnpacker (#11343)
     new 81e738d  Add metrics storageLogicalSize for the TopicStats and NamespaceStats (#11430)
     new fe6158b  [Broker] Add multi roles support for authorization (#11341)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/actions/tune-runner-vm/action.yml          |  25 +-
 .github/workflows/ci-cpp.yaml                      |   4 +-
 .../ci-integration-backwards-compatibility.yaml    |   5 +-
 .github/workflows/ci-integration-cli.yaml          |   3 +-
 .github/workflows/ci-integration-function.yaml     |   5 +-
 .github/workflows/ci-integration-messaging.yaml    |   5 +-
 .github/workflows/ci-integration-process.yaml      |  10 +-
 .github/workflows/ci-integration-schema.yaml       |   5 +-
 .github/workflows/ci-integration-sql.yaml          |  13 +-
 .github/workflows/ci-integration-standalone.yaml   |   5 +-
 .github/workflows/ci-integration-thread.yaml       |   5 +-
 .../ci-integration-tiered-filesystem.yaml          |   5 +-
 .../workflows/ci-integration-tiered-jcloud.yaml    |   5 +-
 .github/workflows/ci-integration-transaction.yaml  |   5 +-
 .github/workflows/ci-pulsar-website-build.yaml     |   5 +-
 .github/workflows/ci-shade-test.yaml               |   3 +-
 .../bookkeeper/mledger/ManagedLedgerMXBean.java    |   5 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   4 +-
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |   5 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  76 +++++++
 .../MultiRolesTokenAuthorizationProvider.java      | 253 +++++++++++++++++++++
 .../MultiRolesTokenAuthorizationProviderTest.java  |  71 ++++++
 .../broker/service/persistent/SystemTopic.java     |   5 +
 .../stats/prometheus/AggregatedNamespaceStats.java |   1 +
 .../stats/prometheus/ManagedLedgerStats.java       |   2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |   2 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |   2 +
 .../stats/prometheus/TransactionAggregator.java    |   3 +
 .../buffer/impl/TopicTransactionBuffer.java        | 104 +++++----
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  13 +-
 .../pulsar/broker/service/ReplicatorTest.java      |  55 ++++-
 .../pulsar/broker/service/ReplicatorTestBase.java  |   2 +
 .../broker/stats/TransactionMetricsTest.java       |   6 +
 .../prometheus/AggregatedNamespaceStatsTest.java   |   4 +
 .../python/pulsar/schema/definition.py             |  15 +-
 .../python/pulsar/schema/schema_avro.py            |  11 +-
 pulsar-client-cpp/python/schema_test.py            | 115 +++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   4 +-
 .../org/apache/pulsar/common/nar/NarUnpacker.java  |  79 ++++---
 .../org/apache/pulsar/common/util/FutureUtil.java  |  10 +
 .../apache/pulsar/common/nar/NarUnpackerTest.java  | 160 +++++++++++++
 site2/docs/reference-metrics.md                    |   1 +
 .../docker-images/latest-version-image/Dockerfile  |   2 +
 .../latest-version-image/conf}/presto/jvm.config   |   3 +-
 tests/integration/pom.xml                          |   2 +-
 .../integration/containers/ChaosContainer.java     |   6 +
 .../integration/containers/PulsarContainer.java    |   1 +
 .../containers/StandaloneContainer.java            |   1 +
 48 files changed, 981 insertions(+), 150 deletions(-)
 create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
 create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
 copy {conf => tests/docker-images/latest-version-image/conf}/presto/jvm.config (98%)

[pulsar] 08/10: [Issue 11340] Fix concurrency issues in NarUnpacker (#11343)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d1eb10631f9a08701d549cf32ae6b760d4503d07
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jul 23 01:25:21 2021 +0300

    [Issue 11340] Fix concurrency issues in NarUnpacker (#11343)
    
    Fixes #11340
    Fixes #11379
    
    NarUnpacker, which is used to extract Pulsar Functions jar files and Pulsar IO connector nar files, has concurrency and file corruption issues.
    
    When multiple processes start at once, the files in the extraction directory might get corrupted.
    The same extraction directory is also reused across different versions of a functions jar file, which likewise corrupts files.
    
    - Calculate the MD5 hash of the file content and use the Base64-encoded hash as the directory name
    - Prevent concurrent extraction of the file content across multiple JVMs on the same host machine by using an OS-level file lock. This is necessary because nar file extraction happens in separate JVMs, the Java function instance processes.
    - Prevent concurrent extraction of the file content within the same JVM with an object monitor keyed by the file path, as sketched below.
      - Java's file locking throws OverlappingFileLockException if the same JVM acquires the file lock twice. Therefore two mechanisms are needed: one to prevent concurrent access within the JVM and another to prevent concurrent access across JVMs.
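
    The combination of the two layers can be sketched like this (a minimal
    illustration of the pattern, not the actual Pulsar code; `ExclusiveExtractor`
    and `extractExclusively` are made-up names, and the full implementation is
    in the diff below):

    ```java
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.util.concurrent.ConcurrentHashMap;

    class ExclusiveExtractor {
        private static final ConcurrentHashMap<String, Object> JVM_LOCKS = new ConcurrentHashMap<>();

        static void extractExclusively(File lockFile, Runnable doExtract) throws IOException {
            // 1) Intra-JVM: a monitor keyed by the lock file path prevents
            //    OverlappingFileLockException when two threads race in one JVM.
            Object monitor = JVM_LOCKS.computeIfAbsent(lockFile.getAbsolutePath(), k -> new Object());
            synchronized (monitor) {
                // 2) Inter-JVM: an OS-level file lock serializes processes on the same host.
                try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
                     FileLock lock = channel.lock()) {
                    doExtract.run();
                }
            }
        }
    }
    ```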
    
    (cherry picked from commit dbf31384e6f0d915fc6fc26dcf8a496963b8384e)
---
 .../org/apache/pulsar/common/nar/NarUnpacker.java  |  79 +++++-----
 .../apache/pulsar/common/nar/NarUnpackerTest.java  | 160 +++++++++++++++++++++
 2 files changed, 207 insertions(+), 32 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index 037d9f9..7544417 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -24,16 +24,20 @@
 
 package org.apache.pulsar.common.nar;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
+import java.util.Base64;
 import java.util.Enumeration;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import lombok.extern.slf4j.Slf4j;
@@ -44,7 +48,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class NarUnpacker {
 
-    private final static String HASH_FILENAME = "nar-md5sum";
+    private static final ConcurrentHashMap<String, Object> CURRENT_JVM_FILE_LOCKS = new ConcurrentHashMap<>();
 
     /**
      * Unpacks the specified nar into the specified base working directory.
@@ -58,43 +62,59 @@ public class NarUnpacker {
      *             if unable to explode nar
      */
     public static File unpackNar(final File nar, final File baseWorkingDirectory) throws IOException {
-        final File narWorkingDirectory = new File(baseWorkingDirectory, nar.getName() + "-unpacked");
+        return doUnpackNar(nar, baseWorkingDirectory, null);
+    }
 
-        // if the working directory doesn't exist, unpack the nar
-        if (!narWorkingDirectory.exists()) {
-            unpack(nar, narWorkingDirectory, calculateMd5sum(nar));
-        } else {
-            // the working directory does exist. Run MD5 sum against the nar
-            // file and check if the nar has changed since it was deployed.
-            final byte[] narMd5 = calculateMd5sum(nar);
-            final File workingHashFile = new File(narWorkingDirectory, HASH_FILENAME);
-            if (!workingHashFile.exists()) {
-                FileUtils.deleteFile(narWorkingDirectory, true);
-                unpack(nar, narWorkingDirectory, narMd5);
-            } else {
-                final byte[] hashFileContents = Files.readAllBytes(workingHashFile.toPath());
-                if (!Arrays.equals(hashFileContents, narMd5)) {
-                    log.info("Contents of nar {} have changed. Reloading.", nar.getAbsolutePath());
-                    FileUtils.deleteFile(narWorkingDirectory, true);
-                    unpack(nar, narWorkingDirectory, narMd5);
+    @VisibleForTesting
+    static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnable extractCallback)
+            throws IOException {
+        File parentDirectory = new File(baseWorkingDirectory, nar.getName() + "-unpacked");
+        if (!parentDirectory.exists()) {
+            if (parentDirectory.mkdirs()) {
+                log.info("Created directory {}", parentDirectory);
+            } else if (!parentDirectory.exists()) {
+                throw new IOException("Cannot create " + parentDirectory);
+            }
+        }
+        String md5Sum = Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(nar));
+        // ensure that one process can extract the files
+        File lockFile = new File(parentDirectory, "." + md5Sum + ".lock");
+        // prevent OverlappingFileLockException by ensuring that one thread tries to create a lock in this JVM
+        Object localLock = CURRENT_JVM_FILE_LOCKS.computeIfAbsent(lockFile.getAbsolutePath(), key -> new Object());
+        synchronized (localLock) {
+            // create file lock that ensures that other processes
+            // using the same lock file don't execute concurrently
+            try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+                 FileLock lock = channel.lock()) {
+                File narWorkingDirectory = new File(parentDirectory, md5Sum);
+                if (narWorkingDirectory.mkdir()) {
+                    try {
+                        log.info("Extracting {} to {}", nar, narWorkingDirectory);
+                        if (extractCallback != null) {
+                            extractCallback.run();
+                        }
+                        unpack(nar, narWorkingDirectory);
+                    } catch (IOException e) {
+                        log.error("There was a problem extracting the nar file. Deleting {} to clean up state.",
+                                narWorkingDirectory, e);
+                        FileUtils.deleteFile(narWorkingDirectory, true);
+                        throw e;
+                    }
                 }
+                return narWorkingDirectory;
             }
         }
-
-        return narWorkingDirectory;
     }
 
     /**
-     * Unpacks the NAR to the specified directory. Creates a checksum file that used to determine if future expansion is
-     * necessary.
+     * Unpacks the NAR to the specified directory.
      *
      * @param workingDirectory
      *            the root directory to which the NAR should be unpacked.
      * @throws IOException
      *             if the NAR could not be unpacked.
      */
-    private static void unpack(final File nar, final File workingDirectory, final byte[] hash) throws IOException {
-
+    private static void unpack(final File nar, final File workingDirectory) throws IOException {
         try (JarFile jarFile = new JarFile(nar)) {
             Enumeration<JarEntry> jarEntries = jarFile.entries();
             while (jarEntries.hasMoreElements()) {
@@ -108,11 +128,6 @@ public class NarUnpacker {
                 }
             }
         }
-
-        final File hashFile = new File(workingDirectory, HASH_FILENAME);
-        try (final FileOutputStream fos = new FileOutputStream(hashFile)) {
-            fos.write(hash);
-        }
     }
 
     /**
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
new file mode 100644
index 0000000..1d2acd0
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.nar;
+
+import static org.junit.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class NarUnpackerTest {
+    File sampleZipFile;
+    File extractDirectory;
+
+    @BeforeMethod
+    public void createSampleZipFile() throws IOException {
+        sampleZipFile = Files.createTempFile("sample", ".zip").toFile();
+        try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(sampleZipFile))) {
+            for (int i = 0; i < 10000; i++) {
+                ZipEntry e = new ZipEntry("hello" + i + ".txt");
+                out.putNextEntry(e);
+                byte[] msg = "hello world!".getBytes(StandardCharsets.UTF_8);
+                out.write(msg, 0, msg.length);
+                out.closeEntry();
+            }
+        }
+        extractDirectory = Files.createTempDirectory("nar_unpack_dir").toFile();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    void deleteSampleZipFile() throws IOException {
+        if (sampleZipFile != null) {
+            sampleZipFile.delete();
+        }
+        if (extractDirectory != null) {
+            FileUtils.deleteFile(extractDirectory, true);
+        }
+    }
+
+    @Test
+    void shouldExtractFilesOnceInSameProcess() throws InterruptedException {
+        int threads = 20;
+        CountDownLatch countDownLatch = new CountDownLatch(threads);
+        AtomicInteger exceptionCounter = new AtomicInteger();
+        AtomicInteger extractCounter = new AtomicInteger();
+        for (int i = 0; i < threads; i++) {
+            new Thread(() -> {
+                try {
+                    NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
+                } catch (Exception e) {
+                    log.error("Unpacking failed", e);
+                    exceptionCounter.incrementAndGet();
+                } finally {
+                    countDownLatch.countDown();
+                }
+            }).start();
+        }
+        assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
+        assertEquals(exceptionCounter.get(), 0);
+        assertEquals(extractCounter.get(), 1);
+    }
+
+    public static class NarUnpackerWorker {
+        public static void main(String[] args) {
+            File sampleZipFile = new File(args[0]);
+            File extractDirectory = new File(args[1]);
+            AtomicInteger extractCounter = new AtomicInteger();
+            try {
+                NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet);
+                if (extractCounter.get() == 1) {
+                    System.exit(101);
+                } else if (extractCounter.get() == 0) {
+                    System.exit(100);
+                }
+            } catch (Exception e) {
+                log.error("Unpacking failed", e);
+                System.exit(99);
+            }
+        }
+    }
+
+    @Test
+    void shouldExtractFilesOnceInDifferentProcess() throws InterruptedException {
+        int processes = 10;
+        String javaExePath = findJavaExe().getAbsolutePath();
+        CountDownLatch countDownLatch = new CountDownLatch(processes);
+        AtomicInteger exceptionCounter = new AtomicInteger();
+        AtomicInteger extractCounter = new AtomicInteger();
+        for (int i = 0; i < processes; i++) {
+            new Thread(() -> {
+                try {
+                    // fork a new process with the same classpath
+                    Process process = new ProcessBuilder()
+                            .command(javaExePath,
+                                    "-Xmx64m",
+                                    "-cp",
+                                    System.getProperty("java.class.path"),
+                                    // use NarUnpackerWorker as the main class
+                                    NarUnpackerWorker.class.getName(),
+                                    // pass arguments to use for testing
+                                    sampleZipFile.getAbsolutePath(),
+                                    extractDirectory.getAbsolutePath())
+                            .start();
+                    String output = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
+                    int retval = process.waitFor();
+                    log.info("Process retval {} output {}", retval, output);
+                    if (retval == 101) {
+                        extractCounter.incrementAndGet();
+                    } else if (retval != 100) {
+                        exceptionCounter.incrementAndGet();
+                    }
+                } catch (Exception e) {
+                    log.error("Unpacking in a separate process failed", e);
+                    exceptionCounter.incrementAndGet();
+                } finally {
+                    countDownLatch.countDown();
+                }
+            }).start();
+        }
+        assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
+        assertEquals(exceptionCounter.get(), 0);
+        assertEquals(extractCounter.get(), 1);
+    }
+
+    File findJavaExe() {
+        File javaHome = new File(System.getProperty("java.home"));
+        File javaExe = new File(javaHome, "bin/java" + (SystemUtils.IS_OS_WINDOWS ? ".exe" : ""));
+        return javaExe;
+    }
+}
\ No newline at end of file

[pulsar] 01/10: fix flaky test testUpdateDynamicLocalConfiguration (#11115)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91a6e2ef6638a38947168220ecf723ad4ed55866
Author: hangc0276 <ch...@apache.org>
AuthorDate: Thu Jul 22 09:25:59 2021 +0800

    fix flaky test testUpdateDynamicLocalConfiguration (#11115)
    
    When running the test suite, this test fails sporadically.
    
    [example failure](https://github.com/apache/pulsar/pull/11114/checks?check_run_id=2921358630)
    ```
    Error:  Tests run: 73, Failures: 1, Errors: 0, Skipped: 3, Time elapsed: 922.345 s <<< FAILURE! - in org.apache.pulsar.broker.admin.AdminApiTest
    Error:  testUpdateDynamicLocalConfiguration(org.apache.pulsar.broker.admin.AdminApiTest)  Time elapsed: 10.05 s  <<< FAILURE!
    org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in org.apache.pulsar.broker.admin.AdminApiTest was not fulfilled within 10 seconds.
    	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
    	at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
    	at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
    	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
    	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:864)
    	at org.apache.pulsar.broker.admin.AdminApiTest.testUpdateDynamicLocalConfiguration(AdminApiTest.java:626)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
    	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
    	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
    	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    Simplify the check logic with Awaitility and increase the wait timeout to 30s, as sketched below.
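
    The Awaitility idiom the fix adopts, shown in isolation (a minimal sketch;
    `pulsar` and `shutdownTime` come from the surrounding test, and the actual
    change is in the diff below):

    ```java
    import static org.testng.Assert.assertEquals;
    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.broker.PulsarService;
    import org.awaitility.Awaitility;

    class Example {
        void awaitConfigUpdate(PulsarService pulsar, long shutdownTime) {
            // Poll until the async zk-watch has propagated the new value,
            // instead of sleeping for fixed increments and hoping it has.
            Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                    assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime));
        }
    }
    ```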
    
    (cherry picked from commit 9a66cbf795d038ce96118f7400dceaa20bc7ec6d)
---
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java   | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 74b2925..fb8f799 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -615,7 +615,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
      *
      * @throws Exception
      */
-    @Test
+    @Test(timeOut = 30000)
     public void testUpdateDynamicLocalConfiguration() throws Exception {
         // (1) try to update dynamic field
         final long initValue = 30000;
@@ -623,15 +623,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue);
         // update configuration
         admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
-        // sleep incrementally as zk-watch notification is async and may take some time
-        for (int i = 0; i < 5; i++) {
-            if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == initValue) {
-                Thread.sleep(50 + (i * 10));
-            }
-        }
-
         // verify value is updated
-        assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
+        });
     }
 
     @Test

[pulsar] 03/10: [Tests] Reduce integration test memory usage in CI (#11414)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fddac0d7a50fe52590d9c2bce8fcb026faf07a5e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Jul 22 03:37:10 2021 +0300

    [Tests] Reduce integration test memory usage in CI (#11414)
    
    ### Motivation
    
    Several integration test jobs are failing: the Docker container run by Testcontainers gets terminated with exit code 137 (which maps to an out-of-memory error).
    
    The failing jobs are:
    CI - Integration - Sql - https://github.com/apache/pulsar/actions/workflows/ci-integration-sql.yaml (most fail)
    CI - Integration - Process - https://github.com/apache/pulsar/actions/workflows/ci-integration-process.yaml (some succeed)
    CI - Integration - Messaging - https://github.com/apache/pulsar/actions/workflows/ci-integration-messaging.yaml (some succeed)
    CI - Integration - Function & IO - https://github.com/apache/pulsar/actions/workflows/ci-integration-function.yaml (some succeed)
    
    This started happening yesterday for most PR builds.
    
    For example:
    https://github.com/apache/pulsar/runs/3111868662?check_suite_focus=true#step:14:1024
    
    ```
    Error:  Tests run: 22, Failures: 1, Errors: 0, Skipped: 21, Time elapsed: 292.035 s <<< FAILURE! - in TestSuite
    Error:  testPythonWordCountFunction(org.apache.pulsar.tests.integration.functions.PulsarStateTest)  Time elapsed: 43.416 s  <<< FAILURE!
    org.apache.pulsar.tests.integration.docker.ContainerExecException: /pulsar/bin/pulsar-admin functions querystate --tenant public --namespace default --name test-wordcount-py-fn-tfhycxsf --key message-1 failed on 705ecb067214d1cc42cd16358df6fa6d7a8cacc6c5ddd0cdde84a73b3e2e1f76 with error code 137
    at org.apache.pulsar.tests.integration.utils.DockerUtils$2.onComplete(DockerUtils.java:248)
    at org.testcontainers.shaded.com.github.dockerjava.core.exec.AbstrAsyncDockerCmdExec$1.onComplete(AbstrAsyncDockerCmdExec.java:51)
    at org.testcontainers.shaded.com.github.dockerjava.core.DefaultInvocationBuilder.lambda$executeAndStream$1(DefaultInvocationBuilder.java:276)
    at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    It seems that the GitHub Actions runner VM has increased memory consumption after [the most recent updates](https://github.com/actions/virtual-environments/blob/releases/ubuntu20/20210718/images/linux/Ubuntu2004-README.md).
    
    ### Modifications
    
    - Reduce Presto maximum heap size in integration tests
    - Reduce standalone container memory usage (per-container environment tuning is sketched after this list)
    - Stop some system services to save RAM
    - Reduce memory usage of the integration tests' surefire JVM
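
    As an example of the container-level tuning, glibc's per-thread malloc
    arenas can be capped through Testcontainers' `configure()` hook (a minimal
    sketch of the approach; `LowMemoryContainer` is a made-up name, while the
    actual change lands in `ChaosContainer` in the diff below):

    ```java
    import org.testcontainers.containers.GenericContainer;

    class LowMemoryContainer extends GenericContainer<LowMemoryContainer> {
        LowMemoryContainer(String image) {
            super(image);
        }

        @Override
        protected void configure() {
            super.configure();
            // MALLOC_ARENA_MAX=1 limits glibc to a single malloc arena,
            // trading allocator concurrency for a smaller resident set size.
            addEnv("MALLOC_ARENA_MAX", "1");
        }
    }
    ```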
    
    (cherry picked from commit af213c23954f465d289e7cc0bf8e6da010f05a46)
---
 .github/actions/tune-runner-vm/action.yml          | 25 +++++++++++++++++-
 .github/workflows/ci-cpp.yaml                      |  4 +--
 .../ci-integration-backwards-compatibility.yaml    |  5 ++--
 .github/workflows/ci-integration-cli.yaml          |  3 +--
 .github/workflows/ci-integration-function.yaml     |  5 ++--
 .github/workflows/ci-integration-messaging.yaml    |  5 ++--
 .github/workflows/ci-integration-process.yaml      | 10 +++++---
 .github/workflows/ci-integration-schema.yaml       |  5 ++--
 .github/workflows/ci-integration-sql.yaml          | 13 +++++++---
 .github/workflows/ci-integration-standalone.yaml   |  5 ++--
 .github/workflows/ci-integration-thread.yaml       |  5 ++--
 .../ci-integration-tiered-filesystem.yaml          |  5 ++--
 .../workflows/ci-integration-tiered-jcloud.yaml    |  5 ++--
 .github/workflows/ci-integration-transaction.yaml  |  5 ++--
 .github/workflows/ci-pulsar-website-build.yaml     |  5 ++--
 .github/workflows/ci-shade-test.yaml               |  3 +--
 .../docker-images/latest-version-image/Dockerfile  |  2 ++
 .../latest-version-image/conf/presto/jvm.config    | 30 ++++++++++++++++++++++
 tests/integration/pom.xml                          |  2 +-
 .../integration/containers/ChaosContainer.java     |  6 +++++
 .../integration/containers/PulsarContainer.java    |  1 +
 .../containers/StandaloneContainer.java            |  1 +
 22 files changed, 104 insertions(+), 46 deletions(-)

diff --git a/.github/actions/tune-runner-vm/action.yml b/.github/actions/tune-runner-vm/action.yml
index ec5599a..30cf183 100644
--- a/.github/actions/tune-runner-vm/action.yml
+++ b/.github/actions/tune-runner-vm/action.yml
@@ -32,6 +32,10 @@ runs:
             # consumption is high.
             # Set vm.swappiness=1 to avoid swapping and allow high RAM usage
             echo 1 | sudo tee /proc/sys/vm/swappiness
+            # Set swappiness to 1 for all cgroups and sub-groups
+            for swappiness_file in /sys/fs/cgroup/memory/*/memory.swappiness /sys/fs/cgroup/memory/*/*/memory.swappiness; do
+              echo 1 | sudo tee $swappiness_file > /dev/null
+            done
 
             # use "madvise" Linux Transparent HugePages (THP) setting
             # https://www.kernel.org/doc/html/latest/admin-guide/mm/transhuge.html
@@ -53,5 +57,24 @@ runs:
             sudo systemctl stop fstrim.timer || true
             sudo systemctl disable fstrim.service || true
             sudo systemctl stop fstrim.service || true
+
+            # stop php-fpm
+            sudo systemctl stop php8.0-fpm.service || true
+            sudo systemctl stop php7.4-fpm.service || true
+            # stop mono-xsp4
+            sudo systemctl disable mono-xsp4.service || true
+            sudo systemctl stop mono-xsp4.service || true
+            sudo killall mono || true
+
+            # stop Azure Linux agent to save RAM
+            sudo systemctl stop walinuxagent.service || true
+
+            # show memory
+            free -m
+            # show disk
+            df -h
+            # show cggroup
+            echo "/actions_job cgroup settings:"
+            sudo cgget actions_job
         fi
-      shell: bash
\ No newline at end of file
+      shell: bash
diff --git a/.github/workflows/ci-cpp.yaml b/.github/workflows/ci-cpp.yaml
index 43da692..94d1210 100644
--- a/.github/workflows/ci-cpp.yaml
+++ b/.github/workflows/ci-cpp.yaml
@@ -74,8 +74,6 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -f /swapfile
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -105,4 +103,4 @@ jobs:
         continue-on-error: true
         with:
           name: test-logs
-          path: test-logs
\ No newline at end of file
+          path: test-logs
diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml
index 707d88e..ea9dc0d 100644
--- a/.github/workflows/ci-integration-backwards-compatibility.yaml
+++ b/.github/workflows/ci-integration-backwards-compatibility.yaml
@@ -79,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -119,4 +118,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-cli.yaml b/.github/workflows/ci-integration-cli.yaml
index 8f198a1..47af7d1 100644
--- a/.github/workflows/ci-integration-cli.yaml
+++ b/.github/workflows/ci-integration-cli.yaml
@@ -79,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/.github/workflows/ci-integration-function.yaml b/.github/workflows/ci-integration-function.yaml
index b1af22d..c4f9e07 100644
--- a/.github/workflows/ci-integration-function.yaml
+++ b/.github/workflows/ci-integration-function.yaml
@@ -79,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -119,4 +118,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-messaging.yaml b/.github/workflows/ci-integration-messaging.yaml
index 2cea626..00e615b 100644
--- a/.github/workflows/ci-integration-messaging.yaml
+++ b/.github/workflows/ci-integration-messaging.yaml
@@ -79,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -119,4 +118,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-process.yaml b/.github/workflows/ci-integration-process.yaml
index e88f1b6..404ce7a 100644
--- a/.github/workflows/ci-integration-process.yaml
+++ b/.github/workflows/ci-integration-process.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -104,6 +103,11 @@ jobs:
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: ./build/run_integration_group.sh PULSAR_CONNECTORS_PROCESS
 
+      - name: Log dmesg when failed
+        if: ${{ failure() }}
+        continue-on-error: true
+        run: sudo dmesg
+
       - name: Upload container logs
         uses: actions/upload-artifact@v2
         if: ${{ cancelled() || failure() }}
@@ -118,4 +122,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-schema.yaml b/.github/workflows/ci-integration-schema.yaml
index d8c8859..571d22a 100644
--- a/.github/workflows/ci-integration-schema.yaml
+++ b/.github/workflows/ci-integration-schema.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -115,4 +114,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml
index c3a324e..74951e3 100644
--- a/.github/workflows/ci-integration-sql.yaml
+++ b/.github/workflows/ci-integration-sql.yaml
@@ -27,7 +27,8 @@ on:
       - branch-*
 
 env:
-  MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+  MAVEN_OPTS: -Xmx768m -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3
+  MALLOC_ARENA_MAX: "1"
 
 jobs:
 
@@ -78,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -104,6 +104,11 @@ jobs:
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: ./build/run_integration_group.sh SQL
 
+      - name: Log dmesg when failed
+        if: ${{ failure() }}
+        continue-on-error: true
+        run: sudo dmesg
+
       - name: Upload container logs
         uses: actions/upload-artifact@v2
         if: ${{ cancelled() || failure() }}
@@ -118,4 +123,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-standalone.yaml b/.github/workflows/ci-integration-standalone.yaml
index 6266746..19b8bfd 100644
--- a/.github/workflows/ci-integration-standalone.yaml
+++ b/.github/workflows/ci-integration-standalone.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -118,4 +117,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-thread.yaml b/.github/workflows/ci-integration-thread.yaml
index 24099fb..cd22c84 100644
--- a/.github/workflows/ci-integration-thread.yaml
+++ b/.github/workflows/ci-integration-thread.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -118,4 +117,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-tiered-filesystem.yaml b/.github/workflows/ci-integration-tiered-filesystem.yaml
index b1c6ba9..3ae4fca 100644
--- a/.github/workflows/ci-integration-tiered-filesystem.yaml
+++ b/.github/workflows/ci-integration-tiered-filesystem.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -118,4 +117,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-tiered-jcloud.yaml b/.github/workflows/ci-integration-tiered-jcloud.yaml
index 5ad078f..2f3dc3e 100644
--- a/.github/workflows/ci-integration-tiered-jcloud.yaml
+++ b/.github/workflows/ci-integration-tiered-jcloud.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -118,4 +117,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-integration-transaction.yaml b/.github/workflows/ci-integration-transaction.yaml
index 30929b4..b53814e 100644
--- a/.github/workflows/ci-integration-transaction.yaml
+++ b/.github/workflows/ci-integration-transaction.yaml
@@ -78,8 +78,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -115,4 +114,4 @@ jobs:
         continue-on-error: true
         with:
           name: surefire-reports
-          path: tests/integration/target/surefire-reports
\ No newline at end of file
+          path: tests/integration/target/surefire-reports
diff --git a/.github/workflows/ci-pulsar-website-build.yaml b/.github/workflows/ci-pulsar-website-build.yaml
index f9a1f28..a6b196e 100644
--- a/.github/workflows/ci-pulsar-website-build.yaml
+++ b/.github/workflows/ci-pulsar-website-build.yaml
@@ -61,8 +61,7 @@ jobs:
 
       - name: clean disk
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
@@ -87,4 +86,4 @@ jobs:
           # Build the new website
           site2/tools/docker-build-site.sh
           # Script was initially made for travis
-          bash -e site2/tools/publish-website.sh
\ No newline at end of file
+          bash -e site2/tools/publish-website.sh
diff --git a/.github/workflows/ci-shade-test.yaml b/.github/workflows/ci-shade-test.yaml
index dd70074..7548ca1 100644
--- a/.github/workflows/ci-shade-test.yaml
+++ b/.github/workflows/ci-shade-test.yaml
@@ -79,8 +79,7 @@ jobs:
       - name: clean disk
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         run: |
-          sudo swapoff -a
-          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 242ae47..f0022cb 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -77,6 +77,8 @@ COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
      scripts/run-standalone.sh \
      /pulsar/bin/
 
+COPY conf/presto/jvm.config /pulsar/conf/presto/
+
 # copy python test examples
 RUN mkdir -p /pulsar/instances/deps
 COPY python-examples/exclamation_lib.py /pulsar/instances/deps/
diff --git a/tests/docker-images/latest-version-image/conf/presto/jvm.config b/tests/docker-images/latest-version-image/conf/presto/jvm.config
new file mode 100644
index 0000000..28db36a
--- /dev/null
+++ b/tests/docker-images/latest-version-image/conf/presto/jvm.config
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+-server
+-Xms128M
+-Xmx1500M
+-XX:+UseG1GC
+-XX:G1HeapRegionSize=32M
+-XX:+UseGCOverheadLimit
+-XX:+ExplicitGCInvokesConcurrent
+-XX:+HeapDumpOnOutOfMemoryError
+-XX:+ExitOnOutOfMemoryError
+-Dpresto-temporarily-allow-java8=true
+-Djdk.attach.allowAttachSelf=true
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index ec52ece..458cc2a 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -225,7 +225,7 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
             <configuration>
-              <argLine>-Xmx2G -XX:MaxDirectMemorySize=8G
+              <argLine>-Xmx1G -XX:MaxDirectMemorySize=1G
               -Dio.netty.leakDetectionLevel=advanced
               </argLine>
               <skipTests>false</skipTests>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 6064d5b..3896777 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -46,6 +46,12 @@ public class ChaosContainer<SelfT extends ChaosContainer<SelfT>> extends Generic
         this.clusterName = clusterName;
     }
 
+    @Override
+    protected void configure() {
+        super.configure();
+        addEnv("MALLOC_ARENA_MAX", "1");
+    }
+
     protected void beforeStop() {
         if (null == getContainerId()) {
             return;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 74963e3..46f50fc 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -147,6 +147,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
 
     @Override
     protected void configure() {
+        super.configure();
         if (httpPort > 0) {
             addExposedPorts(httpPort);
         }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
index cc47d44..819e328 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/StandaloneContainer.java
@@ -54,6 +54,7 @@ public class StandaloneContainer extends PulsarContainer<StandaloneContainer> {
     protected void configure() {
         super.configure();
         setCommand("standalone");
+        addEnv("PULSAR_MEM", "-Xms128M -Xmx1g -XX:MaxDirectMemorySize=1g");
     }
 
     @Override

[pulsar] 04/10: Disable replicate system topic across clusters. (#11376)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3b40ace4a6f564d345c593f8d7a5ad9629613130
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 22 15:35:19 2021 +0800

    Disable replicate system topic across clusters. (#11376)
    
    ### Motivation
    
    Currently, we are not able to use topic policies with geo-replication.
    The root cause is that we do not support replicating schemas to the remote
    cluster, while the topic policies topic uses an AVRO schema.
    
    For schema replication, we already have PIP 88 targeting 2.9.0.
    For topic policy replication, we are working on a proposal that will also be available in 2.9.0.
    
    But for 2.7.x and 2.8.x, we need this fix to make sure topic policies can work in a geo-replicated cluster.
    
    ### Modifications
    
    Disabled replication for system topics, as sketched below.
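
    Concretely, `SystemTopic` overrides the replication check of
    `PersistentTopic` with a no-op (the override from the diff below, with the
    rationale added as comments): `checkReplication()` is what starts
    replicators toward the namespace's remote clusters, so completing
    immediately means no replicator is ever created for a system topic.

    ```java
    @Override
    public CompletableFuture<Void> checkReplication() {
        // System topics such as __change_events hold cluster-local state;
        // never replicate them to remote clusters.
        return CompletableFuture.completedFuture(null);
    }
    ```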
    
    ### Verifying this change
    
    Added new tests to make sure topic policies work in a geo-replicated cluster.
    
    (cherry picked from commit a6e1a3290d9eb42f4d03438c3ce9ced506ee8445)
---
 .../broker/service/persistent/SystemTopic.java     |  5 ++
 .../pulsar/broker/service/ReplicatorTest.java      | 55 +++++++++++++++++++---
 .../pulsar/broker/service/ReplicatorTestBase.java  |  2 +
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 788aa94..231f4e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -59,6 +59,11 @@ public class SystemTopic extends PersistentTopic {
         // do nothing for system topic
     }
 
+    @Override
+    public CompletableFuture<Void> checkReplication() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() {
         if (!super.hasCompactionTriggered()) {
             // To pre-create the subscription for the compactor to avoid lost any data since we are using reader
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 1d85cb4..1e1dca3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import lombok.Cleanup;
 
@@ -78,11 +79,14 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.events.EventsTopicNames;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -834,19 +838,24 @@ public class ReplicatorTest extends ReplicatorTestBase {
         // Update partitioned topic from R2
         admin2.topics().updatePartitionedTopic(persistentTopicName, 5);
         assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 5);
-        assertEquals(admin2.topics().getList(namespace).size(), 5);
+        assertEquals((int) admin2.topics().getList(namespace).stream().filter(topic ->
+                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 5);
         // Update partitioned topic from R3
         admin3.topics().updatePartitionedTopic(persistentTopicName, 6);
         assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 6);
-        assertEquals(admin3.topics().getList(namespace).size(), 6);
+        assertEquals(admin3.topics().getList(namespace).stream().filter(topic ->
+                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 6);
         // Update partitioned topic from R1
         admin1.topics().updatePartitionedTopic(persistentTopicName, 7);
         assertEquals(admin1.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
         assertEquals(admin2.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
         assertEquals(admin3.topics().getPartitionedTopicMetadata(persistentTopicName).partitions, 7);
-        assertEquals(admin1.topics().getList(namespace).size(), 7);
-        assertEquals(admin2.topics().getList(namespace).size(), 7);
-        assertEquals(admin3.topics().getList(namespace).size(), 7);
+        assertEquals(admin1.topics().getList(namespace).stream().filter(topic ->
+                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
+        assertEquals(admin2.topics().getList(namespace).stream().filter(topic ->
+                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
+        assertEquals(admin3.topics().getList(namespace).stream().filter(topic ->
+                !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)).count(), 7);
     }
 
     /**
@@ -1218,12 +1227,46 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 pulsar1.getBrokerService().getReplicationClients().get(cluster4)));
     }
 
+    @Test
+    public void testDoNotReplicateSystemTopic() throws Exception {
+        final String namespace = newUniqueName("pulsar/ns");
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r2", "r3"));
+        String topic = TopicName.get("persistent", NamespaceName.get(namespace),
+                "testDoesNotReplicateSystemTopic").toString();
+        String systemTopic = TopicName.get("persistent", NamespaceName.get(namespace),
+                EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME).toString();
+        admin1.topics().createNonPartitionedTopic(topic);
+        Awaitility.await()
+                .until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        Awaitility.await()
+                .until(() -> pulsar2.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        Awaitility.await()
+                .until(() -> pulsar3.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        admin1.topics().setRetention(topic, new RetentionPolicies(10, 10));
+        admin2.topics().setRetention(topic, new RetentionPolicies(20, 20));
+        admin3.topics().setRetention(topic, new RetentionPolicies(30, 30));
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin1.topics().getStats(systemTopic).getReplication().size(), 0);
+            Assert.assertEquals(admin2.topics().getStats(systemTopic).getReplication().size(), 0);
+            Assert.assertEquals(admin3.topics().getStats(systemTopic).getReplication().size(), 0);
+        });
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin1.topics().getRetention(topic).getRetentionSizeInMB(), 10);
+            Assert.assertEquals(admin2.topics().getRetention(topic).getRetentionSizeInMB(), 20);
+            Assert.assertEquals(admin3.topics().getRetention(topic).getRetentionSizeInMB(), 30);
+        });
+    }
+
     private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
        // wait until the replicators for the non-partitioned topics have been created
         final List<String> list = new ArrayList<>();
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
             list.clear();
-            list.addAll(admin.topics().getList(namespace));
+            list.addAll(admin.topics().getList(namespace).stream()
+                    .filter(topic -> !topic.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME))
+                    .collect(Collectors.toList()));
             return list.size() == expectedTopicList.size();
         });
         for (String expectTopic : expectedTopicList) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 810d8ed..5a6e111 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -236,6 +236,8 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
         config.setAllowAutoTopicCreationType("non-partitioned");
         config.setEnableReplicatedSubscriptions(true);
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
+        config.setSystemTopicEnabled(true);
+        config.setTopicLevelPoliciesEnabled(true);
     }
 
     public void resetConfig1() {
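
For illustration, the equivalent broker.conf settings that the test base now
turns on (a sketch; both keys exist in the broker configuration):

    systemTopicEnabled=true
    topicLevelPoliciesEnabled=true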

[pulsar] 10/10: [Broker] Add multi roles support for authorization (#11341)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe6158b19363adec727ff9917d47a7bff33625e9
Author: Zike Yang <ar...@armail.top>
AuthorDate: Fri Jul 23 12:19:52 2021 +0800

    [Broker] Add multi roles support for authorization (#11341)
    
    ### Motivation
    
    In https://github.com/apache/pulsar/pull/10375, we added multi-role support for JWT authentication, but authorization currently does not support multiple roles: only the first role in the roles array is used during authorization. This PR adds multi-role support for authorization.
    
    ### Modifications
    
    * Add MultiRolesTokenAuthorizationProvider. It checks the permissions of every role in the roles array; if any role has permission, the current operation is authorized.
    
    (cherry picked from commit 62b5dfb2fe52b992d2c76dd0952a5850f81238f0)
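
    For illustration, a minimal sketch of the resulting semantics, mirroring
    the new test below: a token whose "sub" claim carries an array of roles is
    authorized as soon as any single role in the array is authorized.

        SecretKey key = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
        // A token carrying two roles; if either "reader" or "writer" is
        // granted the requested operation, authorization succeeds.
        String token = Jwts.builder()
                .claim("sub", new String[]{"reader", "writer"})
                .signWith(key)
                .compact();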
---
 .../MultiRolesTokenAuthorizationProvider.java      | 253 +++++++++++++++++++++
 .../MultiRolesTokenAuthorizationProviderTest.java  |  71 ++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  |  10 +
 3 files changed, 334 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
new file mode 100644
index 0000000..23fb7bc
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtParser;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.RequiredTypeException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationProvider {
+    private static final Logger log = LoggerFactory.getLogger(MultiRolesTokenAuthorizationProvider.class);
+
+    static final String HTTP_HEADER_NAME = "Authorization";
+    static final String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
+
+    // When symmetric key is configured
+    static final String CONF_TOKEN_SETTING_PREFIX = "tokenSettingPrefix";
+
+    // The token's claim that corresponds to the "role" string
+    static final String CONF_TOKEN_AUTH_CLAIM = "tokenAuthClaim";
+
+    private JwtParser parser;
+    private String roleClaim;
+
+    public MultiRolesTokenAuthorizationProvider() {
+        this.roleClaim = Claims.SUBJECT;
+        this.parser = Jwts.parserBuilder().build();
+    }
+
+    @Override
+    public void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException {
+        String prefix = (String) conf.getProperty(CONF_TOKEN_SETTING_PREFIX);
+        if (null == prefix) {
+            prefix = "";
+        }
+        String confTokenAuthClaimSettingName = prefix + CONF_TOKEN_AUTH_CLAIM;
+        Object tokenAuthClaim = conf.getProperty(confTokenAuthClaimSettingName);
+        if (tokenAuthClaim != null && StringUtils.isNotBlank((String) tokenAuthClaim)) {
+            this.roleClaim = (String) tokenAuthClaim;
+        }
+
+        super.initialize(conf, configCache);
+    }
+
+    private List<String> getRoles(AuthenticationDataSource authData) {
+        String token = null;
+
+        if (authData.hasDataFromCommand()) {
+            // Authenticate Pulsar binary connection
+            token = authData.getCommandData();
+            if (StringUtils.isBlank(token)) {
+                return Collections.emptyList();
+            }
+        } else if (authData.hasDataFromHttp()) {
+            // The format here should be compliant to RFC-6750
+            // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx
+            String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
+            if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
+                return Collections.emptyList();
+            }
+
+            // Remove prefix
+            token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
+        }
+
+        if (token == null)
+            return Collections.emptyList();
+
+        String[] splitToken = token.split("\\.");
+        String unsignedToken = splitToken[0] + "." + splitToken[1] + ".";
+
+        Jwt<?, Claims> jwt = parser.parseClaimsJwt(unsignedToken);
+        try {
+            return Collections.singletonList(jwt.getBody().get(roleClaim, String.class));
+        } catch (RequiredTypeException requiredTypeException) {
+            try {
+                List list = jwt.getBody().get(roleClaim, List.class);
+                if (list != null) {
+                    return list;
+                }
+            } catch (RequiredTypeException requiredTypeException1) {
+                return Collections.emptyList();
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+    public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, CompletableFuture<Boolean>> authorizeFunc) {
+        List<String> roles = getRoles(authenticationData);
+        List<CompletableFuture<Boolean>> futures = new ArrayList<>(roles.size());
+        roles.forEach(r -> futures.add(authorizeFunc.apply(r)));
+        return CompletableFuture.supplyAsync(() -> {
+            do {
+                try {
+                    List<CompletableFuture<Boolean>> doneFutures = new ArrayList<>();
+                    FutureUtil.waitForAny(futures).get();
+                    for (CompletableFuture<Boolean> future : futures) {
+                        if (!future.isDone()) continue;
+                        doneFutures.add(future);
+                        if (future.get()) {
+                            futures.forEach(f -> {
+                                if (!f.isDone()) f.cancel(false);
+                            });
+                            return true;
+                        }
+                    }
+                    futures.removeAll(doneFutures);
+                } catch (InterruptedException | ExecutionException ignored) {
+                }
+            } while (!futures.isEmpty());
+            return false;
+        });
+    }
+
+    /**
+     * Check if the specified role has permission to send messages to the specified fully qualified topic name.
+     *
+     * @param topicName the fully qualified topic name associated with the topic.
+     * @param role      the app id used to send messages to the topic.
+     */
+    @Override
+    public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.canProduceAsync(topicName, r, authenticationData));
+    }
+
+    /**
+     * Check if the specified role has permission to receive messages from the specified fully qualified topic
+     * name.
+     *
+     * @param topicName    the fully qualified topic name associated with the topic.
+     * @param role         the app id used to receive messages from the topic.
+     * @param subscription the subscription name defined by the client
+     */
+    @Override
+    public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
+                                                      AuthenticationDataSource authenticationData, String subscription) {
+        return authorize(authenticationData, r -> super.canConsumeAsync(topicName, r, authenticationData, subscription));
+    }
+
+    /**
+     * Check whether the specified role can perform a lookup for the specified topic.
+     * <p>
+     * For that the caller needs to have producer or consumer permission.
+     *
+     * @param topicName
+     * @param role
+     * @return
+     * @throws Exception
+     */
+    @Override
+    public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
+                                                     AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.canLookupAsync(topicName, r, authenticationData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.allowFunctionOpsAsync(namespaceName, r, authenticationData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.allowSourceOpsAsync(namespaceName, r, authenticationData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        return authorize(authenticationData, r -> super.allowSinkOpsAsync(namespaceName, r, authenticationData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
+                                                                String role,
+                                                                TenantOperation operation,
+                                                                AuthenticationDataSource authData) {
+        return authorize(authData, r -> super.allowTenantOperationAsync(tenantName, r, operation, authData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   String role,
+                                                                   NamespaceOperation operation,
+                                                                   AuthenticationDataSource authData) {
+        return authorize(authData, r -> super.allowNamespaceOperationAsync(namespaceName, r, operation, authData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         PolicyName policy,
+                                                                         PolicyOperation operation,
+                                                                         String role,
+                                                                         AuthenticationDataSource authData) {
+        return authorize(authData, r -> super.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, r, authData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
+                                                               String role,
+                                                               TopicOperation operation,
+                                                               AuthenticationDataSource authData) {
+        return authorize(authData, r -> super.allowTopicOperationAsync(topicName, r, operation, authData));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName topicName,
+                                                                     String role,
+                                                                     PolicyName policyName,
+                                                                     PolicyOperation policyOperation,
+                                                                     AuthenticationDataSource authData) {
+        return authorize(authData, r -> super.allowTopicPolicyOperationAsync(topicName, r, policyName, policyOperation, authData));
+    }
+}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java
new file mode 100644
index 0000000..fdedf86
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authorization;
+
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import javax.crypto.SecretKey;
+import java.util.concurrent.CompletableFuture;
+
+public class MultiRolesTokenAuthorizationProviderTest {
+
+    @Test
+    public void testMultiRolesAuthz() throws Exception {
+        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+        String userA = "user-a";
+        String userB = "user-b";
+        String token = Jwts.builder().claim("sub", new String[]{userA, userB}).signWith(secretKey).compact();
+
+        MultiRolesTokenAuthorizationProvider provider = new MultiRolesTokenAuthorizationProvider();
+
+        AuthenticationDataSource ads = new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromHttp() {
+                return true;
+            }
+
+            @Override
+            public String getHttpHeader(String name) {
+                if (name.equals("Authorization")) {
+                    return "Bearer " + token;
+                } else {
+                    throw new IllegalArgumentException("Wrong HTTP header");
+                }
+            }
+        };
+
+        Assert.assertTrue(provider.authorize(ads, role -> {
+            if (role.equals(userB)) return CompletableFuture.completedFuture(true); // only userB has permission
+            return CompletableFuture.completedFuture(false);
+        }).get());
+
+        Assert.assertTrue(provider.authorize(ads, role -> {
+            return CompletableFuture.completedFuture(true); // every role has permission
+        }).get());
+
+        Assert.assertFalse(provider.authorize(ads, role -> {
+            return CompletableFuture.completedFuture(false); // no role has permission
+        }).get());
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 0c3a0c0..45fd7f7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -45,6 +45,16 @@ public class FutureUtil {
         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
     }
 
+    /**
+     * Return a future that represents the completion of any future in the provided list.
+     *
+     * @param futures the futures to wait on
+     * @return a new CompletableFuture that is completed when any of the given CompletableFutures complete
+     */
+    public static CompletableFuture<Object> waitForAny(List<? extends CompletableFuture<?>> futures) {
+        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
+    }
+
 
     /**
      * Return a future that represents the completion of the futures in the provided list.
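
    For illustration, a minimal usage sketch of the new waitForAny helper (a
    future that completed exceptionally would need extra handling that this
    sketch omits):

        List<CompletableFuture<Boolean>> checks = Arrays.asList(
                CompletableFuture.completedFuture(false),
                CompletableFuture.completedFuture(true));
        FutureUtil.waitForAny(checks).get();     // returns once any future completes
        boolean granted = checks.stream()
                .filter(CompletableFuture::isDone)
                .anyMatch(f -> f.getNow(false)); // true if a finished check returned true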

[pulsar] 06/10: Invalidate the read handle after all cursors consumed. (#11389)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0577259730c3eb97e248836cab76c2f5d48e8e44
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jul 22 17:27:26 2021 +0800

    Invalidate the read handle after all cursors consumed. (#11389)
    
    ### Motivation
    
    Currently, the ReadHandle is only invalidated when the ledger is removed from the ManagedLedger.
    If the ManagedLedger has many ledgers (the topic might retain data indefinitely), we can run out of
    direct memory, since each offloaded-data ReadHandle uses a 1MB read cache by default.
    
    Once all cursors have consumed the data, the ReadHandle can be closed safely. If a cursor later
    resets to an earlier position to consume historical data, the ReadHandle is reopened.
    
    ### Verifying this change
    
    New tests were added to ensure the ReadHandle is invalidated properly.
    
    (cherry picked from commit 47deafcd4cf55b2395c0db2b8c031ec730f9dba4)
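
    For illustration, a minimal sketch of a setup where this matters, mirroring
    the new test: retention keeps the ledgers alive, so before this change their
    cached ReadHandles would accumulate even after every cursor had consumed them.

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(1);           // roll a new ledger for every entry
        config.setRetentionSizeInMB(50);            // retention prevents ledger deletion...
        config.setRetentionTime(1, TimeUnit.DAYS);  // ...so only the read handles can be freed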
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 76 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1491239..3dcef1a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -151,7 +151,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected Map<String, String> propertiesMap;
     protected final MetaStore store;
 
-    private final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
+    final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
             16 /* initial capacity */, 1 /* number of sections */);
     protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
@@ -2410,7 +2410,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
                     }
-                    break;
+                    invalidateReadHandle(ls.getLedgerId());
                 }
             }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1b31b5a..274eb92 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3093,4 +3093,80 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
                 ", dataLength=" + dataLength +
                 '}';
     }
+
+    @Test
+    public void testInvalidateReadHandleWhenDeleteLedger() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+
+        // Verify the read handle should be invalidated after ledger been removed.
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("testInvalidateReadHandleWhenDeleteLedger", config);
+        ManagedCursor cursor = ledger.openCursor("test-cursor");
+        ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
+        final int entries = 3;
+        for (int i = 0; i < entries; i++) {
+            ledger.addEntry(String.valueOf(i).getBytes(Encoding));
+        }
+        List<Entry> entryList = cursor.readEntries(3);
+        assertEquals(entryList.size(), 3);
+        assertEquals(ledger.ledgers.size(), 3);
+        assertEquals(ledger.ledgerCache.size(), 2);
+        cursor.clearBacklog();
+        cursor2.clearBacklog();
+        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.ledgers.size(), 1);
+            assertEquals(ledger.ledgerCache.size(), 0);
+        });
+
+        cursor.close();
+        cursor2.close();
+        ledger.close();
+    }
+
+    @Test
+    public void testInvalidateReadHandleWhenConsumed() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+        // Verify the read handle should be invalidated when all cursors consumed
+        // even if the ledger can not been removed due to the data retention
+        config.setRetentionSizeInMB(50);
+        config.setRetentionTime(1, TimeUnit.DAYS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("testInvalidateReadHandleWhenConsumed", config);
+        ManagedCursor cursor = ledger.openCursor("test-cursor");
+        ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
+        final int entries = 3;
+        for (int i = 0; i < entries; i++) {
+            ledger.addEntry(String.valueOf(i).getBytes(Encoding));
+        }
+        List<Entry> entryList = cursor.readEntries(3);
+        assertEquals(entryList.size(), 3);
+        assertEquals(ledger.ledgers.size(), 3);
+        assertEquals(ledger.ledgerCache.size(), 2);
+        cursor.clearBacklog();
+        cursor2.clearBacklog();
+        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.ledgers.size(), 3);
+            assertEquals(ledger.ledgerCache.size(), 0);
+        });
+
+        // Verify the ReadHandle can be reopened.
+        ManagedCursor cursor3 = ledger.openCursor("test-cursor3", InitialPosition.Earliest);
+        entryList = cursor3.readEntries(3);
+        assertEquals(entryList.size(), 3);
+        assertEquals(ledger.ledgerCache.size(), 2);
+        cursor3.clearBacklog();
+        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.ledgers.size(), 3);
+            assertEquals(ledger.ledgerCache.size(), 0);
+        });
+
+
+        cursor.close();
+        cursor2.close();
+        cursor3.close();
+        ledger.close();
+    }
 }

[pulsar] 07/10: [Python Client] Fix handle complex schema (#11400)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 29b903118df9210307cc85ad7860df157ce6952e
Author: ran <ga...@126.com>
AuthorDate: Thu Jul 22 22:06:52 2021 +0800

    [Python Client] Fix handle complex schema (#11400)
    
    Fixes #7785, #11221
    
    ### Motivation
    
    Currently, the Pulsar Python client cannot handle complex schemas; users of complex schemas encounter errors.
    
    ### Modifications
    
    Fix AvroSchema encoding and decoding of complex schemas.
    Fix JsonSchema decoding of complex schemas.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Encode and decode complex schema data*
      - *Produce and consume complex schema data*
    
    (cherry picked from commit 23706431c00371cb34d19d666ec02cf9c152ef40)
---
 .../python/pulsar/schema/definition.py             |  15 ++-
 .../python/pulsar/schema/schema_avro.py            |  11 +-
 pulsar-client-cpp/python/schema_test.py            | 115 ++++++++++++++++++++-
 3 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index d46cf3c..3b946b8 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -17,9 +17,10 @@
 # under the License.
 #
 
-from abc import abstractmethod, ABCMeta
-from enum import Enum, EnumMeta
+import copy
+from abc import abstractmethod
 from collections import OrderedDict
+from enum import Enum, EnumMeta
 from six import with_metaclass
 
 
@@ -63,8 +64,14 @@ class Record(with_metaclass(RecordMeta, object)):
 
         for k, value in self._fields.items():
             if k in kwargs:
-                # Value was overridden at constructor
-                self.__setattr__(k, kwargs[k])
+                if isinstance(value, Record) and isinstance(kwargs[k], dict):
+                    # Use dict init Record object
+                    copied = copy.copy(value)
+                    copied.__init__(decode=True, **kwargs[k])
+                    self.__setattr__(k, copied)
+                else:
+                    # Value was overridden at constructor
+                    self.__setattr__(k, kwargs[k])
             elif isinstance(value, Record):
                 # Value is a subrecord
                 self.__setattr__(k, value)
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index 2afa9db..fc9e6a6 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -21,6 +21,7 @@ import _pulsar
 import io
 import enum
 
+from . import Record
 from .schema import Schema
 
 try:
@@ -39,16 +40,24 @@ if HAS_AVRO:
         def _get_serialized_value(self, x):
             if isinstance(x, enum.Enum):
                 return x.name
+            elif isinstance(x, Record):
+                return self.encode_dict(x.__dict__)
             else:
                 return x
 
         def encode(self, obj):
             self._validate_object_type(obj)
             buffer = io.BytesIO()
-            m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()}
+            m = self.encode_dict(obj.__dict__)
             fastavro.schemaless_writer(buffer, self._schema, m)
             return buffer.getvalue()
 
+        def encode_dict(self, d: dict):
+            obj = {}
+            for k, v in d.items():
+                obj[k] = self._get_serialized_value(v)
+            return obj
+
         def decode(self, data):
             buffer = io.BytesIO(data)
             d = fastavro.schemaless_reader(buffer, self._schema)
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index a0d60c0..8cf6ff4 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -363,7 +363,7 @@ class SchemaTest(TestCase):
 
         consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
                                     schema=AvroSchema(Example))
-        
+
         r = Example(a=1, b=2)
         producer.send(r)
 
@@ -372,7 +372,7 @@ class SchemaTest(TestCase):
         self.assertIsNotNone(msg.schema_version())
 
         self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())
-        
+
         self.assertEqual(r, msg.value())
 
         client.close()
@@ -853,5 +853,116 @@ class SchemaTest(TestCase):
         consumer.close()
         client.close()
 
+    def test_serialize_schema_complex(self):
+        class NestedObj1(Record):
+            na1 = String()
+            nb1 = Double()
+
+        class NestedObj2(Record):
+            na2 = Integer()
+            nb2 = Boolean()
+            nc2 = NestedObj1()
+
+        class ComplexRecord(Record):
+            a = Integer()
+            b = Integer()
+            nested = NestedObj2()
+
+        self.assertEqual(ComplexRecord.schema(), {
+            "name": "ComplexRecord",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": ["null", "int"]},
+                {"name": "b", "type": ["null", "int"]},
+                {"name": "nested", "type": ['null', {'name': 'NestedObj2', 'type': 'record', 'fields': [
+                    {'name': 'na2', 'type': ['null', 'int']},
+                    {'name': 'nb2', 'type': ['null', 'boolean']},
+                    {'name': 'nc2', 'type': ['null', {'name': 'NestedObj1', 'type': 'record', 'fields': [
+                        {'name': 'na1', 'type': ['null', 'string']},
+                        {'name': 'nb1', 'type': ['null', 'double']}
+                    ]}]}
+                ]}]}
+            ]
+        })
+
+        def encode_and_decode(schema_type):
+            data_schema = AvroSchema(ComplexRecord)
+            if schema_type == 'json':
+                data_schema = JsonSchema(ComplexRecord)
+
+            nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+            nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            data_encode = data_schema.encode(r)
+
+            data_decode = data_schema.decode(data_encode)
+            self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
+            self.assertEqual(data_decode, r)
+            self.assertEqual(data_decode.a, 1)
+            self.assertEqual(data_decode.b, 2)
+            self.assertEqual(data_decode.nested.na2, 22)
+            self.assertEqual(data_decode.nested.nb2, True)
+            self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
+            self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
+            print('Encode and decode complex schema finished. schema_type: ', schema_type)
+
+        encode_and_decode('avro')
+        encode_and_decode('json')
+
+    def test_produce_and_consume_complex_schema_data(self):
+        class NestedObj1(Record):
+            na1 = String()
+            nb1 = Double()
+
+        class NestedObj2(Record):
+            na2 = Integer()
+            nb2 = Boolean()
+            nc2 = NestedObj1()
+
+        class ComplexRecord(Record):
+            a = Integer()
+            b = Integer()
+            nested = NestedObj2()
+
+        client = pulsar.Client(self.serviceUrl)
+
+        def produce_consume_test(schema_type):
+            topic = "my-complex-schema-topic-" + schema_type
+
+            data_schema = AvroSchema(ComplexRecord)
+            if schema_type == 'json':
+                data_schema = JsonSchema(ComplexRecord)
+
+            producer = client.create_producer(
+                        topic=topic,
+                        schema=data_schema)
+
+            consumer = client.subscribe(topic, 'test', schema=data_schema)
+
+            nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+            nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            producer.send(r)
+
+            msg = consumer.receive()
+            value = msg.value()
+            self.assertEqual(value.__class__.__name__, 'ComplexRecord')
+            self.assertEqual(value, r)
+            self.assertEqual(value.a, 1)
+            self.assertEqual(value.b, 2)
+            self.assertEqual(value.nested.na2, 22)
+            self.assertEqual(value.nested.nb2, True)
+            self.assertEqual(value.nested.nc2.na1, 'na1 value')
+            self.assertEqual(value.nested.nc2.nb1, 20.5)
+
+            producer.close()
+            consumer.close()
+            print('Produce and consume complex schema data finished. schema_type: ', schema_type)
+
+        produce_consume_test('avro')
+        produce_consume_test('json')
+
+        client.close()
+
 if __name__ == '__main__':
     main()

[pulsar] 05/10: [Transaction] Fix direct memory leak related to commit and abort markers (#11407)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit beeefacf571304f53b27d37516325b5e93b7f09d
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Jul 22 11:45:32 2021 +0300

    [Transaction] Fix direct memory leak related to commit and abort markers (#11407)
    
    - Netty ByteBufs for commit and abort markers leaked direct memory
    
    - ManagedLedgerImpl.asyncAddEntry retains the buffer, so the reference
      count has to be decreased by calling release after the asyncAddEntry
      call returns
    
    (cherry picked from commit 22413d53e3cafb0d304d606aec8cf6216219fed0)
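
    For illustration, a minimal sketch of the reference-counting contract
    applied here (mostSigBits, leastSigBits, and callback stand in for the
    transaction ID bits and the AddEntryCallback in the patched code):

        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, mostSigBits, leastSigBits);
        try {
            // asyncAddEntry retains the buffer for the duration of the write,
            // so the managed ledger holds its own reference from here on.
            topic.getManagedLedger().asyncAddEntry(commitMarker, callback, null);
        } finally {
            // Drop the caller's reference; without this, every marker leaked
            // one reference worth of direct memory.
            commitMarker.release();
        }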
---
 .../buffer/impl/TopicTransactionBuffer.java        | 104 ++++++++++++---------
 1 file changed, 58 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 4db7a3e..c33c404 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -206,24 +206,28 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
                 txnID.getLeastSigBits());
 
-        topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                synchronized (TopicTransactionBuffer.this) {
-                    updateMaxReadPosition(txnID);
-                    handleLowWaterMark(txnID, lowWaterMark);
-                    clearAbortedTransactions();
-                    takeSnapshotByChangeTimes();
+        try {
+            topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    synchronized (TopicTransactionBuffer.this) {
+                        updateMaxReadPosition(txnID);
+                        handleLowWaterMark(txnID, lowWaterMark);
+                        clearAbortedTransactions();
+                        takeSnapshotByChangeTimes();
+                    }
+                    completableFuture.complete(null);
                 }
-                completableFuture.complete(null);
-            }
 
-            @Override
-            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                log.error("Failed to commit for txn {}", txnID, exception);
-                completableFuture.completeExceptionally(new PersistenceException(exception));
-            }
-        }, null);
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("Failed to commit for txn {}", txnID, exception);
+                    completableFuture.completeExceptionally(new PersistenceException(exception));
+                }
+            }, null);
+        } finally {
+            commitMarker.release();
+        }
         return completableFuture;
     }
 
@@ -235,26 +239,30 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
 
         ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
-        topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                synchronized (TopicTransactionBuffer.this) {
-                    aborts.put(txnID, (PositionImpl) position);
-                    updateMaxReadPosition(txnID);
-                    handleLowWaterMark(txnID, lowWaterMark);
-                    changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
-                    clearAbortedTransactions();
-                    takeSnapshotByChangeTimes();
+        try {
+            topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    synchronized (TopicTransactionBuffer.this) {
+                        aborts.put(txnID, (PositionImpl) position);
+                        updateMaxReadPosition(txnID);
+                        handleLowWaterMark(txnID, lowWaterMark);
+                        changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+                        clearAbortedTransactions();
+                        takeSnapshotByChangeTimes();
+                    }
+                    completableFuture.complete(null);
                 }
-                completableFuture.complete(null);
-            }
 
-            @Override
-            public void addFailed(ManagedLedgerException exception, Object ctx) {
-                log.error("Failed to abort for txn {}", txnID, exception);
-                completableFuture.completeExceptionally(new PersistenceException(exception));
-            }
-        }, null);
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    log.error("Failed to abort for txn {}", txnID, exception);
+                    completableFuture.completeExceptionally(new PersistenceException(exception));
+                }
+            }, null);
+        } finally {
+            abortMarker.release();
+        }
         return completableFuture;
     }
 
@@ -264,20 +272,24 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
             if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
                 ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
                         txnID.getMostSigBits(), txnID.getLeastSigBits());
-                topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
-                    @Override
-                    public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                        synchronized (TopicTransactionBuffer.this) {
-                            aborts.put(firstTxn, (PositionImpl) position);
-                            updateMaxReadPosition(firstTxn);
+                try {
+                    topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
+                        @Override
+                        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                            synchronized (TopicTransactionBuffer.this) {
+                                aborts.put(firstTxn, (PositionImpl) position);
+                                updateMaxReadPosition(firstTxn);
+                            }
                         }
-                    }
 
-                    @Override
-                    public void addFailed(ManagedLedgerException exception, Object ctx) {
-                        log.error("Failed to abort low water mark for txn {}", txnID, exception);
-                    }
-                }, null);
+                        @Override
+                        public void addFailed(ManagedLedgerException exception, Object ctx) {
+                            log.error("Failed to abort low water mark for txn {}", txnID, exception);
+                        }
+                    }, null);
+                } finally {
+                    abortMarker.release();
+                }
             }
         }
     }

[pulsar] 02/10: Avoid infinite waiting for consumer close (#11347)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a3f179cf5d8d4e2e67828af535234cc9dd3a0b52
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Thu Jul 22 09:21:59 2021 +0800

    Avoid infinite waiting for consumer close (#11347)
    
    ### Motivation
    There are two events that can close a consumer:
    - EventA: the server closes the consumer
    - EventB: the consumer closes itself
    
    If EventA and EventB happen concurrently, `cnx.ctx()` may be null. This causes an NPE, which leaves the close future waiting forever.
    
    ### Modifications
    
    If `cnx.ctx()` is already null, we ignore the exception as well.
    
    (cherry picked from commit 4d3fdae532f2e55392daaa92c9ffabbd4db8c278)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a0fb143..829e62f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -28,6 +28,7 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
@@ -926,7 +927,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         } else {
             ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
             cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
-                boolean ignoreException = !cnx.ctx().channel().isActive();
+                final ChannelHandlerContext ctx = cnx.ctx();
+                boolean ignoreException = ctx == null || !ctx.channel().isActive();
                 if (ignoreException && exception != null) {
                     log.debug("Exception ignored in closing consumer", exception);
                 }

[pulsar] 09/10: Add metrics storageLogicalSize for the TopicStats and NamespaceStats (#11430)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 81e738d78995cf817e4c961e38daa646c1e7a470
Author: GuoJiwei <te...@apache.org>
AuthorDate: Fri Jul 23 07:52:58 2021 +0800

    Add metrics storageLogicalSize for the TopicStats and NamespaceStats (#11430)
    
    
    (cherry picked from commit 540ae1b4ec8b4149dfdc371ce3d298c7bf1c03c7)
---
 .../java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java     | 5 +++++
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java  | 5 +++++
 .../pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java    | 1 +
 .../apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java   | 2 ++
 .../pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java    | 2 ++
 .../java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java  | 2 ++
 .../pulsar/broker/stats/prometheus/TransactionAggregator.java       | 3 +++
 .../java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java | 6 ++++++
 .../broker/stats/prometheus/AggregatedNamespaceStatsTest.java       | 4 ++++
 site2/docs/reference-metrics.md                                     | 1 +
 10 files changed, 31 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index d6f2b89..b4444c0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -41,6 +41,11 @@ public interface ManagedLedgerMXBean {
     long getStoredMessagesSize();
 
     /**
+     * @return the total size of the messages in active ledgers (without accounting for replicas)
+     */
+    long getStoredMessagesLogicalSize();
+
+    /**
      * @return the number of backlog messages for all the consumers
      */
     long getNumberOfMessagesInBacklog();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 87606d2..9e22b51 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -282,6 +282,11 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
     }
 
     @Override
+    public long getStoredMessagesLogicalSize() {
+        return managedLedger.getTotalSize();
+    }
+
+    @Override
     public long getNumberOfMessagesInBacklog() {
         long count = 0;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 05f1b15..c049366 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -65,6 +65,7 @@ public class AggregatedNamespaceStats {
         msgOutCounter += stats.msgOutCounter;
 
         managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
+        managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
         managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
         managedLedgerStats.offloadedStorageUsed += stats.managedLedgerStats.offloadedStorageUsed;
         backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
index f914166..9f124a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/ManagedLedgerStats.java
@@ -26,6 +26,7 @@ public class ManagedLedgerStats {
     long storageSize;
     long backlogSize;
     long offloadedStorageUsed;
+    long storageLogicalSize;
 
     StatsBuckets storageWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
     StatsBuckets storageLedgerWriteLatencyBuckets = new StatsBuckets(ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC);
@@ -40,6 +41,7 @@ public class ManagedLedgerStats {
         storageReadRate = 0;
         backlogSize = 0;
         offloadedStorageUsed = 0;
+        storageLogicalSize = 0;
 
         storageWriteLatencyBuckets.reset();
         storageLedgerWriteLatencyBuckets.reset();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 18c4f97..43119d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -96,6 +96,7 @@ public class NamespaceStatsAggregator {
             ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();
 
             stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
+            stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
             stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
             stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
             stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
@@ -258,6 +259,7 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
 
         metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+        metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
         metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
         metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
                 stats.managedLedgerStats.offloadedStorageUsed);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 0649235..1ac514a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -92,6 +92,8 @@ class TopicStats {
         metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize);
 
         metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
+        metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
+                stats.managedLedgerStats.storageLogicalSize);
         metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog);
         metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
                 stats.managedLedgerStats.backlogSize);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 56e4292..367b792 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -119,6 +119,7 @@ public class TransactionAggregator {
         ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) managedLedger.getStats();
 
         managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
+        managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
         managedLedgerStats.backlogSize = managedLedger.getEstimatedBacklogSize();
         managedLedgerStats.offloadedStorageUsed = managedLedger.getOffloadedSize();
 
@@ -168,6 +169,8 @@ public class TransactionAggregator {
         metrics(stream, cluster, namespace, topic, subscription,
                 "pulsar_storage_size", stats.storageSize);
         metrics(stream, cluster, namespace, topic, subscription,
+                "pulsar_storage_logical_size", stats.storageLogicalSize);
+        metrics(stream, cluster, namespace, topic, subscription,
                 "pulsar_storage_backlog_size", stats.backlogSize);
         metrics(stream, cluster, namespace, topic, subscription,
                 "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index cd05306..f0266de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -235,6 +235,10 @@ public class TransactionMetricsTest extends BrokerTestBase {
         checkManagedLedgerMetrics(subName, 32, metric);
         checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 252, metric);
 
+        metric = metrics.get("pulsar_storage_logical_size");
+        checkManagedLedgerMetrics(subName, 16, metric);
+        checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);
+
         metric = metrics.get("pulsar_storage_backlog_size");
         checkManagedLedgerMetrics(subName, 16, metric);
         checkManagedLedgerMetrics(MLTransactionLogImpl.TRANSACTION_SUBSCRIPTION_NAME, 126, metric);
@@ -245,6 +249,8 @@ public class TransactionMetricsTest extends BrokerTestBase {
         metrics = parseMetrics(metricsStr);
         metric = metrics.get("pulsar_storage_size");
         assertEquals(metric.size(), 3);
+        metric = metrics.get("pulsar_storage_logical_size");
+        assertEquals(metric.size(), 2);
         metric = metrics.get("pulsar_storage_backlog_size");
         assertEquals(metric.size(), 2);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 2d0a5f9..9784928 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -39,6 +39,7 @@ public class AggregatedNamespaceStatsTest {
         topicStats1.throughputIn = 10240.0;
         topicStats1.throughputOut = 20480.0;
         topicStats1.managedLedgerStats.storageSize = 5120;
+        topicStats1.managedLedgerStats.storageLogicalSize = 2048;
         topicStats1.msgBacklog = 30;
         topicStats1.managedLedgerStats.storageWriteRate = 12.0;
         topicStats1.managedLedgerStats.storageReadRate = 6.0;
@@ -70,6 +71,7 @@ public class AggregatedNamespaceStatsTest {
         topicStats2.throughputIn = 512.0;
         topicStats2.throughputOut = 1024.5;
         topicStats2.managedLedgerStats.storageSize = 1024;
+        topicStats2.managedLedgerStats.storageLogicalSize = 512;
         topicStats2.msgBacklog = 7;
         topicStats2.managedLedgerStats.storageWriteRate = 5.0;
         topicStats2.managedLedgerStats.storageReadRate = 2.5;
@@ -108,6 +110,8 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsStats.msgBacklog, 37);
         assertEquals(nsStats.managedLedgerStats.storageWriteRate, 17.0);
         assertEquals(nsStats.managedLedgerStats.storageReadRate, 8.5);
+        assertEquals(nsStats.managedLedgerStats.storageSize, 6144);
+        assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560);
 
         AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace);
         assertNotNull(nsReplStats);
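
The two new assertions above check that the namespace roll-up simply sums the per-topic values: 5120 + 1024 = 6144 for the physical storage size and 2048 + 512 = 2560 for the logical size. A minimal sketch of that summation (class and field layout here are illustrative, not the actual AggregatedNamespaceStats implementation):

    import java.util.List;

    // Minimal sketch of the per-topic -> per-namespace summation the test
    // asserts; names and types here are illustrative.
    public class NamespaceRollupSketch {
        // perTopic holds {storageSize, storageLogicalSize} pairs per topic.
        static long sumStorage(List<long[]> perTopic, int idx) {
            long total = 0;
            for (long[] sizes : perTopic) {
                total += sizes[idx];
            }
            return total;
        }

        public static void main(String[] args) {
            List<long[]> topics = List.of(new long[]{5120, 2048},
                                          new long[]{1024, 512});
            System.out.println(sumStorage(topics, 0)); // 6144
            System.out.println(sumStorage(topics, 1)); // 2560
        }
    }
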
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index f9be3bc..c89ca5c 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -144,6 +144,7 @@ All the namespace metrics are labelled with the following labels:
 | pulsar_throughput_in | Gauge | The total throughput of the namespace coming into this broker (bytes/second). |
 | pulsar_throughput_out | Gauge | The total throughput of the namespace going out from this broker (bytes/second). |
 | pulsar_storage_size | Gauge | The total storage size of the topics in this namespace owned by this broker (bytes). |
+| pulsar_storage_logical_size | Gauge | The storage size of the topics in this namespace owned by this broker, excluding the space used by replicas (bytes). |
 | pulsar_storage_backlog_size | Gauge | The total backlog size of the topics of this namespace owned by this broker (messages). |
 | pulsar_storage_offloaded_size | Gauge | The total amount of the data in this namespace offloaded to the tiered storage (bytes). |
 | pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this namespace (message batches / second). |
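
Taken together, the hunks above export a new gauge alongside pulsar_storage_size. The difference between the two is BookKeeper replication: the logical size counts each entry's bytes once, while the physical size counts every replica. The factor of two in the TransactionMetricsTest expectations (16 vs. 32, 126 vs. 252) is consistent with a replication factor of 2 in that test setup. A minimal sketch of that relationship, assuming physical size equals logical size times the write quorum (class and method names are illustrative, not the counters this commit adds to ManagedLedgerMBeanImpl):

    // Minimal sketch, assuming physicalSize = logicalSize * writeQuorum.
    // Names are illustrative; the real counters live in ManagedLedgerMBeanImpl.
    public class StorageSizeSketch {
        private final int writeQuorum;   // assumed BookKeeper replication factor
        private long logicalSize;        // bytes, counting each entry once

        StorageSizeSketch(int writeQuorum) {
            this.writeQuorum = writeQuorum;
        }

        // Record one appended entry by its serialized size in bytes.
        void recordAddEntry(long entryBytes) {
            logicalSize += entryBytes;
        }

        long getStoredMessagesLogicalSize() {
            return logicalSize;
        }

        long getStoredMessagesSize() {
            return logicalSize * writeQuorum; // every replica counted
        }

        public static void main(String[] args) {
            StorageSizeSketch stats = new StorageSizeSketch(2);
            stats.recordAddEntry(16);
            System.out.println(stats.getStoredMessagesLogicalSize()); // 16
            System.out.println(stats.getStoredMessagesSize());        // 32
        }
    }
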