You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/07/31 01:47:58 UTC
[1/2] git commit: SAMZA-359; refactor grouper package names
Repository: incubator-samza
Updated Branches:
refs/heads/master 3a8e2f9d1 -> cc11e02dd
SAMZA-359; refactor grouper package names
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2c1391f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2c1391f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2c1391f6
Branch: refs/heads/master
Commit: 2c1391f6721c90e0a004ee25f746e2d627ccf2a0
Parents: 3a8e2f9
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Jul 30 15:58:40 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Jul 30 15:58:40 2014 -0700
----------------------------------------------------------------------
.../container/SystemStreamPartitionGrouper.java | 40 --------------
.../SystemStreamPartitionGrouperFactory.java | 28 ----------
.../org/apache/samza/container/TaskName.java | 2 +-
.../stream/SystemStreamPartitionGrouper.java | 41 ++++++++++++++
.../SystemStreamPartitionGrouperFactory.java | 28 ++++++++++
.../org/apache/samza/config/JobConfig.scala | 2 +-
.../SystemStreamPartitionTaskNameGrouper.scala | 38 -------------
.../grouper/stream/GroupByPartition.scala | 41 ++++++++++++++
.../stream/GroupBySystemStreamPartition.scala | 38 +++++++++++++
.../grouper/task/GroupByContainerCount.scala | 50 +++++++++++++++++
.../grouper/task/TaskNameGrouper.scala | 40 ++++++++++++++
.../groupers/GroupByPartition.scala | 41 --------------
.../groupers/GroupBySystemStreamPartition.scala | 38 -------------
...leSystemStreamPartitionTaskNameGrouper.scala | 50 -----------------
.../main/scala/org/apache/samza/util/Util.scala | 9 ++--
.../SystemStreamPartitionGrouperTestBase.scala | 57 --------------------
.../grouper/stream/GroupByTestBase.scala | 57 ++++++++++++++++++++
.../grouper/stream/TestGroupByPartition.scala | 37 +++++++++++++
.../TestGroupBySystemStreamPartition.scala | 41 ++++++++++++++
.../task/TestGroupByContainerCount.scala | 54 +++++++++++++++++++
.../groupers/TestGroupByPartition.scala | 37 -------------
.../TestGroupBySystemStreamPartition.scala | 41 --------------
...leSystemStreamPartitionTaskNameGrouper.scala | 54 -------------------
.../kafka/TestKafkaCheckpointManager.scala | 2 +-
24 files changed, 435 insertions(+), 431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
deleted file mode 100644
index 897d9f5..0000000
--- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
- * by the implementation. Each taskName has a key that uniquely describes what sets may be in it, but does
- * not generally enumerate the elements of those sets. For example, a SystemStreamPartitionGrouper that
- * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating
- * four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the
- * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing
- * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than
- * four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition
- * to be its own, unique group would use the SystemStreamPartition's entire description to generate
- * the TaskNames.
- */
-public interface SystemStreamPartitionGrouper {
- public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
deleted file mode 100644
index 10ac6e2..0000000
--- a/samza-api/src/main/java/org/apache/samza/container/SystemStreamPartitionGrouperFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container;
-
-import org.apache.samza.config.Config;
-
-/**
- * Return an instance a SystemStreamPartitionGrouper per the particular implementation
- */
-public interface SystemStreamPartitionGrouperFactory {
- public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config);
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 13a1206..0833586 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -20,7 +20,7 @@ package org.apache.samza.container;
/**
* A unique identifier of a set of a SystemStreamPartitions that have been grouped by
- * a {@link org.apache.samza.container.SystemStreamPartitionGrouper}. The
+ * a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}. The
* SystemStreamPartitionGrouper determines the TaskName for each set it creates.
*/
public class TaskName implements Comparable<TaskName> {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
new file mode 100644
index 0000000..f374c4b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Group a set of SystemStreamPartitions into logical taskNames that share a common characteristic, defined
+ * by the implementation. Each taskName has a key that uniquely describes what sets may be in it, but does
+ * not generally enumerate the elements of those sets. For example, a SystemStreamPartitionGrouper that
+ * groups SystemStreamPartitions (each with 4 partitions) by their partition, would end up generating
+ * four TaskNames: 0, 1, 2, 3. These TaskNames describe the partitions but do not list all of the
+ * SystemStreamPartitions, which allows new SystemStreamPartitions to be added later without changing
+ * the definition of the TaskNames, assuming these new SystemStreamPartitions do not have more than
+ * four partitions. On the other hand, a SystemStreamPartitionGrouper that wanted each SystemStreamPartition
+ * to be its own, unique group would use the SystemStreamPartition's entire description to generate
+ * the TaskNames.
+ */
+public interface SystemStreamPartitionGrouper {
+ public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps);
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
new file mode 100644
index 0000000..6c0b12d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouperFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Return an instance of a SystemStreamPartitionGrouper per the particular implementation
+ */
+public interface SystemStreamPartitionGrouperFactory {
+ public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index f84aeea..3b6685e 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,7 +19,7 @@
package org.apache.samza.config
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
object JobConfig {
// job config constants
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index a8c93ac..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/SystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-/**
- * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
- * {@link org.apache.samza.container.SystemStreamPartitionGrouper}, we can then map those groupings onto
- * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes
- * those groupings-of-SSPs and groups them together on which container each should run on. A simple
- * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More
- * advanced implementations could examine the TaskNamesToSystemStreamPartition to group by them
- * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
- */
-trait SystemStreamPartitionTaskNameGrouper {
- /**
- * Group TaskNamesToSystemStreamPartitions onto the containers they will share
- *
- * @param taskNames Pre-grouped SSPs
- * @return Mapping of container ID to set if TaskNames it will run
- */
- def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
new file mode 100644
index 0000000..44e95fc
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
+ * the string representation of the Partition.
+ */
+class GroupByPartition extends SystemStreamPartitionGrouper {
+ override def group(ssps: util.Set[SystemStreamPartition]) = {
+ ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
+ .map(r => r._1 -> r._2.asJava)
+ }
+}
+
+class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
+ override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..3c0acad
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+
+/**
+ * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
+ * SystemStreamPartition into its own group, with the key being the string representation of the SystemStreamPartition
+ */
+class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
+ override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
+}
+
+class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
+ override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
new file mode 100644
index 0000000..7a3ba46
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+
+/**
+ * Group the SSP taskNames across the configured number of containers by assigning them round-robin, one taskName
+ * at a time, in the order returned by iterating over the map of taskNames (whatever that ordering happens to be).
+ * No consideration is given towards locality, even distribution of aggregate SSPs within a container, etc.; only the
+ * number of taskNames per container is balanced (counts differ by at most one).
+ */
+class GroupByContainerCount(numContainers:Int) extends TaskNameGrouper {
+ require(numContainers > 0, "Must have at least one container")
+
+ override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
+ val keySize = taskNames.keySet.size
+ require(keySize > 0, "Must have some SSPs to group, but found none")
+
+ // Iterate through the taskNames, round-robining them per container
+ val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
+ var idx = 0
+ for(taskName <- taskNames.iterator) {
+ val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
+ idx = (idx + 1) % numContainers
+
+ currMap += taskName
+ }
+
+ byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
new file mode 100644
index 0000000..46e75b1
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/container/grouper/task/TaskNameGrouper.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+
+/**
+ * After the input SystemStreamPartitions have been mapped to their TaskNames by an implementation of
+ * {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}, we can then map those groupings onto
+ * the {@link org.apache.samza.container.SamzaContainer}s on which they will run. This class takes
+ * those groupings-of-SSPs and groups them together by the container on which each should run. A simple
+ * implementation could assign each TaskNamesToSystemStreamPartition to a separate container. More
+ * advanced implementations could examine the TaskNamesToSystemStreamPartition to group them
+ * by data locality, anti-affinity, even distribution of expected bandwidth consumption, etc.
+ */
+trait TaskNameGrouper {
+ /**
+ * Group TaskNamesToSystemStreamPartitions onto the containers they will share
+ *
+ * @param taskNames Pre-grouped SSPs
+ * @return Mapping of container ID to the set of TaskNames it will run
+ */
+ def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions]
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
deleted file mode 100644
index 223862f..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupByPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
- * the string representation of the Partition.
- */
-class GroupByPartition extends SystemStreamPartitionGrouper {
- override def group(ssps: util.Set[SystemStreamPartition]) = {
- ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
- .map(r => r._1 -> r._2.asJava)
- }
-}
-
-class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
- override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
deleted file mode 100644
index a2bcfee..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/groupers/GroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperFactory, SystemStreamPartitionGrouper}
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
- * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition
- */
-class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
- override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
-}
-
-class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
- override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index 7913294..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/SimpleSystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.taskname.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions}
-import org.apache.samza.system.SystemStreamPartition
-
-/**
- * Group the SSP taskNames by dividing the number of taskNames into the number of containers (n) and assigning n taskNames
- * to each container as returned by iterating over the keys in the map of taskNames (whatever that ordering happens to be).
- * No consideration is given towards locality, even distribution of aggregate SSPs within a container, even distribution
- * of the number of taskNames between containers, etc.
- */
-class SimpleSystemStreamPartitionTaskNameGrouper(numContainers:Int) extends SystemStreamPartitionTaskNameGrouper {
- require(numContainers > 0, "Must have at least one container")
-
- override def groupTaskNames(taskNames: TaskNamesToSystemStreamPartitions): Map[Int, TaskNamesToSystemStreamPartitions] = {
- val keySize = taskNames.keySet.size
- require(keySize > 0, "Must have some SSPs to group, but found none")
-
- // Iterate through the taskNames, round-robining them per container
- val byContainerNum = (0 until numContainers).map(_ -> scala.collection.mutable.Map[TaskName, Set[SystemStreamPartition]]()).toMap
- var idx = 0
- for(taskName <- taskNames.iterator) {
- val currMap = byContainerNum.get(idx).get // safe to use simple get since we populated everybody above
- idx = (idx + 1) % numContainers
-
- currMap += taskName
- }
-
- byContainerNum.map(kv => kv._1 -> TaskNamesToSystemStreamPartitions(kv._2)).toMap
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 32c2647..16ad5a2 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -32,13 +32,14 @@ import org.apache.samza.config.Config
import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
-import org.apache.samza.container.systemstreampartition.taskname.groupers.SimpleSystemStreamPartitionTaskNameGrouper
-import org.apache.samza.container.{TaskName, SystemStreamPartitionTaskNameGrouper, TaskNamesToSystemStreamPartitions, SystemStreamPartitionGrouperFactory}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
import scala.collection.JavaConversions._
import scala.collection
+import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
+import org.apache.samza.container.grouper.task.GroupByContainerCount
object Util extends Logging {
val random = new Random
@@ -148,7 +149,7 @@ object Util extends Logging {
info("Assigning " + sspTaskNames.keySet.size + " SystemStreamPartitions taskNames to " + containerCount + " containers.")
// Here is where we should put in a pluggable option for the SSPTaskNameGrouper for locality, load-balancing, etc.
- val sspTaskNameGrouper = new SimpleSystemStreamPartitionTaskNameGrouper(containerCount)
+ val sspTaskNameGrouper = new GroupByContainerCount(containerCount)
val containersToTaskNames = sspTaskNameGrouper.groupTaskNames(sspTaskNames).toMap
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
deleted file mode 100644
index 3032b00..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/SystemStreamPartitionGrouperTestBase.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container
-
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
-import java.util.HashSet
-import java.util.Map
-import java.util.Set
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import java.util.Collections
-
-object SystemStreamPartitionGrouperTestBase {
- val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
- val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
- val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
- val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
- val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
- val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
- val allSSPs = new HashSet[SystemStreamPartition]
- Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
-}
-
-abstract class SystemStreamPartitionGrouperTestBase {
- def getGrouper: SystemStreamPartitionGrouper
-
- @Test
- def emptySetReturnsEmptyMap {
- val grouper: SystemStreamPartitionGrouper = getGrouper
- val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
- assertTrue(result.isEmpty)
- }
-
- def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
- val grouper: SystemStreamPartitionGrouper = getGrouper
- val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
- assertEquals(output, result)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
new file mode 100644
index 0000000..47d716e
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import org.junit.Assert._
+import java.util.Collections
+import org.apache.samza.container.TaskName
+
+object GroupByTestBase {
+ val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
+ val aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1))
+ val aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2))
+ val ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1))
+ val ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2))
+ val ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0))
+ val allSSPs = new HashSet[SystemStreamPartition]
+ Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0)
+}
+
+abstract class GroupByTestBase {
+ def getGrouper: SystemStreamPartitionGrouper
+
+ @Test
+ def emptySetReturnsEmptyMap {
+ val grouper: SystemStreamPartitionGrouper = getGrouper
+ val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(new HashSet[SystemStreamPartition])
+ assertTrue(result.isEmpty)
+ }
+
+ def verifyGroupGroupsCorrectly(input: Set[SystemStreamPartition], output: Map[TaskName, Set[SystemStreamPartition]]) {
+ val grouper: SystemStreamPartitionGrouper = getGrouper
+ val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
+ assertEquals(output, result)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
new file mode 100644
index 0000000..2fa718c
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupByPartition extends GroupByTestBase {
+ import GroupByTestBase._
+
+ val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+ new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+ new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+
+ override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
+
+ @Test def groupingWorks() {
+ verifyGroupGroupsCorrectly(allSSPs, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
new file mode 100644
index 0000000..8da0595
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.stream
+
+import org.apache.samza.container.TaskName
+import scala.collection.JavaConverters._
+import org.junit.Test
+
+class TestGroupBySystemStreamPartition extends GroupByTestBase {
+ import GroupByTestBase._
+
+ // Building manually to avoid just duplicating a potential logic error here and there
+ val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+ new TaskName(aa1.toString) -> Set(aa1).asJava,
+ new TaskName(aa2.toString) -> Set(aa2).asJava,
+ new TaskName(ab1.toString) -> Set(ab1).asJava,
+ new TaskName(ab2.toString) -> Set(ab2).asJava,
+ new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+
+ override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
+
+ @Test def groupingWorks() {
+ verifyGroupGroupsCorrectly(allSSPs, expected)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
new file mode 100644
index 0000000..20f41a8
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.grouper.task
+
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
+
+class TestGroupByContainerCount {
+ val emptySSPSet = Set[SystemStreamPartition]()
+
+ @Test
+ def weGetAsExactlyManyGroupsAsWeAskFor() {
+ // memoize the maps used in the test to avoid an O(n^3) loop
+ val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
+
+ def tntsspOfSize(size:Int) = {
+ def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
+
+ tntsspCache.getOrElseUpdate(size, getMap(size))
+ }
+
+ val maxTNTSSPSize = 1000
+ val maxNumGroups = 140
+ for(numGroups <- 1 to maxNumGroups) {
+ val grouper = new GroupByContainerCount(numGroups)
+
+ for (tntsspSize <- numGroups to maxTNTSSPSize) {
+ val map = tntsspOfSize(tntsspSize)
+ assertEquals(tntsspSize, map.size)
+
+ val grouped = grouper.groupTaskNames(map)
+ assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
deleted file mode 100644
index 733be20..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupByPartition.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
-import scala.collection.JavaConverters._
-import org.junit.Test
-
-class TestGroupByPartition extends SystemStreamPartitionGrouperTestBase {
- import SystemStreamPartitionGrouperTestBase._
-
- val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
- new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
- new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
-
- override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
-
- @Test def groupingWorks() {
- verifyGroupGroupsCorrectly(allSSPs, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
deleted file mode 100644
index e9c15a5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/groupers/TestGroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.groupers
-
-import org.apache.samza.container.{TaskName, SystemStreamPartitionGrouperTestBase, SystemStreamPartitionGrouper}
-import scala.collection.JavaConverters._
-import org.junit.Test
-
-class TestGroupBySystemStreamPartition extends SystemStreamPartitionGrouperTestBase {
- import SystemStreamPartitionGrouperTestBase._
-
- // Building manually to avoid just duplicating a logic potential logic error here and there
- val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
- new TaskName(aa1.toString) -> Set(aa1).asJava,
- new TaskName(aa2.toString) -> Set(aa2).asJava,
- new TaskName(ab1.toString) -> Set(ab1).asJava,
- new TaskName(ab2.toString) -> Set(ab2).asJava,
- new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
-
- override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-
- @Test def groupingWorks() {
- verifyGroupGroupsCorrectly(allSSPs, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala b/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
deleted file mode 100644
index 7ea09cd..0000000
--- a/samza-core/src/test/scala/org/apache/samza/container/systemstreampartition/taskname/groupers/TestSimpleSystemStreamPartitionTaskNameGrouper.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.systemstreampartition.taskname.groupers
-
-import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSimpleSystemStreamPartitionTaskNameGrouper {
- val emptySSPSet = Set[SystemStreamPartition]()
-
- @Test
- def weGetAsExactlyManyGroupsAsWeAskFor() {
- // memoize the maps used in the test to avoid an O(n^3) loop
- val tntsspCache = scala.collection.mutable.Map[Int, TaskNamesToSystemStreamPartitions]()
-
- def tntsspOfSize(size:Int) = {
- def getMap(size:Int) = TaskNamesToSystemStreamPartitions((0 until size).map(z => new TaskName("tn" + z) -> emptySSPSet).toMap)
-
- tntsspCache.getOrElseUpdate(size, getMap(size))
- }
-
- val maxTNTSSPSize = 1000
- val maxNumGroups = 140
- for(numGroups <- 1 to maxNumGroups) {
- val grouper = new SimpleSystemStreamPartitionTaskNameGrouper(numGroups)
-
- for (tntsspSize <- numGroups to maxTNTSSPSize) {
- val map = tntsspOfSize(tntsspSize)
- assertEquals(tntsspSize, map.size)
-
- val grouped = grouper.groupTaskNames(map)
- assertEquals("Asked for " + numGroups + " but got " + grouped.size, numGroups, grouped.size)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2c1391f6/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index cddee13..34fe6dd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -42,7 +42,7 @@ import org.junit.Assert._
import org.junit.{AfterClass, BeforeClass, Test}
import scala.collection.JavaConversions._
import scala.collection._
-import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
object TestKafkaCheckpointManager {
val zkConnect: String = TestZKUtils.zookeeperConnect
[2/2] git commit: SAMZA-357; default to utf8 to fix rat task failures
Posted by cr...@apache.org.
SAMZA-357; default to utf8 to fix rat task failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/cc11e02d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/cc11e02d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/cc11e02d
Branch: refs/heads/master
Commit: cc11e02dd3d1e5aff3e8e766d908431b98219161
Parents: 2c1391f
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Jul 30 16:05:27 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Jul 30 16:05:27 2014 -0700
----------------------------------------------------------------------
gradle.properties | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc11e02d/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 6f35fb4..5711c1b 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -21,3 +21,5 @@ scalaVersion=2.10
gradleVersion=2.0
org.gradle.jvmargs="-XX:MaxPermSize=512m"
+
+systemProp.file.encoding=utf-8