You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/31 01:47:58 UTC

[1/2] git commit: SAMZA-359; refactor grouper package names

Repository: incubator-samza
Updated Branches:
  refs/heads/master 3a8e2f9d1 -> cc11e02dd


SAMZA-359; refactor grouper package names


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2c1391f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2c1391f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2c1391f6

Branch: refs/heads/master
Commit: 2c1391f6721c90e0a004ee25f746e2d627ccf2a0
Parents: 3a8e2f9
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Jul 30 15:58:40 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Jul 30 15:58:40 2014 -0700

----------------------------------------------------------------------
 .../container/SystemStreamPartitionGrouper.java | 40 --------------
 .../SystemStreamPartitionGrouperFactory.java    | 28 ----------
 .../org/apache/samza/container/TaskName.java    |  2 +-
 .../stream/SystemStreamPartitionGrouper.java    | 41 ++++++++++++++
 .../SystemStreamPartitionGrouperFactory.java    | 28 ++++++++++
 .../org/apache/samza/config/JobConfig.scala     |  2 +-
 .../SystemStreamPartitionTaskNameGrouper.scala  | 38 -------------
 .../grouper/stream/GroupByPartition.scala       | 41 ++++++++++++++
 .../stream/GroupBySystemStreamPartition.scala   | 38 +++++++++++++
 .../grouper/task/GroupByContainerCount.scala    | 50 +++++++++++++++++
 .../grouper/task/TaskNameGrouper.scala          | 40 ++++++++++++++
 .../groupers/GroupByPartition.scala             | 41 --------------
 .../groupers/GroupBySystemStreamPartition.scala | 38 -------------
 ...leSystemStreamPartitionTaskNameGrouper.scala | 50 -----------------
 .../main/scala/org/apache/samza/util/Util.scala |  9 ++--
 .../SystemStreamPartitionGrouperTestBase.scala  | 57 --------------------
 .../grouper/stream/GroupByTestBase.scala        | 57 ++++++++++++++++++++
 .../grouper/stream/TestGroupByPartition.scala   | 37 +++++++++++++
 .../TestGroupBySystemStreamPartition.scala      | 41 ++++++++++++++
 .../task/TestGroupByContainerCount.scala        | 54 +++++++++++++++++++
 .../groupers/TestGroupByPartition.scala         | 37 -------------
 .../TestGroupBySystemStreamPartition.scala      | 41 --------------
 ...leSystemStreamPartitionTaskNameGrouper.scala | 54 -------------------
 .../kafka/TestKafkaCheckpointManager.scala      |  2 +-
 24 files changed, 435 insertions(+), 431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
