You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/12 10:46:58 UTC

[pulsar] branch branch-2.8 updated (6abb3027a50 -> 73e470c1e71)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 6abb3027a50 [branch-2.8] Fix delete namespace issue (revert and fix #14619) (#15040)
     new 4829f79fbaf [fix][broker] Return if reset in progress (#14978)
     new 5c527f2610f Fix when nextValidLedger is null cause npe (#13975)
     new 73e470c1e71 Handle kafka sinks that return immutable maps as configs (#14780)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  4 ++-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 29 ++++++++++++++--------
 .../connect/SchemaedFileStreamSinkConnector.java   | 14 +++++++++++
 3 files changed, 35 insertions(+), 12 deletions(-)


[pulsar] 02/03: Fix when nextValidLedger is null cause npe (#13975)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5c527f2610f640d791b8f092a1c730a518ddc81d
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Fri Apr 8 11:40:25 2022 +0800

    Fix when nextValidLedger is null cause npe (#13975)
    
    (cherry picked from commit 5cf3fa0d8050b16305ee060820b884d69ec8a828)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java     | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dfd4d6f220a..6bcb038b9e1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1699,7 +1699,8 @@ public class ManagedCursorImpl implements ManagedCursor {
             try {
                 long ledgerEntries = ledger.getLedgerInfo(markDeletePosition.getLedgerId()).get().getEntries();
                 Long nextValidLedger = ledger.getNextValidLedger(ledger.getLastConfirmedEntry().getLedgerId());
-                shouldCursorMoveForward = (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
+                shouldCursorMoveForward = nextValidLedger != null
+                        && (markDeletePosition.getEntryId() + 1 >= ledgerEntries)
                         && (newPosition.getLedgerId() == nextValidLedger);
             } catch (Exception e) {
                 log.warn("Failed to get ledger entries while setting mark-delete-position", e);


[pulsar] 01/03: [fix][broker] Return if reset in progress (#14978)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4829f79fbaf2940d220e0906091390cfcf0cad36
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Apr 6 20:49:01 2022 +0800

    [fix][broker] Return if reset in progress (#14978)
    
    ### Motivation
    
    - Fix bug: if a cursor reset is already in progress, the callback's `resetFailed` is invoked but the method does not return, so the callback ends up being invoked again.
    
    ### Modifications
    
    - Add a `return` after the callback's `resetFailed` is invoked when a cursor reset is already in progress.
    
    (cherry picked from commit 81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index d03351f2da2..dfd4d6f220a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1045,6 +1045,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 resetCursorCallback.resetFailed(
                         new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
                         position);
+                return;
             }
         }
 


[pulsar] 03/03: Handle kafka sinks that return immutable maps as configs (#14780)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 73e470c1e71b3fe2aba6a9cb96d3b102f85e3969
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 22 00:51:39 2022 -0700

    Handle kafka sinks that return immutable maps as configs (#14780)
    
    (cherry picked from commit b56d7318e73fb6915208dbe1223446e759c2ed0b)
---
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 29 ++++++++++++++--------
 .../connect/SchemaedFileStreamSinkConnector.java   | 14 +++++++++++
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index bd1971e75c6..7f89c106360 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -25,6 +25,19 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -44,17 +57,6 @@ import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
 import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
 
@@ -155,6 +157,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         Preconditions.checkNotNull(configs);
         Preconditions.checkArgument(configs.size() == 1);
 
+        // configs may contain immutable/unmodifiable maps
+        configs = configs.stream()
+                .map(HashMap::new)
+                .collect(Collectors.toList());
+
         configs.forEach(x -> {
             x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
             x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl());
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
index a3cce924d1a..4a786617f75 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/SchemaedFileStreamSinkConnector.java
@@ -22,6 +22,11 @@ package org.apache.pulsar.io.kafka.connect;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.file.FileStreamSinkConnector;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * A FileStreamSinkConnector for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
@@ -31,4 +36,13 @@ public class SchemaedFileStreamSinkConnector extends FileStreamSinkConnector {
     public Class<? extends Task> taskClass() {
         return SchemaedFileStreamSinkTask.class;
     }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // to test cases when task return immutable maps as configs
+        return super.taskConfigs(maxTasks)
+                .stream()
+                .map(Collections::unmodifiableMap)
+                .collect(Collectors.toList());
+    }
 }