You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/03/19 20:17:49 UTC

git commit: SAMZA-194: createStreamPartitionString should not use characters that overlap with URIs

Repository: incubator-samza
Updated Branches:
  refs/heads/master bfd97fce0 -> 464a7e27d


SAMZA-194: createStreamPartitionString should not use characters that overlap with URIs


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/464a7e27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/464a7e27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/464a7e27

Branch: refs/heads/master
Commit: 464a7e27d436b526ad210ec8d0e8e5223ac328f2
Parents: bfd97fc
Author: Jakob Glen Homan <jg...@apache.org>
Authored: Wed Mar 19 12:17:40 2014 -0700
Committer: Jakob Glen Homan <jg...@apache.org>
Committed: Wed Mar 19 12:17:40 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/samza/util/Util.scala    | 13 ++++++++-----
 .../test/scala/org/apache/samza/util/TestUtil.scala    |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 5b429df..24a954c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -140,6 +140,9 @@ object Util extends Logging {
     ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
   }
 
+  val partitionSeparator: String = ";" // separates a partition id from that partition's stream list
+  val topicSeparator: String = ","     // separates the "system.stream" entries within one partition group
+  val topicStreamGrouper: String = "#" // separates one partition group from the next
   /**
    * Serialize a collection of stream-partitions to a string suitable for passing between processes.
    * The streams will be grouped by partition. The partition will be separated from the topics by
@@ -153,13 +156,13 @@ object Util extends Logging {
    */
   def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = {
     for (
-      ch <- List(':', ',', '/');
+      ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper); // reject names that would break parsing; NOTE(review): only getStream is checked below, yet getSystem is also serialized
       s <- sp
     ) {
       if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch)
     }
 
-    sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + ":" + z._2.map(y => y.getSystem + "." + y.getStream).mkString(",")).mkString("/")
+    sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + partitionSeparator + z._2.map(y => y.getSystem + "." + y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper) // yields e.g. "0;sysA.s1,sysA.s2#1;sysB.s3"
 
   }
 
@@ -174,14 +177,14 @@ object Util extends Logging {
     if (sp == null || sp.isEmpty) return Set.empty
 
     def splitPartitionGroup(pg: String) = {
-      val split = pg.split(":") // Seems like there should be a more scalar way of doing this
+      val split = pg.split(partitionSeparator) // pg looks like "<partitionId>;<system.stream>,<system.stream>"; NOTE(review): String.split takes a regex, fine for ";" but needs Pattern.quote for metacharacter separators
       val part = split(0).toInt
-      val streams = split(1).split(",").toList
+      val streams = split(1).split(topicSeparator).toList // NOTE(review): split(1) throws ArrayIndexOutOfBoundsException when pg has no partitionSeparator
 
       streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet
     }
 
-    sp.split("/").map(splitPartitionGroup(_)).toSet.flatten
+    sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 60c9615..1bfd63c 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -87,7 +87,7 @@ class TestUtil {
   @Test
   def testCreateStreamPartitionStringBlocksDelimeters() {
     val partOne = new Partition(1)
-    val toTry = List(':', ',', '/')
+    val toTry = List(Util.topicSeparator, Util.topicStreamGrouper, Util.partitionSeparator)
       .map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne),
       new SystemStreamPartition("kafka", "bad" + ch, partOne),
       new SystemStreamPartition("notkafka", "alsogood", partOne))))