You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/08/28 11:49:30 UTC

[flink] branch release-1.8 updated: [FLINK-13789][kafka] Simplify transactional ID generation

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new d22e15c  [FLINK-13789][kafka] Simplify transactional ID generation
d22e15c is described below

commit d22e15cce1b9098244c800b551e80632c0f5c346
Author: haodang <hd...@gmail.com>
AuthorDate: Wed Aug 28 04:47:02 2019 -0700

    [FLINK-13789][kafka] Simplify transactional ID generation
    
    Remove the String.format usage: the configured prefix was concatenated into the format string, so a prefix containing format specifiers (e.g. '%') could cause unexpected behavior.
---
 .../connectors/kafka/internal/TransactionalIdsGenerator.java |  2 +-
 .../kafka/internal/TransactionalIdsGeneratorTest.java        | 12 ++++++++++++
 .../connectors/kafka/internal/TransactionalIdsGenerator.java |  2 +-
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index ffebf56..29e8524 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -91,6 +91,6 @@ public class TransactionalIdsGenerator {
 	}
 
 	private String generateTransactionalId(long transactionalId) {
-		return String.format(prefix + "-%d", transactionalId);
+		return prefix + "-" + transactionalId;
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
index 7ad3779..2ffb2c2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
@@ -46,6 +46,18 @@ public class TransactionalIdsGeneratorTest {
 	}
 
 	/**
+	 * Task name may contain percent sign which should not impact the formatter.
+	 */
+	@Test
+	public void testTaskNameMayContainPercentSign() {
+		TransactionalIdsGenerator generator = new TransactionalIdsGenerator("%pattern%", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+		assertEquals(
+			new HashSet<>(Arrays.asList("%pattern%-42", "%pattern%-43", "%pattern%-44")),
+			generator.generateIdsToUse(36));
+	}
+
+	/**
 	 * Ids to abort and to use should never clash between subtasks.
 	 */
 	@Test
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
index ffebf56..29e8524 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
@@ -91,6 +91,6 @@ public class TransactionalIdsGenerator {
 	}
 
 	private String generateTransactionalId(long transactionalId) {
-		return String.format(prefix + "-%d", transactionalId);
+		return prefix + "-" + transactionalId;
 	}
 }