You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/10/23 01:52:35 UTC
samza git commit: SAMZA-797: Fix parsing errors in broadcast stream config values
Repository: samza
Updated Branches:
refs/heads/master 775d79195 -> 6f2383273
SAMZA-797: Fix parsing errors in broadcast stream config values
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f238327
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f238327
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f238327
Branch: refs/heads/master
Commit: 6f23832731ac08cfbe0a9fa412edd8ff0f70775f
Parents: 775d791
Author: Navina Ramesh <na...@gmail.com>
Authored: Thu Oct 22 16:44:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Oct 22 16:52:13 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/TaskConfigJava.java | 48 +++++++++++---------
.../apache/samza/config/TestTaskConfigJava.java | 14 +++++-
2 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 015e994..8acb6ca 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
public class TaskConfigJava extends MapConfig {
// broadcast streams consumed by all tasks. e.g. kafka.foo#1
public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
- private static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+";
- private static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+";
+ private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
+ private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
@@ -55,28 +55,32 @@ public class TaskConfigJava extends MapConfig {
List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
for (String systemStreamPartition : systemStreamPartitions) {
- if (Pattern.matches(BROADCAST_STREAM_PATTERN, systemStreamPartition)) {
-
- int hashPosition = systemStreamPartition.indexOf("#");
- SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition));
- systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(systemStreamPartition.substring(hashPosition + 1)))));
-
- } else if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, systemStreamPartition)) {
-
- SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, systemStreamPartition.indexOf("#")));
-
- int startingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.indexOf("[") + 1, systemStreamPartition.lastIndexOf("-")));
- int endingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.lastIndexOf("-") + 1, systemStreamPartition.indexOf("]")));
-
- if (startingPartition > endingPartition) {
- LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
- }
- for (int i = startingPartition; i <= endingPartition; i++) {
- systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
- }
- } else {
+ int hashPosition = systemStreamPartition.indexOf("#");
+ if (hashPosition == -1) {
throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+ ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+ } else {
+ String systemStreamName = systemStreamPartition.substring(0, hashPosition);
+ String partitionSegment = systemStreamPartition.substring(hashPosition + 1);
+ SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamName);
+
+ if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
+ systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment))));
+ } else {
+ if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, partitionSegment)) {
+ int partitionStart = Integer.valueOf(partitionSegment.substring(1, partitionSegment.lastIndexOf("-")));
+ int partitionEnd = Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 1, partitionSegment.indexOf("]")));
+ if (partitionStart > partitionEnd) {
+ LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
+ }
+ for (int i = partitionStart; i <= partitionEnd; i++) {
+ systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
+ }
+ } else {
+ throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+ + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+ }
+ }
}
}
return systemStreamPartitionSet;
http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
index 2d6060e..878ca01 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -34,7 +34,7 @@ public class TestTaskConfigJava {
@Test
public void testGetBroadcastSystemStreamPartitions() {
HashMap<String, String> map = new HashMap<String, String>();
- map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]");
+ map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
Config config = new MapConfig(map);
TaskConfigJava taskConfig = new TaskConfigJava(config);
@@ -46,6 +46,8 @@ public class TestTaskConfigJava {
expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12)));
expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13)));
expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14)));
+ expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(3)));
+ expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(4)));
assertEquals(expected, systemStreamPartitionSet);
map.put("task.broadcast.inputs", "kafka.foo");
@@ -57,5 +59,15 @@ public class TestTaskConfigJava {
catchCorrectException = true;
}
assertTrue(catchCorrectException);
+
+ map.put("task.broadcast.inputs", "kafka.org.apache.events.WhitelistedIps#1-2");
+ taskConfig = new TaskConfigJava(new MapConfig(map));
+ boolean invalidFormatException = false;
+ try {
+ taskConfig.getBroadcastSystemStreamPartitions();
+ } catch (IllegalArgumentException e) {
+ invalidFormatException = true;
+ }
+ assertTrue(invalidFormatException);
}
}