deleted file mode 100644
index 897d9f5..0000000
--- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
- * by the implementation.  Each taskName has a key that uniquely describes what sets may be in it, but does
- * not generally enumerate the elements of those sets.  For example, a SystemStreamPartitionGrouper that
- * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating
- * four TaskNames: 0, 1, 2, 3.  These TaskNames describe the partitions but do not list all of the
- * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing
- * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than
- * four partitions.  On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition
- * to be its own, unique group would use the SystemStreamPartition's entire description to generate
- * the TaskNames.
- */
-public interface SystemStreamPartitionGrouper {
-  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
deleted file mode 100644
index 10ac6e2..0000000
--- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container;
-
-import org.apache.samza.config.Config;
-
-/**
- * Return an instance a SystemStreamPartitionGrouper per the particular implementation
- */
-public interface SystemStreamPartitionGrouperFactory {
-  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 13a1206..0833586 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -20,7 +20,7 @@ package org.apache.samza.container;
 
 /**
  * A unique identifier of a set of a SystemStreamPartitions that have been grouped by
- * a {@link org.apache.samza.container.SystemStreamPartitionGrouper}.  The
+ * a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}.  The
  * SystemStreamPartitionGrouper determines the TaskName for each set it creates.
  */
 public class TaskName implements Comparable<TaskName> {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
new file mode 100644
index 0000000..f374c4b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
+ * by the implementation.  Each taskName has a key that uniquely describes what sets may be in it, but does
+ * not generally enumerate the elements of those sets.  For example, a SystemStreamPartitionGrouper that
+ * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating
+ * four TaskNames: 0, 1, 2, 3.  These TaskNames describe the partitions but do not list all of the
+ * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing
+ * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than
+ * four partitions.  On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition
+ * to be its own, unique group would use the SystemStreamPartition's entire description to generate
+ * the TaskNames.
+ */
+public interface SystemStreamPartitionGrouper {
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
new file mode 100644
index 0000000..6c0b12d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Return an instance a SystemStreamPartitionGrouper per the particular implementation
+ */
+public interface SystemStreamPartitionGrouperFactory {
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index f84aeea..3b6685e 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 
 object JobConfig {
   // job config constants

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index a8c93ac..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-/**
- * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
- * {@link org.apache.samza.container.SystemStreamPartitionGrouper}, we can then map those groupings onto
- * the {@link org.apache.samza.container.SamzaContainer}s on which they will run.  This class takes
- * those groupings-of-SSPs and groups them together on which container each should run on.  A simple
- * implementation could assign each TaskNamesToSystemStreamPartition to a separate container.  More
- * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them
- * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
- */
-trait SystemStreamPartitionTaskNameGrouper {
-  /**
-   * Group TaskNamesToSystemStreamPartitions onto the containers they will share
-   *
-   * @param taskNames Pre-grouped SSPs
-   * @return Mapping of container ID to set if TaskNames it will run
-   */
-  def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
new file mode 100644
index 0000000..44e95fc
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
+ * the string representation of the Partition.
+ */
+class GroupByPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = {
+    ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
+      .map(r => r._1 -> r._2.asJava)
+  }
+}
+
+class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..3c0acad
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
+ * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition
+ */
+class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
+  override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
+}
+
+class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
+  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
new file mode 100644
index 0000000..7a3ba46
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames
+ * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be).
+ * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution
+ * of the number of taskNames between containers, etc.
+ */
+class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper {
+  require(numContainers > 0, "Must have at least one container")
+
+  override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
+    val keySize = taskNames.keySet.size
+    require(keySize > 0, "Must have some SSPs to group, but found none")
+
+    // Iterate through the taskNames, round-robining them per container
+    val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
+    var idx = 0
+    for(taskName <- taskNames.iterator) {
+      val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
+      idx = (idx + 1) % numContainers
+
+      currMap += taskName
+    }
+
+    byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
new file mode 100644
index 0000000..46e75b1
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+
+/**
+ * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto
+ * the {@link org.apache.samza.container.SamzaContainer}s on which they will run.  This class takes
+ * those groupings-of-SSPs and groups them together on which container each should run on.  A simple
+ * implementation could assign each TaskNamesToSystemStreamPartition to a separate container.  More
+ * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them
+ * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
+ */
+trait TaskNameGrouper {
+  /**
+   * Group TaskNamesToSystemStreamPartitions onto the containers they will share
+   *
+   * @param taskNames Pre-grouped SSPs
+   * @return Mapping of container ID to set if TaskNames it will run
+   */
+  def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
deleted file mode 100644
index 223862f..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
- * the string representation of the Partition.
- */
-class GroupByPartition extends SystemStreamPartitionGrouper {
-  override def group(ssps: util.Set[SystemStreamPartition]) = {
-    ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
-      .map(r => r._1 -> r._2.asJava)
-  }
-}
-
-class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
-  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
deleted file mode 100644
index a2bcfee..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
- * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition
- */
-class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
-  override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
-}
-
-class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
-  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index 7913294..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.taskname.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions}
-import org.apache.samza.system.SystemStreamPartition
-
-/**
- * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames
- * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be).
- * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution
- * of the number of taskNames between containers, etc.
- */
-class SimpleSystemStreamPartitionTaskNameGrouper(numContainers:Int) extends SystemStreamPartitionTaskNameGrouper {
-  require(numContainers > 0, "Must have at least one container")
-
-  override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
-    val keySize = taskNames.keySet.size
-    require(keySize > 0, "Must have some SSPs to group, but found none")
-
-    // Iterate through the taskNames, round-robining them per container
-    val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
-    var idx = 0
-    for(taskName <- taskNames.iterator) {
-      val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
-      idx = (idx + 1) % numContainers
-
-      currMap += taskName
-    }
-
-    byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 32c2647..16ad5a2 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -32,13 +32,14 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
-import org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper
-import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions, SystemStreamPartitionGrouperFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
 import scala.collection.JavaConversions._
 import scala.collection
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+import org.apache.samza.container.grouper.task.GroupByContainerCount
 
 object Util extends Logging {
   val random = new Random
@@ -148,7 +149,7 @@ object Util extends Logging {
     info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions taskNames to " + containerCount + " containers.")
 
     // Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
-    val sspTaskNameGrouper = new SimpleSystemStreamPartitionTaskNameGrouper(containerCount)
+    val sspTaskNameGrouper = new GroupByContainerCount(containerCount)
 
     val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
deleted file mode 100644
index 3032b00..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
-import java.util.HashSet
-import java.util.Map
-import java.util.Set
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import java.util.Collections
-
-object SystemStreamPartitionGrouperTestBase {
-  val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
-  val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
-  val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
-  val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
-  val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
-  val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
-  val allSSPs = new HashSet[SystemStreamPartition]
-  Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
-}
-
-abstract class SystemStreamPartitionGrouperTestBase {
-  def getGrouper: SystemStreamPartitionGrouper
-
-  @Test
-  def emptySetReturnsEmptyMap {
-    val grouper: SystemStreamPartitionGrouper = getGrouper
-    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
-    assertTrue(result.isEmpty)
-  }
-
-  def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
-    val grouper: SystemStreamPartitionGrouper = getGrouper
-    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
-    assertEquals(output, result)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
new file mode 100644
index 0000000..47d716e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import org.junit.Assert._
+import java.util.Collections
+import org.apache.samza.container.TaskName
+
+object GroupByTestBase { // Shared fixtures named a<stream><partition>; every SSP belongs to SystemA.
+  val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
+  val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
+  val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
+  val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
+  val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
+  val ac0 = new SystemStreamPartition("SystemA", "StreamC", new Partition(0)) // "ac" => StreamC, per the naming scheme
+  val allSSPs = new HashSet[SystemStreamPartition] // all six fixtures: 3 streams, partitions 0-2
+  Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
+}
+/** Base for SystemStreamPartitionGrouper tests; subclasses supply the grouper under test via getGrouper. */
+abstract class GroupByTestBase {
+  def getGrouper: SystemStreamPartitionGrouper
+
+  @Test
+  def emptySetReturnsEmptyMap { // grouping an empty SSP set must yield no tasks
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
+    assertTrue(result.isEmpty)
+  }
+  /** Asserts that grouping `input` yields exactly `output` (java.util.Map equality). */
+  def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
+    val grouper: SystemStreamPartitionGrouper = getGrouper
+    val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
+    assertEquals(output, result)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
new file mode 100644
index 0000000..2fa718c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupByPartition extends GroupByTestBase {
+  import GroupByTestBase._
+  // Expected grouping of allSSPs: one TaskName per partition number, containing every SSP with that partition.
+  val expected /* from base class provided set */ =  Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+                                                         new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+                                                         new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..8da0595
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupBySystemStreamPartition extends GroupByTestBase {
+  import GroupByTestBase._
+  // Expected: one TaskName per SSP, named by that SSP's toString.
+  // Written out by hand rather than computed, so a logic error in the grouper is not duplicated in the expectation.
+  val expected /* from base class provided set */ =  Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+    new TaskName(aa1.toString) -> Set(aa1).asJava,
+    new TaskName(aa2.toString) -> Set(aa2).asJava,
+    new TaskName(ab1.toString) -> Set(ab1).asJava,
+    new TaskName(ab2.toString) -> Set(ab2).asJava,
+    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+
+  override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+
+  @Test def groupingWorks() {
+    verifyGroupGroupsCorrectly(allSSPs, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
new file mode 100644
index 0000000..20f41a8
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestGroupByContainerCount {
+  val emptySSPSet = Set[SystemStreamPartition]()
+  /** For each container count 1..maxNumGroups, checks the number of groups and that the total task-name count is preserved. */
+  @Test
+  def weGetAsExactlyManyGroupsAsWeAskFor() {
+    // Cache input maps by size so each is built once and reused across all numGroups iterations.
+    val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
+
+    def tntsspOfSize(size:Int) = {
+      def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
+      tntsspCache.getOrElseUpdate(size, getMap(size))
+    }
+
+    val maxTNTSSPSize = 1000
+    val maxNumGroups = 140
+    for(numGroups <- 1 to maxNumGroups) {
+      val grouper = new GroupByContainerCount(numGroups)
+
+      for (tntsspSize <- numGroups to maxTNTSSPSize) {
+        val map = tntsspOfSize(tntsspSize)
+        assertEquals(tntsspSize, map.size)
+
+        val grouped = grouper.groupTaskNames(map)
+        assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
+        assertEquals("Task name count changed with " + numGroups + " groups", tntsspSize, grouped.values.toSeq.map(_.size).sum)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
deleted file mode 100644
index 733be20..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
-import scala.collection.JavaConverters._
-import org.junit.Test
-
-class TestGroupByPartition extends SystemStreamPartitionGrouperTestBase {
-  import SystemStreamPartitionGrouperTestBase._
-
-  val expected /* from base class provided set */ =  Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
-                                                         new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
-                                                         new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
-
-  override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
-
-  @Test def groupingWorks() {
-    verifyGroupGroupsCorrectly(allSSPs, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
deleted file mode 100644
index e9c15a5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
-import scala.collection.JavaConverters._
-import org.junit.Test
-
-class TestGroupBySystemStreamPartition extends SystemStreamPartitionGrouperTestBase {
-  import SystemStreamPartitionGrouperTestBase._
-
-  // Building manually to avoid just duplicating a logic potential logic error here and there
-  val expected /* from base class provided set */ =  Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
-    new TaskName(aa1.toString) -> Set(aa1).asJava,
-    new TaskName(aa2.toString) -> Set(aa2).asJava,
-    new TaskName(ab1.toString) -> Set(ab1).asJava,
-    new TaskName(ab2.toString) -> Set(ab2).asJava,
-    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
-
-  override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-
-  @Test def groupingWorks() {
-    verifyGroupGroupsCorrectly(allSSPs, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index 7ea09cd..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.taskname.groupers
-
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSimpleSystemStreamPartitionTaskNameGrouper {
-  val emptySSPSet = Set[SystemStreamPartition]()
-
-  @Test
-  def weGetAsExactlyManyGroupsAsWeAskFor() {
-    // memoize the maps used in the test to avoid an O(n^3) loop
-    val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
-
-    def tntsspOfSize(size:Int) = {
-      def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
-
-      tntsspCache.getOrElseUpdate(size, getMap(size))
-    }
-
-    val maxTNTSSPSize = 1000
-    val maxNumGroups = 140
-    for(numGroups <- 1 to maxNumGroups) {
-      val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(numGroups)
-
-      for (tntsspSize <- numGroups to maxTNTSSPSize) {
-        val map = tntsspOfSize(tntsspSize)
-        assertEquals(tntsspSize, map.size)
-
-        val grouped = grouper.groupTaskNames(map)
-        assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index cddee13..34fe6dd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -42,7 +42,7 @@ import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 import scala.collection.JavaConversions._
 import scala.collection._
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 
 object TestKafkaCheckpointManager {
   val zkConnect: String = TestZKUtils.zookeeperConnect


[2/2] git commit: SAMZA-357; default to utf8 to fix rat task failures

Posted by cr...@apache.org.
SAMZA-357; default to utf8 to fix rat task failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/cc11e02d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/cc11e02d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/cc11e02d

Branch: refs/heads/master
Commit: cc11e02dd3d1e5aff3e8e766d908431b98219161
Parents: 2c1391f
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Jul 30 16:05:27 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Jul 30 16:05:27 2014 -0700

----------------------------------------------------------------------
 gradle.properties | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc11e02d/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 6f35fb4..5711c1b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -21,3 +21,5 @@ scalaVersion=2.10
 gradleVersion=2.0
 
 org.gradle.jvmargs="-XX:MaxPermSize=512m"
+
+systemProp.file.encoding=utf-8