You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/21 00:20:12 UTC

[pulsar] branch branch-2.7 updated (af3407e -> 43ab3c4)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from af3407e  Handle web application exception to redirect request (#9228)
     new 7ff2b41  Fixing the mac build if you use a different openssl (#9165)
     new 43ab3c4  Fix issue with topic compaction when compaction ledger is empty (#9206)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/compaction/CompactedTopicImpl.java  | 15 ++++++++++++++-
 pulsar-client-cpp/CMakeLists.txt                          |  3 ++-
 2 files changed, 16 insertions(+), 2 deletions(-)


[pulsar] 02/02: Fix issue with topic compaction when compaction ledger is empty (#9206)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43ab3c460307f5bc4f909b343281c36604437320
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jan 20 14:36:48 2021 -0800

    Fix issue with topic compaction when compaction ledger is empty (#9206)
    
    ### Motivation
    
    Exceptions are thrown in the broker when when reading from a compacted topic where all keys are marked as deleted.  Clients will not be able to make progress at that point. This can be reproduced in the following manner:
    
    1. Publish x number of key value pairs
    2. Publish the same x number of keys but with no value (mark them for deletion)
    3. Compact the topic
    4. Read compacted from that topic.
    
    Exception:
    
    `23:11:20.805 [broker-topic-workers-OrderedScheduler-3-0] WARN  com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache - Exception thrown during asynchronous load
    org.apache.bookkeeper.client.BKException$BKReadException: Error while reading ledger
    	at org.apache.bookkeeper.client.BKException.create(BKException.java:62) ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$readOneMessageId$9(CompactedTopicImpl.java:181) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.bookkeeper.client.LedgerHandle.asyncReadEntries(LedgerHandle.java:690) ~[bookkeeper-server-4.11.0.SPLK.10278328.jar:?]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.readOneMessageId(CompactedTopicImpl.java:177) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$createCache$8(CompactedTopicImpl.java:168) ~[pulsar-broker.jar:2.7.0]
    	at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.lambda$get$2(LocalAsyncLoadingCache.java:129) ~[caffeine-2.6.2.jar:?]
    	at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039) ~[caffeine-2.6.2.jar:?]
    	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) ~[?:1.8.0_231]
    	at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037) ~[caffeine-2.6.2.jar:?]
    	at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020) ~[caffeine-2.6.2.jar:?]
    	at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:128) ~[caffeine-2.6.2.jar:?]
    	at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:119) ~[caffeine-2.6.2.jar:?]
    	at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache.get(LocalAsyncLoadingCache.java:158) ~[caffeine-2.6.2.jar:?]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPointLoop(CompactedTopicImpl.java:144) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint(CompactedTopicImpl.java:132) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.lambda$asyncReadEntriesOrWait$4(CompactedTopicImpl.java:93) ~[pulsar-broker.jar:2.7.0]
    	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981) ~[?:1.8.0_231]
    	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124) ~[?:1.8.0_231]
    	at org.apache.pulsar.compaction.CompactedTopicImpl.asyncReadEntriesOrWait(CompactedTopicImpl.java:92) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.readMoreEntries(PersistentDispatcherSingleActiveConsumer.java:483) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$null$11(PersistentDispatcherSingleActiveConsumer.java:549) ~[pulsar-broker.jar:2.7.0]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger.jar:2.7.0]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.11.0.SPLK.10278328.jar:?]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_231]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_231]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_231]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_231]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
    `
    
    ### Modifications
    
    Handle correctly the scenario where compact ledger is empty
    
    
    
    (cherry picked from commit f0929bee69f99264a829522a3bda9fd7748f0f75)
---
 .../org/apache/pulsar/compaction/CompactedTopicImpl.java  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 2739269..d30098a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 public class CompactedTopicImpl implements CompactedTopic {
     final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
+    final static long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
     final static int DEFAULT_STARTPOINT_CACHE_SIZE = 100;
 
     private final BookKeeper bk;
@@ -91,6 +92,13 @@ public class CompactedTopicImpl implements CompactedTopic {
                 compactedTopicContext.thenCompose(
                     (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
                         .thenCompose((startPoint) -> {
+                            // do not need to read the compaction ledger if it is empty.
+                            // the cursor just needs to be set to the compaction horizon
+                            if (startPoint == COMPACT_LEDGER_EMPTY) {
+                                cursor.seek(compactionHorizon.getNext());
+                                callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                return CompletableFuture.completedFuture(null);
+                            }
                             if (startPoint == NEWER_THAN_COMPACTED && compactionHorizon.compareTo(cursorPosition) < 0) {
                                 cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, ctx);
                                 return CompletableFuture.completedFuture(null);
@@ -126,7 +134,12 @@ public class CompactedTopicImpl implements CompactedTopic {
                                                   long lastEntryId,
                                                   AsyncLoadingCache<Long,MessageIdData> cache) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
-        findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        // if lastEntryId is less than zero it means there are no entries in the compact ledger
+        if (lastEntryId < 0) {
+            promise.complete(COMPACT_LEDGER_EMPTY);
+        } else {
+            findStartPointLoop(p, 0, lastEntryId, promise, cache);
+        }
         return promise;
     }
 


[pulsar] 01/02: Fixing the mac build if you use a different openssl (#9165)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7ff2b418c5c300a3c6f0fe509073cd9fd04b5f46
Author: phijohns-tibco <62...@users.noreply.github.com>
AuthorDate: Wed Jan 20 15:34:24 2021 -0600

    Fixing the mac build if you use a different openssl (#9165)
    
    Fixes #9155
    
    ### Motivation
    When I attempted to build the c library on macosx with openssl 1.1.1i it failed with linkage errors. I tracked those errors down to the fact that the OPENSLL_ROOT_DIR was incorrect outside of mac vms used by the upstream.
    
    ### Modifications
    
    I set the default value first and then reset it only if running on mac.
    
    ### Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    (cherry picked from commit fd7a5bf5ccb35350081e9e47c323824f066534d6)
---
 pulsar-client-cpp/CMakeLists.txt | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 4716dd7..d4fdb8e 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -73,6 +73,8 @@ endif(NOT LOG_CATEGORY_NAME)
 
 add_definitions(-DLOG_CATEGORY_NAME=${LOG_CATEGORY_NAME} -DBUILDING_PULSAR -DBOOST_ALL_NO_LIB -DBOOST_ALLOW_DEPRECATED_HEADERS)
 
+set(OPENSSL_ROOT_DIR /usr/lib64/)
+
 ### This part is to find and keep SSL dynamic libs in RECORD_OPENSSL_SSL_LIBRARY and RECORD_OPENSSL_CRYPTO_LIBRARY
 ### After find the libs, will unset related cache, and will not affact another same call to find_package.
 if (APPLE)
@@ -80,7 +82,6 @@ if (APPLE)
     set(OPENSSL_ROOT_DIR /usr/local/opt/openssl/)
 endif ()
 
-set(OPENSSL_ROOT_DIR /usr/lib64/)
 set(OPENSSL_USE_STATIC_LIBS FALSE)
 find_package(OpenSSL REQUIRED)
 set(RECORD_OPENSSL_SSL_LIBRARY ${OPENSSL_SSL_LIBRARY})