You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 03:46:43 UTC
[flink] 05/05: [FLINK-29381][Connector/Pulsar] Fixes the split assignment for Key Shared subscription.
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33b290ec9c4426a5cd08f8749256c55e7e20e93a
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Sep 30 01:40:14 2022 +0800
[FLINK-29381][Connector/Pulsar] Fixes the split assignment for Key Shared subscription.
---
...itAssigner.java => KeySharedSplitAssigner.java} | 65 ++++++++--------------
.../assigner/NonSharedSplitAssigner.java | 33 -----------
.../enumerator/assigner/SplitAssignerBase.java | 33 +++++++++++
.../enumerator/assigner/SplitAssignerFactory.java | 4 +-
.../assigner/NonSharedSplitAssignerTest.java | 2 +-
.../source/cases/KeySharedSubscriptionContext.java | 5 +-
6 files changed, 62 insertions(+), 80 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
similarity index 57%
copy from flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
copy to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
index 1b7b4a6f446..9ef43476f71 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.pulsar.source.enumerator.assigner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -32,14 +31,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-/**
- * This assigner is used for {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
- * and {@link SubscriptionType#Key_Shared} subscriptions.
- */
+/** This assigner is used for {@link SubscriptionType#Key_Shared} subscription. */
@Internal
-class NonSharedSplitAssigner extends SplitAssignerBase {
+public class KeySharedSplitAssigner extends SplitAssignerBase {
- public NonSharedSplitAssigner(
+ public KeySharedSplitAssigner(
StopCursor stopCursor,
boolean enablePartitionDiscovery,
SplitEnumeratorContext<PulsarPartitionSplit> context,
@@ -52,10 +48,15 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
List<TopicPartition> newPartitions = new ArrayList<>();
for (TopicPartition partition : fetchedPartitions) {
+ boolean shouldAssign = false;
if (!appendedPartitions.contains(partition)) {
appendedPartitions.add(partition);
newPartitions.add(partition);
+ shouldAssign = true;
+ }
+ // Reassign the incoming splits when restarting from state.
+ if (shouldAssign || !initialized) {
// Calculate the reader id by the current parallelism.
int readerId = partitionOwner(partition);
PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
@@ -72,41 +73,21 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
@Override
public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
- for (PulsarPartitionSplit split : splits) {
- int readerId = partitionOwner(split.getPartition());
- addSplitToPendingList(readerId, split);
+ if (splits.isEmpty()) {
+ // On task failure no splits are handed back, so re-queue every known partition it owns.
+ for (TopicPartition partition : appendedPartitions) {
+ int readerId = partitionOwner(partition);
+ if (readerId == subtaskId) {
+ PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
+ addSplitToPendingList(subtaskId, split);
+ }
+ }
+ } else {
+ // Splits were handed back explicitly: route each one to the reader that owns it.
+ for (PulsarPartitionSplit split : splits) {
+ int readerId = partitionOwner(split.getPartition());
+ addSplitToPendingList(readerId, split);
+ }
}
}
-
- /**
- * Returns the index of the target subtask that a specific partition should be assigned to. It's
- * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
- *
- * <p>The resulting distribution of partition has the following contract:
- *
- * <ul>
- * <li>1. Uniformly distributed across subtasks.
- * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
- * indices) by using the partition id as the offset from a starting index (i.e., the index
- * of the subtask which partition 0 of the topic will be assigned to, determined using the
- * topic name).
- * </ul>
- *
- * @param partition The Pulsar partition to assign.
- * @return The id of the reader that owns this partition.
- */
- private int partitionOwner(TopicPartition partition) {
- return calculatePartitionOwner(
- partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
- }
-
- @VisibleForTesting
- static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
- int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
- /*
- * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
- * 0. Therefore, can be used directly as the offset clockwise from the start index.
- */
- return (startIndex + partitionId) % parallelism;
- }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
index 1b7b4a6f446..833477c638e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.pulsar.source.enumerator.assigner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -77,36 +76,4 @@ class NonSharedSplitAssigner extends SplitAssignerBase {
addSplitToPendingList(readerId, split);
}
}
-
- /**
- * Returns the index of the target subtask that a specific partition should be assigned to. It's
- * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()}
- *
- * <p>The resulting distribution of partition has the following contract:
- *
- * <ul>
- * <li>1. Uniformly distributed across subtasks.
- * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
- * indices) by using the partition id as the offset from a starting index (i.e., the index
- * of the subtask which partition 0 of the topic will be assigned to, determined using the
- * topic name).
- * </ul>
- *
- * @param partition The Pulsar partition to assign.
- * @return The id of the reader that owns this partition.
- */
- private int partitionOwner(TopicPartition partition) {
- return calculatePartitionOwner(
- partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
- }
-
- @VisibleForTesting
- static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
- int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
- /*
- * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
- * 0. Therefore, can be used directly as the offset clockwise from the start index.
- */
- return (startIndex + partitionId) % parallelism;
- }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
index 32af433acbb..733072ca7e4 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
@@ -99,4 +100,36 @@ abstract class SplitAssignerBase implements SplitAssigner {
pendingPartitionSplits.computeIfAbsent(readerId, i -> new HashSet<>());
splits.add(split);
}
+
+ /**
+ * Returns the index of the subtask that a specific partition should be assigned to. The logic
+ * is inspired by {@code KafkaSourceEnumerator.getSplitOwner()}.
+ *
+ * <p>The resulting distribution of partitions has the following contract:
+ *
+ * <ul>
+ * <li>1. Partitions are uniformly distributed across subtasks.
+ * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
+ * indices) by using the partition id as the offset from a starting index (i.e., the index
+ * of the subtask that partition 0 of the topic is assigned to, which is derived from the
+ * topic name).
+ * </ul>
+ *
+ * @param partition The Pulsar partition to assign.
+ * @return The id of the reader that owns this partition, given the current parallelism.
+ */
+ protected int partitionOwner(TopicPartition partition) {
+ return calculatePartitionOwner(
+ partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
+ }
+
+ @VisibleForTesting
+ static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
+ int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
+ /*
+ * Assumes Pulsar partition ids always ascend from 0, so each id can be used directly as the
+ * clockwise offset from the start index (the owner of partition 0, derived from the topic).
+ */
+ return (startIndex + partitionId) % parallelism;
+ }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
index 02f8934a6bf..4ade1e561a7 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -46,12 +46,14 @@ public final class SplitAssignerFactory {
switch (subscriptionType) {
case Failover:
case Exclusive:
- case Key_Shared:
return new NonSharedSplitAssigner(
stopCursor, enablePartitionDiscovery, context, enumState);
case Shared:
return new SharedSplitAssigner(
stopCursor, enablePartitionDiscovery, context, enumState);
+ case Key_Shared:
+ return new KeySharedSplitAssigner(
+ stopCursor, enablePartitionDiscovery, context, enumState);
default:
throw new IllegalArgumentException(
"We don't support this subscription type: " + subscriptionType);
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
index 58f8d8fc51c..bc2d669aeb9 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -36,7 +36,7 @@ import java.util.Optional;
import java.util.Set;
import static java.util.Collections.singletonList;
-import static org.apache.flink.connector.pulsar.source.enumerator.assigner.NonSharedSplitAssigner.calculatePartitionOwner;
+import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerBase.calculatePartitionOwner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
index 1901787b342..7aa689e6106 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.testframe.external.source.TestingSourceSetting
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
@@ -40,7 +41,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
super(environment);
-
this.keyToRead = randomAlphabetic(8);
}
@@ -60,7 +60,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
int keyHash = keyHash(keyToRead);
TopicRange range = new TopicRange(keyHash, keyHash);
-
builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
}
@@ -76,6 +75,6 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
// This method is copied from Pulsar for calculating message key hash.
private int keyHash(String key) {
- return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+ return Murmur3_32Hash.getInstance().makeHash(key.getBytes(UTF_8)) % RANGE_SIZE;
}
}