You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/10/23 01:52:35 UTC

samza git commit: SAMZA-797: Fix parsing errors in broadcast stream config values

Repository: samza
Updated Branches:
  refs/heads/master 775d79195 -> 6f2383273


SAMZA-797: Fix parsing errors in broadcast stream config values


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f238327
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f238327
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f238327

Branch: refs/heads/master
Commit: 6f23832731ac08cfbe0a9fa412edd8ff0f70775f
Parents: 775d791
Author: Navina Ramesh <na...@gmail.com>
Authored: Thu Oct 22 16:44:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Thu Oct 22 16:52:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/TaskConfigJava.java | 48 +++++++++++---------
 .../apache/samza/config/TestTaskConfigJava.java | 14 +++++-
 2 files changed, 39 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 015e994..8acb6ca 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -34,8 +34,8 @@ import org.slf4j.LoggerFactory;
 public class TaskConfigJava extends MapConfig {
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
   public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
-  private static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+";
-  private static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]+";
+  private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
+  private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
   public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
 
 
@@ -55,28 +55,32 @@ public class TaskConfigJava extends MapConfig {
     List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
 
     for (String systemStreamPartition : systemStreamPartitions) {
-      if (Pattern.matches(BROADCAST_STREAM_PATTERN, systemStreamPartition)) {
-
-        int hashPosition = systemStreamPartition.indexOf("#");
-        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition));
-        systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(systemStreamPartition.substring(hashPosition + 1)))));
-
-      } else if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, systemStreamPartition)) {
-
-        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, systemStreamPartition.indexOf("#")));
-
-        int startingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.indexOf("[") + 1, systemStreamPartition.lastIndexOf("-")));
-        int endingPartition = Integer.valueOf(systemStreamPartition.substring(systemStreamPartition.lastIndexOf("-") + 1, systemStreamPartition.indexOf("]")));
-
-        if (startingPartition > endingPartition) {
-          LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
-        }
-        for (int i = startingPartition; i <= endingPartition; i++) {
-          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
-        }
-      } else {
+      int hashPosition = systemStreamPartition.indexOf("#");
+      if (hashPosition == -1) {
         throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
             + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+      } else {
+        String systemStreamName = systemStreamPartition.substring(0, hashPosition);
+        String partitionSegment = systemStreamPartition.substring(hashPosition + 1);
+        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamName);
+
+        if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
+          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment))));
+        } else {
+          if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, partitionSegment)) {
+            int partitionStart = Integer.valueOf(partitionSegment.substring(1, partitionSegment.lastIndexOf("-")));
+            int partitionEnd = Integer.valueOf(partitionSegment.substring(partitionSegment.lastIndexOf("-") + 1, partitionSegment.indexOf("]")));
+            if (partitionStart > partitionEnd) {
+              LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
+            }
+            for (int i = partitionStart; i <= partitionEnd; i++) {
+              systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
+            }
+          } else {
+            throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+                + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+          }
+        }
       }
     }
     return systemStreamPartitionSet;

http://git-wip-us.apache.org/repos/asf/samza/blob/6f238327/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
index 2d6060e..878ca01 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -34,7 +34,7 @@ public class TestTaskConfigJava {
   @Test
   public void testGetBroadcastSystemStreamPartitions() {
     HashMap<String, String> map = new HashMap<String, String>();
-    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]");
+    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14], kafka.foo.bar#[3-4]");
     Config config = new MapConfig(map);
 
     TaskConfigJava taskConfig = new TaskConfigJava(config);
@@ -46,6 +46,8 @@ public class TestTaskConfigJava {
     expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12)));
     expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13)));
     expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14)));
+    expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(3)));
+    expected.add(new SystemStreamPartition("kafka", "foo.bar", new Partition(4)));
     assertEquals(expected, systemStreamPartitionSet);
 
     map.put("task.broadcast.inputs", "kafka.foo");
@@ -57,5 +59,15 @@ public class TestTaskConfigJava {
       catchCorrectException = true;
     }
     assertTrue(catchCorrectException);
+
+    map.put("task.broadcast.inputs", "kafka.org.apache.events.WhitelistedIps#1-2");
+    taskConfig = new TaskConfigJava(new MapConfig(map));
+    boolean invalidFormatException = false;
+    try {
+      taskConfig.getBroadcastSystemStreamPartitions();
+    } catch (IllegalArgumentException e) {
+      invalidFormatException = true;
+    }
+    assertTrue(invalidFormatException);
   }
 }