You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/03/19 20:17:49 UTC
git commit: SAMZA-194: createStreamPartitionString should not use
characters that overlap with URIs
Repository: incubator-samza
Updated Branches:
refs/heads/master bfd97fce0 -> 464a7e27d
SAMZA-194: createStreamPartitionString should not use characters that overlap with URIs
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/464a7e27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/464a7e27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/464a7e27
Branch: refs/heads/master
Commit: 464a7e27d436b526ad210ec8d0e8e5223ac328f2
Parents: bfd97fc
Author: Jakob Glen Homan <jg...@apache.org>
Authored: Wed Mar 19 12:17:40 2014 -0700
Committer: Jakob Glen Homan <jg...@apache.org>
Committed: Wed Mar 19 12:17:40 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/org/apache/samza/util/Util.scala | 13 ++++++++-----
.../test/scala/org/apache/samza/util/TestUtil.scala | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 5b429df..24a954c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -140,6 +140,9 @@ object Util extends Logging {
ssp.filter(_.getPartition.getPartitionId % containerCount == containerId)
}
+ val partitionSeparator = ";"
+ val topicSeparator = ","
+ val topicStreamGrouper = "#"
/**
* Serialize a collection of stream-partitions to a string suitable for passing between processes.
* The streams will be grouped by partition. The partition will be separated from the topics by
@@ -153,13 +156,13 @@ object Util extends Logging {
*/
def createStreamPartitionString(sp: Set[SystemStreamPartition]): String = {
for (
- ch <- List(':', ',', '/');
+ ch <- List(partitionSeparator, topicSeparator, topicStreamGrouper);
s <- sp
) {
if (s.getStream.contains(ch)) throw new IllegalArgumentException(s + " contains illegal character " + ch)
}
- sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + ":" + z._2.map(y => y.getSystem + "." + y.getStream).mkString(",")).mkString("/")
+ sp.groupBy(_.getPartition).map(z => z._1.getPartitionId + partitionSeparator + z._2.map(y => y.getSystem + "." + y.getStream).mkString(topicSeparator)).mkString(topicStreamGrouper)
}
@@ -174,14 +177,14 @@ object Util extends Logging {
if (sp == null || sp.isEmpty) return Set.empty
def splitPartitionGroup(pg: String) = {
- val split = pg.split(":") // Seems like there should be a more scalar way of doing this
+ val split = pg.split(partitionSeparator) // Seems like there should be a more scalar way of doing this
val part = split(0).toInt
- val streams = split(1).split(",").toList
+ val streams = split(1).split(topicSeparator).toList
streams.map(s => new SystemStreamPartition(getSystemStreamFromNames(s), new Partition(part))).toSet
}
- sp.split("/").map(splitPartitionGroup(_)).toSet.flatten
+ sp.split(topicStreamGrouper).map(splitPartitionGroup(_)).toSet.flatten
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/464a7e27/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 60c9615..1bfd63c 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -87,7 +87,7 @@ class TestUtil {
@Test
def testCreateStreamPartitionStringBlocksDelimeters() {
val partOne = new Partition(1)
- val toTry = List(':', ',', '/')
+ val toTry = List(Util.topicSeparator, Util.topicStreamGrouper, Util.partitionSeparator)
.map(ch => (ch, Set(new SystemStreamPartition("kafka", "good1", partOne),
new SystemStreamPartition("kafka", "bad" + ch, partOne),
new SystemStreamPartition("notkafka", "alsogood", partOne))))