You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 03:46:43 UTC

[flink] 05/05: [FLINK-29381][Connector/Pulsar] Fixes the split assignment for Key Shared subscription.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33b290ec9c4426a5cd08f8749256c55e7e20e93a
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Sep 30 01:40:14 2022 +0800

    [FLINK-29381][Connector/Pulsar] Fixes the split assignment for Key Shared subscription.
---
 ...itAssigner.java => KeySharedSplitAssigner.java} | 65 ++++++++--------------
 .../assigner/NonSharedSplitAssigner.java           | 33 -----------
 .../enumerator/assigner/SplitAssignerBase.java     | 33 +++++++++++
 .../enumerator/assigner/SplitAssignerFactory.java  |  4 +-
 .../assigner/NonSharedSplitAssignerTest.java       |  2 +-
 .../source/cases/KeySharedSubscriptionContext.java |  5 +-
 6 files changed, 62 insertions(+), 80 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
similarity index 57%
copy from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
copy to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
index 1b7b4a6f446..9ef43476f71 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -32,14 +31,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-/**
- * This assigner is used for {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
- * and {@link SubscriptionType#Key_Shared} subscriptions.
- */
+/** This assigner is used for {@link SubscriptionType#Key_Shared} subscription. */
 @Internal
-class NonSharedSplitAssigner extends SplitAssignerBase {
+public class KeySharedSplitAssigner extends SplitAssignerBase {
 
-    public NonSharedSplitAssigner(
+    public KeySharedSplitAssigner(
             StopCursor stopCursor,
             boolean enablePartitionDiscovery,
             SplitEnumeratorContext<PulsarPartitionSplit> context,
@@ -52,10 +48,15 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
         List<TopicPartition> newPartitions = new ArrayList<>();
 
         for (TopicPartition partition : fetchedPartitions) {
+            boolean shouldAssign = false;
             if (!appendedPartitions.contains(partition)) {
                 appendedPartitions.add(partition);
                 newPartitions.add(partition);
+                shouldAssign = true;
+            }
 
+            // Reassign the incoming splits when restarting from state.
+            if (shouldAssign || !initialized) {
                 // Calculate the reader id by the current parallelism.
                 int readerId = partitionOwner(partition);
                 PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
@@ -72,41 +73,21 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
 
     @Override
     public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
-        for (PulsarPartitionSplit split : splits) {
-            int readerId = partitionOwner(split.getPartition());
-            addSplitToPendingList(readerId, split);
+        if (splits.isEmpty()) {
+            // On task failure no splits are returned; rebuild those owned by this subtask.
+            for (TopicPartition partition : appendedPartitions) {
+                int readId = partitionOwner(partition);
+                if (readId == subtaskId) {
+                    PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
+                    addSplitToPendingList(subtaskId, split);
+                }
+            }
+        } else {
+            // Manually put all the splits back to the enumerator.
+            for (PulsarPartitionSplit split : splits) {
+                int readerId = partitionOwner(split.getPartition());
+                addSplitToPendingList(readerId, split);
+            }
         }
     }
-
-    /**
-     * Returns the index of the target subtask that a specific partition should be assigned to. It's
-     * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
-     *
-     * <p>The resulting distribution of partition has the following contract:
-     *
-     * <ul>
-     *   <li>1. Uniformly distributed across subtasks.
-     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
-     *       indices) by using the partition id as the offset from a starting index (i.e., the index
-     *       of the subtask which partition 0 of the topic will be assigned to, determined using the
-     *       topic name).
-     * </ul>
-     *
-     * @param partition The Pulsar partition to assign.
-     * @return The id of the reader that owns this partition.
-     */
-    private int partitionOwner(TopicPartition partition) {
-        return calculatePartitionOwner(
-                partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
-    }
-
-    @VisibleForTesting
-    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
-        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
-        /*
-         * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
-         * 0. Therefore, can be used directly as the offset clockwise from the start index.
-         */
-        return (startIndex + partitionId) % parallelism;
-    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
index 1b7b4a6f446..833477c638e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -77,36 +76,4 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
             addSplitToPendingList(readerId, split);
         }
     }
-
-    /**
-     * Returns the index of the target subtask that a specific partition should be assigned to. It's
-     * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
-     *
-     * <p>The resulting distribution of partition has the following contract:
-     *
-     * <ul>
-     *   <li>1. Uniformly distributed across subtasks.
-     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
-     *       indices) by using the partition id as the offset from a starting index (i.e., the index
-     *       of the subtask which partition 0 of the topic will be assigned to, determined using the
-     *       topic name).
-     * </ul>
-     *
-     * @param partition The Pulsar partition to assign.
-     * @return The id of the reader that owns this partition.
-     */
-    private int partitionOwner(TopicPartition partition) {
-        return calculatePartitionOwner(
-                partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
-    }
-
-    @VisibleForTesting
-    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
-        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
-        /*
-         * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
-         * 0. Therefore, can be used directly as the offset clockwise from the start index.
-         */
-        return (startIndex + partitionId) % parallelism;
-    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
index 32af433acbb..733072ca7e4 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
@@ -99,4 +100,36 @@ abstract class SplitAssignerBase implements SplitAssigner {
                 pendingPartitionSplits.computeIfAbsent(readerId, i -> new HashSet<>());
         splits.add(split);
     }
+
+    /**
+     * Returns the index of the target subtask that a specific partition should be assigned to. It's
+     * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
+     *
+     * <p>The resulting distribution of partition has the following contract:
+     *
+     * <ul>
+     *   <li>1. Uniformly distributed across subtasks.
+     *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
+     *       indices) by using the partition id as the offset from a starting index (i.e., the index
+     *       of the subtask which partition 0 of the topic will be assigned to, determined using the
+     *       topic name).
+     * </ul>
+     *
+     * @param partition The Pulsar partition to assign.
+     * @return The id of the reader that owns this partition.
+     */
+    protected int partitionOwner(TopicPartition partition) {
+        return calculatePartitionOwner(
+                partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
+    }
+
+    @VisibleForTesting
+    static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
+        /*
+         * This assumes that the ids of Pulsar partitions always ascend starting from 0, so the
+         * partition id can be used directly as the clockwise offset from the start index.
+         */
+        return (startIndex + partitionId) % parallelism;
+    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
index 02f8934a6bf..4ade1e561a7 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -46,12 +46,14 @@ public final class SplitAssignerFactory {
         switch (subscriptionType) {
             case Failover:
             case Exclusive:
-            case Key_Shared:
                 return new NonSharedSplitAssigner(
                         stopCursor, enablePartitionDiscovery, context, enumState);
             case Shared:
                 return new SharedSplitAssigner(
                         stopCursor, enablePartitionDiscovery, context, enumState);
+            case Key_Shared:
+                return new KeySharedSplitAssigner(
+                        stopCursor, enablePartitionDiscovery, context, enumState);
             default:
                 throw new IllegalArgumentException(
                         "We don't support this subscription type: " + subscriptionType);
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
index 58f8d8fc51c..bc2d669aeb9 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Collections.singletonList;
-import static org.apache.flink.connector.pulsar.source.enumerator.assigner.NonSharedSplitAssigner.calculatePartitionOwner;
+import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerBase.calculatePartitionOwner;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
index 1901787b342..7aa689e6106 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.testframe.external.source.TestingSourceSetting
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
@@ -40,7 +41,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
 
     public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
         super(environment);
-
         this.keyToRead = randomAlphabetic(8);
     }
 
@@ -60,7 +60,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
     protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
         int keyHash = keyHash(keyToRead);
         TopicRange range = new TopicRange(keyHash, keyHash);
-
         builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
     }
 
@@ -76,6 +75,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
 
     // This method is copied from Pulsar for calculating message key hash.
     private int keyHash(String key) {
-        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes(UTF_8)) % RANGE_SIZE;
     }
 }