You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/28 11:47:53 UTC
[flink] branch release-1.9 updated: [FLINK-13789][kafka] Simplify transactional ID generation
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new c1716b0 [FLINK-13789][kafka] Simplify transactional ID generation
c1716b0 is described below
commit c1716b0ed45baf132b2c1eccfe834c996bc0c6ec
Author: haodang <hd...@gmail.com>
AuthorDate: Wed Aug 28 04:47:02 2019 -0700
[FLINK-13789][kafka] Simplify transactional ID generation
Remove the String.format usage to prevent unexpected behavior when the configured prefix contains format specifiers (for example, a '%' character in the task name).
---
.../connectors/kafka/internal/TransactionalIdsGenerator.java | 2 +-
.../kafka/internal/TransactionalIdsGeneratorTest.java | 12 ++++++++++++
.../connectors/kafka/internal/TransactionalIdsGenerator.java | 2 +-
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index ffebf56..29e8524 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -91,6 +91,6 @@ public class TransactionalIdsGenerator {
}
private String generateTransactionalId(long transactionalId) {
- return String.format(prefix + "-%d", transactionalId);
+ return prefix + "-" + transactionalId;
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
index 7ad3779..2ffb2c2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
@@ -46,6 +46,18 @@ public class TransactionalIdsGeneratorTest {
}
/**
+ * Task name may contain a percent sign, which should not impact the formatter.
+ */
+ @Test
+ public void testTaskNameMayContainPercentSign() {
+ TransactionalIdsGenerator generator = new TransactionalIdsGenerator("%pattern%", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+ assertEquals(
+ new HashSet<>(Arrays.asList("%pattern%-42", "%pattern%-43", "%pattern%-44")),
+ generator.generateIdsToUse(36));
+ }
+
+ /**
* Ids to abort and to use should never clash between subtasks.
*/
@Test
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index ffebf56..29e8524 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -91,6 +91,6 @@ public class TransactionalIdsGenerator {
}
private String generateTransactionalId(long transactionalId) {
- return String.format(prefix + "-%d", transactionalId);
+ return prefix + "-" + transactionalId;
}
}