Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/11/02 02:51:46 UTC

[pulsar] branch branch-2.6 updated (6ba0156 -> 92b77c0)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 6ba0156  Remove go 1.11
     new fe3a258  Fix deadlock that occurred during topic ownership check (#8406)
     new 76ae877  Resolve pr 8304 conflict
     new 62d5e93  Set timeout delay for partitionsAutoUpdateTimerTask recheck schedule (#8227)
     new 92b77c0  Merge branch 'branch-2.6' of github.com:apache/pulsar into branch-2.6

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/namespace/NamespaceService.java  | 14 +++++-
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  1 +
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 .../client/impl/PartitionedProducerImpl.java       |  2 +-
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 54 ++++++++++++++------
 .../pulsar/zookeeper/ZookeeperCacheTest.java       | 58 ++++++++++++++++++++++
 6 files changed, 114 insertions(+), 17 deletions(-)


[pulsar] 03/04: Set timeout delay for partitionsAutoUpdateTimerTask recheck schedule (#8227)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 62d5e9379273bf35629d1a6d78289e3ea4f29291
Author: Aaron Robert <Ro...@outlook.com>
AuthorDate: Sat Oct 31 10:53:34 2020 +0800

    Set timeout delay for partitionsAutoUpdateTimerTask recheck schedule (#8227)
    
    Co-authored-by: penghui <pe...@apache.org>
    
    ### Motivation
    The configured autoUpdatePartitionsInterval was applied only to the first scheduling of partitionsAutoUpdateTimerTask. Each subsequent recheck was still scheduled with the hard-coded default delay of 1 minute. This PR fixes the problem.
    
    ### Modifications
    
    * Set timeout delay for partitionsAutoUpdateTimerTask recheck schedule
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ### Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
    
    (cherry picked from commit d4a23b345b0df5398d01ec5bebd500c58013d4d1)
---
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java     | 2 +-
 .../java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
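
To illustrate the behaviour this commit aims for, here is a minimal sketch of a self-rescheduling task that reuses its configured interval for every re-check, not only for the first one. It is not the Pulsar implementation: it uses the JDK `ScheduledExecutorService` instead of the client timer, and the class `RecheckScheduleSketch`, its fields, and its methods are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a task that re-arms itself with the configured interval.
class RecheckScheduleSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    // Assumption: stands in for the configured auto-update interval.
    private final long intervalMillis;
    private final CountDownLatch remainingRuns;
    // Records the delay requested for each scheduling, so it can be inspected.
    final List<Long> requestedDelays = new CopyOnWriteArrayList<>();

    RecheckScheduleSketch(long intervalMillis, int runs) {
        this.intervalMillis = intervalMillis;
        this.remainingRuns = new CountDownLatch(runs);
    }

    void start() {
        schedule();
    }

    private void schedule() {
        // The same configured delay is used for the first and for every later run.
        requestedDelays.add(intervalMillis);
        timer.schedule(this::recheck, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void recheck() {
        // A real task would compare partition counts here.
        remainingRuns.countDown();
        if (remainingRuns.getCount() > 0) {
            schedule(); // schedule the next re-check
        }
    }

    // Waits for all runs, then stops the timer. Returns false on timeout or interrupt.
    boolean awaitAndStop(long timeoutMillis) {
        try {
            return remainingRuns.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        RecheckScheduleSketch sketch = new RecheckScheduleSketch(20, 3);
        sketch.start();
        boolean completed = sketch.awaitAndStop(5000);
        System.out.println("completed=" + completed + " delays=" + sketch.requestedDelays);
    }
}
```

Routing both the initial and the follow-up scheduling through one `schedule()` method is a design choice of this sketch: with a single call site, the two delays cannot drift apart.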

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 06a21a8..67f09c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1186,7 +1186,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
             // schedule the next re-check task
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index dd7b29b..765524c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -353,7 +353,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
             // schedule the next re-check task
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };
 


[pulsar] 04/04: Merge branch 'branch-2.6' of github.com:apache/pulsar into branch-2.6

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 92b77c0de7631f42c81fb98efc5b427b7bed2890
Merge: 62d5e93 6ba0156
Author: xiaolong.ran <rx...@apache.org>
AuthorDate: Mon Nov 2 10:51:03 2020 +0800

    Merge branch 'branch-2.6' of github.com:apache/pulsar into branch-2.6

 .github/workflows/ci-go-functions-test.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[pulsar] 01/04: Fix deadlock that occurred during topic ownership check (#8406)

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe3a258c3a696683009d8ff2771a3e4df757ea7f
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Fri Oct 30 04:00:37 2020 +0900

    Fix deadlock that occurred during topic ownership check (#8406)
    
    
    (cherry picked from commit f96bc6305f54cd1845af64707ba4b94a7e9d861a)
---
 .../apache/pulsar/broker/namespace/NamespaceService.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
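
The diff below replaces a blocking bundle lookup with a pattern that answers `false` immediately when the metadata is not cached and loads it in the background, so that a later call can succeed. The following is a minimal sketch of that pattern using only JDK classes. The class `OwnershipCheckSketch`, its maps, and the `bundleLoader` function are hypothetical stand-ins, not Pulsar APIs.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: never block in the check, warm the cache asynchronously.
class OwnershipCheckSketch {
    private final Map<String, String> bundleByTopic = new ConcurrentHashMap<>();
    private final Set<String> ownedBundles;
    // Assumption: an asynchronous metadata lookup supplied by the caller.
    private final Function<String, CompletableFuture<String>> bundleLoader;

    OwnershipCheckSketch(Set<String> ownedBundles,
                         Function<String, CompletableFuture<String>> bundleLoader) {
        this.ownedBundles = ownedBundles;
        this.bundleLoader = bundleLoader;
    }

    boolean isTopicOwned(String topic) {
        String bundle = bundleByTopic.get(topic);
        if (bundle != null) {
            return ownedBundles.contains(bundle);
        }
        // Not cached: start the lookup without waiting for it, and report
        // "not owned" for now. A later call sees the cached result.
        bundleLoader.apply(topic)
                .thenAccept(loaded -> bundleByTopic.put(topic, loaded))
                .exceptionally(ex -> {
                    System.err.println("Failed to get bundle for topic " + topic + ": " + ex);
                    return null;
                });
        return false;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        OwnershipCheckSketch check =
                new OwnershipCheckSketch(Collections.singleton("bundle-0"), topic -> pending);

        System.out.println(check.isTopicOwned("topic-a")); // lookup still pending
        pending.complete("bundle-0");                       // metadata arrives
        System.out.println(check.isTopicOwned("topic-a")); // answered from the cache
    }
}
```

The trade-off is that the first call for an uncached topic reports a false negative, so callers are expected to retry.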

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index f2995ee..a1acf28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -931,7 +931,19 @@ public class NamespaceService {
         if (bundle.isPresent()) {
             return ownershipCache.getOwnedBundle(bundle.get()) != null;
         } else {
-            return ownershipCache.getOwnedBundle(getBundle(topicName)) != null;
+            // Calling `getBundle(TopicName)` here can cause a deadlock.
+            // cf. https://github.com/apache/pulsar/pull/4190
+            //
+            // This method returns false once if the bundle metadata is not cached, but gets the metadata asynchronously
+            // to cache it. Otherwise, the clients will never be able to connect to the topic due to ServiceUnitNotReadyException.
+            // cf. https://github.com/apache/pulsar/pull/5919
+            getBundleAsync(topicName).thenAccept(bundle2 -> {
+                LOG.info("Succeeded in getting bundle {} for topic - [{}]", bundle2, topicName);
+            }).exceptionally(ex -> {
+                LOG.warn("Failed to get bundle for topic - [{}] {}", topicName, ex.getMessage());
+                return null;
+            });
+            return false;
         }
     }
 


[pulsar] 02/04: Resolve pr 8304 conflict

Posted by rx...@apache.org.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76ae877da3ead26200525f0fcbef8fd71747b9f6
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Oct 30 07:34:01 2020 -0700

    Resolve pr 8304 conflict
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  1 +
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 54 ++++++++++++++------
 .../pulsar/zookeeper/ZookeeperCacheTest.java       | 58 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 14 deletions(-)
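
The `ZooKeeperCache` change in the diff below stores a load timestamp next to each cached value and, on read, refreshes an entry in the background once it is older than the expiry time, while still serving the value that is already cached. The following is a minimal sketch of that idea with JDK classes only. The class `RefreshOnReadCacheSketch`, the injected clock, and the injected executor are assumptions made for this example, not part of Pulsar. The sketch converts the expiry to nanoseconds with `TimeUnit.SECONDS.toNanos` so that both sides of the age comparison are in nanoseconds, whereas the diff below compares a `System.nanoTime()` difference against `TimeUnit.SECONDS.toMillis(cacheExpirySeconds)`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: cache entries carry their load time and are refreshed on read.
class RefreshOnReadCacheSketch {
    // value paired with the nanosecond timestamp at which it was loaded
    private final Map<String, Entry<String, Long>> cache = new ConcurrentHashMap<>();
    private final Function<String, String> loader;   // assumption: reads the backing store
    private final LongSupplier nanoClock;            // assumption: injectable for testing
    private final Executor refreshExecutor;
    private final long expiryNanos;

    RefreshOnReadCacheSketch(Function<String, String> loader, LongSupplier nanoClock,
                             Executor refreshExecutor, int expirySeconds) {
        this.loader = loader;
        this.nanoClock = nanoClock;
        this.refreshExecutor = refreshExecutor;
        this.expiryNanos = TimeUnit.SECONDS.toNanos(expirySeconds);
    }

    String get(String path) {
        Entry<String, Long> cached = cache.get(path);
        if (cached == null) {
            return load(path).getKey();
        }
        // Elapsed time and threshold are both expressed in nanoseconds.
        if (nanoClock.getAsLong() - cached.getValue() > expiryNanos) {
            refreshExecutor.execute(() -> load(path));
        }
        // The value cached before the refresh is still the one returned here.
        return cached.getKey();
    }

    private Entry<String, Long> load(String path) {
        Entry<String, Long> fresh =
                new SimpleImmutableEntry<>(loader.apply(path), nanoClock.getAsLong());
        cache.put(path, fresh);
        return fresh;
    }

    public static void main(String[] args) {
        AtomicLong now = new AtomicLong();
        Map<String, String> store = new ConcurrentHashMap<>();
        store.put("/test", "test-1");
        // Runnable::run executes the refresh inline, which keeps the demo deterministic.
        RefreshOnReadCacheSketch cache =
                new RefreshOnReadCacheSketch(store::get, now::get, Runnable::run, 1);

        System.out.println(cache.get("/test"));       // first read loads the value
        store.put("/test", "test-2");
        now.addAndGet(TimeUnit.SECONDS.toNanos(2));   // entry is now older than 1 second
        System.out.println(cache.get("/test"));       // stale value, refresh triggered
        System.out.println(cache.get("/test"));       // refreshed value
    }
}
```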

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index a4bfe05..0616b55 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -77,6 +77,7 @@ public abstract class BkEnsemblesTestBase {
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
             config.setAdvertisedAddress("127.0.0.1");
             config.setAllowAutoTopicCreationType("non-partitioned");
+            config.setZooKeeperOperationTimeoutSeconds(1);
 
             pulsar = new PulsarService(config);
             pulsar.start();
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index ff132f2..89ba630 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Collections;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
@@ -41,13 +40,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.stats.CacheMetricsCollector;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -84,19 +82,20 @@ public abstract class ZooKeeperCache implements Watcher {
 
     public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
 
-    protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
+    protected final AsyncLoadingCache<String, Pair<Entry<Object, Stat>, Long>> dataCache;
     protected final AsyncLoadingCache<String, Set<String>> childrenCache;
     protected final AsyncLoadingCache<String, Boolean> existsCache;
     private final OrderedExecutor executor;
     private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
     private boolean shouldShutdownExecutor;
     private final int zkOperationTimeoutSeconds;
-    private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
+    private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300; //5 minutes
+    private final int cacheExpirySeconds;
 
     protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
 
     public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
-        this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
+        this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, DEFAULT_CACHE_EXPIRY_SECONDS);
     }
     
     public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
@@ -106,6 +105,7 @@ public abstract class ZooKeeperCache implements Watcher {
         this.executor = executor;
         this.zkSession.set(zkSession);
         this.shouldShutdownExecutor = false;
+        this.cacheExpirySeconds = cacheExpirySeconds;
 
         this.dataCache = Caffeine.newBuilder()
                 .recordStats()
@@ -349,10 +349,12 @@ public abstract class ZooKeeperCache implements Watcher {
         checkNotNull(path);
         checkNotNull(deserializer);
 
-        CompletableFuture<Optional<Entry<T, Stat>>> future = new CompletableFuture<>();
+        // refresh zk-cache entry in background if it's already expired
+        checkAndRefreshExpiredEntry(path, deserializer);
+        CompletableFuture<Optional<Entry<T,Stat>>> future = new CompletableFuture<>();
         dataCache.get(path, (p, executor) -> {
             // Return a future for the z-node to be fetched from ZK
-            CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();
+            CompletableFuture<Pair<Entry<Object, Stat>, Long>> zkFuture = new CompletableFuture<>();
 
             // Broker doesn't restart on global-zk session lost: so handling unexpected exception
             try {
@@ -361,8 +363,8 @@ public abstract class ZooKeeperCache implements Watcher {
                         try {
                             T obj = deserializer.deserialize(path, content);
                             // avoid using the zk-client thread to process the result
-                            executor.execute(
-                                    () -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
+                            executor.execute(() -> zkFuture.complete(ImmutablePair
+                                    .of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
                         } catch (Exception e) {
                             executor.execute(() -> zkFuture.completeExceptionally(e));
                         }
@@ -381,7 +383,7 @@ public abstract class ZooKeeperCache implements Watcher {
             return zkFuture;
         }).thenAccept(result -> {
             if (result != null) {
-                future.complete(Optional.of((Entry<T, Stat>) result));
+                future.complete(Optional.of((Entry<T, Stat>) result.getLeft()));
             } else {
                 future.complete(Optional.empty());
             }
@@ -393,6 +395,30 @@ public abstract class ZooKeeperCache implements Watcher {
         return future;
     }
 
+    private <T> void checkAndRefreshExpiredEntry(String path, final Deserializer<T> deserializer) {
+        CompletableFuture<Pair<Entry<Object, Stat>, Long>> result = dataCache.getIfPresent(path);
+        if (result != null && result.isDone()) {
+            Pair<Entry<Object, Stat>, Long> entryPair = result.getNow(null);
+            if (entryPair != null && entryPair.getRight() != null) {
+                if ((System.nanoTime() - entryPair.getRight()) > TimeUnit.SECONDS.toMillis(cacheExpirySeconds)) {
+                    this.zkSession.get().getData(path, this, (rc, path1, ctx, content, stat) -> {
+                        if (rc != Code.OK.intValue()) {
+                            log.warn("Failed to refresh zookeeper-cache for {} due to {}", path, rc);
+                            return;
+                        }
+                        try {
+                            T obj = deserializer.deserialize(path, content);
+                            dataCache.put(path, CompletableFuture.completedFuture(ImmutablePair
+                                    .of(new SimpleImmutableEntry<Object, Stat>(obj, stat), System.nanoTime())));
+                        } catch (Exception e) {
+                            log.warn("Failed to refresh zookeeper-cache for {}", path, e);
+                        }
+                    }, null);
+                }
+            }
+        }
+    }
+
     /**
      * Simple ZooKeeperChildrenCache use this method to invalidate cache entry on watch event w/o automatic re-loading
      *
@@ -468,9 +494,9 @@ public abstract class ZooKeeperCache implements Watcher {
 
     @SuppressWarnings("unchecked")
     public <T> T getDataIfPresent(String path) {
-        CompletableFuture<Map.Entry<Object, Stat>> f = dataCache.getIfPresent(path);
+        CompletableFuture<Pair<Entry<Object, Stat>, Long>> f = dataCache.getIfPresent(path);
         if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
-            return (T) f.join().getKey();
+            return (T) f.join().getLeft().getKey();
         } else {
             return null;
         }
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index b3db3d9..d8bd2c9 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.KeeperException.Code;
@@ -563,6 +564,63 @@ public class ZookeeperCacheTest {
         scheduledExecutor.shutdown();
     }
 
+    /**
+     * Test to verify {@link ZooKeeperCache} renews cache data after expiry time in background.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testZKRefreshExpiredEntry() throws Exception {
+        int cacheExpiryTimeSec = 1;
+        OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build();
+        ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
+        ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk"));
+
+        String path = "/test";
+        String val1 = "test-1";
+        String val2 = "test-2";
+        zkClient.create(path, val1.getBytes(), null, null);
+
+        // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
+        // callback-result process
+        ZooKeeperCache zkCacheService = new ZooKeeperCacheTest("test", zkClient, 30, executor, cacheExpiryTimeSec);
+        ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
+            @Override
+            public String deserialize(String key, byte[] content) throws Exception {
+                return new String(content);
+            }
+        };
+
+        // try to do get on the path which will time-out and async-cache will have non-completed Future
+        assertEquals(zkCache.get(path).get(), val1);
+
+        zkClient.setData(path, val2.getBytes(), -1);
+
+        retryStrategically((test) -> {
+            try {
+                return zkCache.get(path).get().equalsIgnoreCase(val2);
+            } catch (Exception e) {
+                log.warn("Failed to get date for path {}", path);
+            }
+            return false;
+        }, 5, 1000);
+
+        assertEquals(zkCache.get(path).get(), val2);
+
+        executor.shutdown();
+        zkExecutor.shutdown();
+        scheduledExecutor.shutdown();
+    }
+
+    static class ZooKeeperCacheTest extends ZooKeeperCache {
+
+        public ZooKeeperCacheTest(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
+                OrderedExecutor executor, int cacheExpirySeconds) {
+            super(cacheName, zkSession, zkOperationTimeoutSeconds, executor, cacheExpirySeconds);
+        }
+
+    }
+
     private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
             throws Exception {
         for (int i = 0; i < retryCount; i++) {