You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/08/24 16:50:45 UTC

[2/2] samza git commit: SAMZA-676: Implemented broadcast stream

SAMZA-676: Implemented broadcast stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1f77f8b9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1f77f8b9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1f77f8b9

Branch: refs/heads/master
Commit: 1f77f8b986e1e58982704b71ea9bb497ac0f70df
Parents: 79ec5db
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Aug 24 10:46:16 2015 -0400
Committer: Yan Fang <ya...@gmail.com>
Committed: Mon Aug 24 10:46:16 2015 -0400

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   6 +
 .../versioned/container/samza-container.md      |  10 +
 .../versioned/jobs/configuration-table.html     |  16 ++
 .../org/apache/samza/system/SystemAdmin.java    |  10 +
 ...inglePartitionWithoutOffsetsSystemAdmin.java |   4 +
 .../org/apache/samza/config/TaskConfigJava.java |  84 ++++++++
 .../grouper/stream/GroupByPartition.java        |  83 ++++++++
 .../grouper/stream/GroupByPartitionFactory.java |  29 +++
 .../stream/GroupBySystemStreamPartition.java    |  80 ++++++++
 .../GroupBySystemStreamPartitionFactory.java    |  31 +++
 .../apache/samza/checkpoint/OffsetManager.scala | 201 ++++++++++++-------
 .../org/apache/samza/config/JobConfig.scala     |   2 +-
 .../org/apache/samza/container/RunLoop.scala    |  23 ++-
 .../apache/samza/container/SamzaContainer.scala |   5 +-
 .../apache/samza/container/TaskInstance.scala   |  71 +++++--
 .../samza/container/TaskInstanceMetrics.scala   |   1 +
 .../grouper/stream/GroupByPartition.scala       |  41 ----
 .../stream/GroupBySystemStreamPartition.scala   |  38 ----
 .../filereader/FileReaderSystemAdmin.scala      |   4 +
 .../apache/samza/config/TestTaskConfigJava.java |  61 ++++++
 .../grouper/stream/TestGroupByPartition.java    | 130 ++++++++++++
 .../TestGroupBySystemStreamPartition.java       | 104 ++++++++++
 .../samza/checkpoint/TestOffsetManager.scala    |  39 ++--
 .../apache/samza/container/TestRunLoop.scala    |  16 ++
 .../samza/container/TestSamzaContainer.scala    |   1 +
 .../samza/container/TestTaskInstance.scala      |  92 ++++++++-
 .../grouper/stream/GroupByTestBase.scala        |  58 ------
 .../grouper/stream/TestGroupByPartition.scala   |  39 ----
 .../TestGroupBySystemStreamPartition.scala      |  42 ----
 .../samza/coordinator/TestJobCoordinator.scala  |   2 +
 .../elasticsearch/ElasticsearchSystemAdmin.java |   5 +
 .../samza/system/hdfs/HdfsSystemAdmin.scala     |   3 +
 .../samza-hdfs-test-batch-job-text.properties   |  17 ++
 .../samza-hdfs-test-batch-job.properties        |  17 ++
 .../samza-hdfs-test-job-text.properties         |  17 ++
 .../resources/samza-hdfs-test-job.properties    |  16 ++
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  34 ++--
 .../system/kafka/KafkaSystemConsumer.scala      |   9 +-
 .../samza/system/kafka/KafkaSystemFactory.scala |   1 +
 .../system/kafka/TestKafkaSystemConsumer.scala  |  36 +++-
 .../samza/system/mock/MockSystemAdmin.java      |   5 +
 .../yarn/TestSamzaAppMasterTaskManager.scala    |   2 +
 42 files changed, 1117 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index aaa235a..bc07ae8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -33,6 +33,10 @@
 
     <subpackage name="config">
         <allow class="org.apache.samza.SamzaException" />
+        <allow pkg="org.apache.samza.system" />
+        <allow pkg="org.apache.samza.util" />
+
+        <allow class="org.apache.samza.Partition" />
     </subpackage>
 
     <subpackage name="serializers">
@@ -113,6 +117,8 @@
             <subpackage name="stream">
                 <allow pkg="org.apache.samza.container" />
                 <allow pkg="org.apache.samza.system" />
+
+                <allow class="org.apache.samza.Partition" />
             </subpackage>
 
             <subpackage name="task">

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/container/samza-container.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/samza-container.md b/docs/learn/documentation/versioned/container/samza-container.md
index 9f46414..f97e8a3 100644
--- a/docs/learn/documentation/versioned/container/samza-container.md
+++ b/docs/learn/documentation/versioned/container/samza-container.md
@@ -102,4 +102,14 @@ Thus, if you want two events in different streams to be processed by the same ta
 
 There is one caveat in all of this: Samza currently assumes that a stream's partition count will never change. Partition splitting or repartitioning is not supported. If an input stream has N partitions, it is expected that it has always had, and will always have N partitions. If you want to re-partition a stream, you can write a job that reads messages from the stream, and writes them out to a new stream with the required number of partitions. For example, you could read messages from PageViewEvent, and write them to PageViewEventRepartition.
 
+### Broadcast Streams
+
+After 0.10.0, Samza supports broadcast streams. You can assign partitions from some streams to all the tasks. For example, suppose you want all the tasks to consume partitions 0 and 1 from a stream called broadcast-stream-1, and partition 2 from a stream called broadcast-stream-2. You can then configure:
+
+{% highlight jproperties %}
+task.broadcast.inputs=yourSystem.broadcast-stream-1#[0-1], yourSystem.broadcast-stream-2#2
+{% endhighlight %}
+
+Square brackets specify an inclusive range of partitions: "#[0-1]" selects partitions 0 and 1.
+
 ## [Streams &raquo;](streams.html)

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 8177fe5..78f2927 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -447,6 +447,22 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-broadcast-inputs">task.broadcast.inputs</td>
+                    <td class="default"></td>
+                    <td class="description">
+                       This property specifies the partitions that all tasks should consume. The systemStreamPartitions you put
+                       here will be sent to all the tasks.
+                       <dl>
+                         <dt>Format: <span class="system">system-name</span>.<span class="stream">stream-name</span>#<i>partitionId</i>
+                       or <span class="system">system-name</span>.<span class="stream">stream-name</span>#[<i>startingPartitionId</i>-<i>endingPartitionId</i>]</dt>
+                       </dl>
+                       <dl>
+                            <dt>Example: <code>task.broadcast.inputs=mySystem.globalStream#[1-2], mySystem.anotherGlobalStream#1</code></dt>
+                       </dl>
+                       </dl>
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index a920a10..bc926c5 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -79,4 +79,14 @@ public interface SystemAdmin {
    */
   void createCoordinatorStream(String streamName);
 
+  /**
+   * Compare the two offsets. -1, 0, +1 means offset1 &lt; offset2,
+   * offset1 == offset2 and offset1 &gt; offset2 respectively. Return
+   * null if those two offsets are not comparable
+   *
+   * @param offset1
+   * @param offset2
+   * @return -1 if offset1 &lt; offset2; 0 if offset1 == offset2; 1 if offset1 &gt; offset2. Null if not comparable
+   */
+  Integer offsetComparator(String offset1, String offset2);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 63a1666..2157e69 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -81,4 +81,8 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
     throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
   }
 
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
new file mode 100644
index 0000000..015e994
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskConfigJava extends MapConfig {
+  // broadcast streams consumed by all tasks. e.g. kafka.foo#1
+  public static final String BROADCAST_INPUT_STREAMS = "task.broadcast.inputs";
+  private static final String BROADCAST_STREAM_PATTERN = "[^#\\.]+\\.[^#\\.]+#[\\d]+";
+  // a range must end with exactly one ']', e.g. kafka.foo#[0-2]
+  private static final String BROADCAST_STREAM_RANGE_PATTERN = "[^#\\.]+\\.[^#\\.]+#\\[[\\d]+\\-[\\d]+\\]";
+  public static final Logger LOGGER = LoggerFactory.getLogger(TaskConfigJava.class);
+
+  public TaskConfigJava(Config config) {
+    super(config);
+  }
+
+  /**
+   * Get the systemStreamPartitions of the broadcast stream. Each entry is
+   * 'system.stream#partitionId' or an inclusive range 'system.stream#[startId-endId]'
+   * (an inverted range adds nothing); anything else throws IllegalArgumentException.
+   *
+   * @return a Set of SystemStreamPartitions
+   */
+  public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
+    HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>();
+    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
+
+    for (String systemStreamPartition : systemStreamPartitions) {
+      // Both patterns forbid '#' inside the system and stream names, so the first '#' is the separator.
+      int hashPosition = systemStreamPartition.indexOf("#");
+      if (Pattern.matches(BROADCAST_STREAM_PATTERN, systemStreamPartition)) {
+        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition));
+        systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.parseInt(systemStreamPartition.substring(hashPosition + 1)))));
+
+      } else if (Pattern.matches(BROADCAST_STREAM_RANGE_PATTERN, systemStreamPartition)) {
+
+        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamPartition.substring(0, hashPosition));
+        // Locate the delimiters after the '#': the stream name itself may contain '[', ']' or '-'.
+        int dashPosition = systemStreamPartition.indexOf("-", hashPosition);
+        int startingPartition = Integer.parseInt(systemStreamPartition.substring(hashPosition + 2, dashPosition));
+        int endingPartition = Integer.parseInt(systemStreamPartition.substring(dashPosition + 1, systemStreamPartition.length() - 1));
+        if (startingPartition > endingPartition) {
+          LOGGER.warn("The starting partition in stream " + systemStream.toString() + " is bigger than the ending Partition. No partition is added");
+        }
+        for (int i = startingPartition; i <= endingPartition; i++) {
+          systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(i)));
+        }
+      } else {
+        throw new IllegalArgumentException("incorrect format in " + systemStreamPartition
+            + ". Broadcast stream names should be in the form 'system.stream#partitionId' or 'system.stream#[partitionN-partitionM]'");
+      }
+    }
+    return systemStreamPartitionSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
new file mode 100644
index 0000000..3022b72
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class GroupByPartition implements SystemStreamPartitionGrouper {
+  private TaskConfigJava taskConfig = null;
+  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
+
+  /**
+   * Creates a grouper without any broadcast streams.
+   */
+  public GroupByPartition() {
+  }
+
+  /**
+   * Reads the broadcast streams, if any are configured, from the job's config.
+   *
+   * @param config job's config
+   */
+  public GroupByPartition(Config config) {
+    if (!config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
+      return;
+    }
+    taskConfig = new TaskConfigJava(config);
+    broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
+  }
+
+  /**
+   * Groups non-broadcast partitions by partition id; every task also gets all broadcast partitions.
+   */
+  @Override
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+    Map<TaskName, Set<SystemStreamPartition>> tasksToSsps = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    for (SystemStreamPartition candidate : ssps) {
+      // broadcast partitions never define a task of their own
+      if (!broadcastStreams.contains(candidate)) {
+        TaskName name = new TaskName("Partition " + candidate.getPartition().getPartitionId());
+        Set<SystemStreamPartition> members = tasksToSsps.get(name);
+        if (members == null) {
+          members = new HashSet<SystemStreamPartition>();
+          tasksToSsps.put(name, members);
+        }
+        members.add(candidate);
+      }
+    }
+
+    // addAll is a no-op when no broadcast streams are configured
+    for (Set<SystemStreamPartition> members : tasksToSsps.values()) {
+      members.addAll(broadcastStreams);
+    }
+
+    return tasksToSsps;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
new file mode 100644
index 0000000..608508e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+public class GroupByPartitionFactory implements SystemStreamPartitionGrouperFactory {
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    return new GroupByPartition(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
new file mode 100644
index 0000000..a8b41de
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class GroupBySystemStreamPartition implements SystemStreamPartitionGrouper {
+  private TaskConfigJava taskConfig = null;
+  private Set<SystemStreamPartition> broadcastStreams = new HashSet<SystemStreamPartition>();
+
+  /**
+   * Creates a grouper without any broadcast streams.
+   */
+  public GroupBySystemStreamPartition() {
+  }
+
+  /**
+   * Reads the broadcast streams, if any are configured, from the job config.
+   *
+   * @param config job config
+   */
+  public GroupBySystemStreamPartition(Config config) {
+    if (!config.containsKey(TaskConfigJava.BROADCAST_INPUT_STREAMS)) {
+      return;
+    }
+    taskConfig = new TaskConfigJava(config);
+    broadcastStreams = taskConfig.getBroadcastSystemStreamPartitions();
+  }
+
+  /**
+   * Creates one task per non-broadcast partition, named by the partition's toString();
+   * every task also gets all broadcast partitions.
+   */
+  @Override
+  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
+    Map<TaskName, Set<SystemStreamPartition>> tasksToSsps = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    for (SystemStreamPartition candidate : ssps) {
+      // broadcast partitions never define a task of their own
+      if (!broadcastStreams.contains(candidate)) {
+        Set<SystemStreamPartition> members = new HashSet<SystemStreamPartition>();
+        members.add(candidate);
+        tasksToSsps.put(new TaskName(candidate.toString()), members);
+      }
+    }
+
+    // addAll is a no-op when no broadcast streams are configured
+    for (Set<SystemStreamPartition> members : tasksToSsps.values()) {
+      members.addAll(broadcastStreams);
+    }
+
+    return tasksToSsps;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
new file mode 100644
index 0000000..04a7444
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import org.apache.samza.config.Config;
+
+public class GroupBySystemStreamPartitionFactory implements SystemStreamPartitionGrouperFactory {
+  /** Creates the grouper from the job config so that configured broadcast inputs are applied. */
+  @Override
+  public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
+    return new GroupBySystemStreamPartition(config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 20e5d26..1464acc 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
     config: Config,
     checkpointManager: CheckpointManager = null,
     systemAdmins: Map[String, SystemAdmin] = Map(),
-    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
-    latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
+    offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
+    latestOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
 
     val offsetSettings = systemStreamMetadata
@@ -142,25 +142,28 @@ class OffsetManager(
   /**
    * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
    */
-  val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics,
 
   /*
    * The previously read checkpoints restored from the coordinator stream
    */
-  val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
-  ) extends Logging {
+  val previousCheckpointedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] = Map()) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
    */
   // Filter out null offset values, we can't use them, these exist only because of SSP information
-  var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
+  var lastProcessedOffsets = previousCheckpointedOffsets.map {
+    case (taskName, sspToOffset) => {
+      taskName -> sspToOffset.filter(_._2 != null)
+    }
+  }
 
   /**
    * Offsets to start reading from for each SystemStreamPartition. This
    * variable is populated after all checkpoints have been restored.
    */
-  var startingOffsets = Map[SystemStreamPartition, String]()
+  var startingOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
 
   /**
    * The set of system stream partitions that have been registered with the
@@ -172,7 +175,7 @@ class OffsetManager(
   def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
     systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
     // register metrics
-    systemStreamPartitions.foreach{ case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
+    systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
   }
 
   def start {
@@ -190,23 +193,32 @@ class OffsetManager(
   /**
    * Set the last processed offset for a given SystemStreamPartition.
    */
-  def update(systemStreamPartition: SystemStreamPartition, offset: String) {
-    lastProcessedOffsets += systemStreamPartition -> offset
+  def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
+    lastProcessedOffsets.get(taskName) match {
+      case Some(sspToOffsets) => lastProcessedOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> offset))
+      case None => lastProcessedOffsets += (taskName -> Map(systemStreamPartition -> offset))
+    }
   }
 
   /**
    * Get the last processed offset for a SystemStreamPartition.
    */
-  def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
-    lastProcessedOffsets.get(systemStreamPartition)
+  def getLastProcessedOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = {
+    lastProcessedOffsets.get(taskName) match {
+      case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition)
+      case None => None
+    }
   }
 
   /**
    * Get the starting offset for a SystemStreamPartition. This is the offset
    * where a SamzaContainer begins reading from when it starts up.
    */
-  def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
-    startingOffsets.get(systemStreamPartition)
+  def getStartingOffset(taskName: TaskName, systemStreamPartition: SystemStreamPartition) = {
+    startingOffsets.get(taskName) match {
+      case Some(sspToOffsets) => sspToOffsets.get(systemStreamPartition)
+      case None => None
+    }
   }
 
   /**
@@ -217,10 +229,19 @@ class OffsetManager(
       debug("Checkpointing offsets for taskName %s." format taskName)
 
       val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet
-      val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_))
+      val partitionOffsets = lastProcessedOffsets.get(taskName) match {
+        case Some(sspToOffsets) => sspToOffsets.filterKeys(sspsForTaskName.contains(_))
+        case None => {
+          warn(taskName + " is not found... ")
+          Map[SystemStreamPartition, String]()
+        }
+      }
 
       checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
-      lastProcessedOffsets.foreach{ case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
+      lastProcessedOffsets.get(taskName) match {
+        case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
+        case None =>
+      }
     } else {
       debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName)
     }
@@ -262,7 +283,9 @@ class OffsetManager(
    */
   private def loadOffsets {
     debug("Loading offsets")
-    lastProcessedOffsets.filter {
+    lastProcessedOffsets.map {
+      case (taskName, sspToOffsets) => {
+        taskName -> sspToOffsets.filter {
           case (systemStreamPartition, offset) =>
             val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
             if (!shouldKeep) {
@@ -271,7 +294,8 @@ class OffsetManager(
             info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
             shouldKeep
         }
-
+      }
+    }
   }
 
   /**
@@ -279,27 +303,43 @@ class OffsetManager(
    * reset using resetOffsets.
    */
   private def stripResetStreams {
-    val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
-
-    systemStreamPartitionsToReset.foreach(systemStreamPartition => {
-      val offset = lastProcessedOffsets(systemStreamPartition)
-      info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
-    })
+    val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets)
+
+    systemStreamPartitionsToReset.foreach {
+      case (taskName, systemStreamPartitions) => {
+        systemStreamPartitions.foreach {
+          systemStreamPartition =>
+            {
+              val offset = lastProcessedOffsets(taskName).get(systemStreamPartition)
+              info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
+            }
+        }
+      }
+    }
 
-    lastProcessedOffsets --= systemStreamPartitionsToReset
+    lastProcessedOffsets = lastProcessedOffsets.map {
+      case (taskName, sspToOffsets) => {
+        taskName -> (sspToOffsets -- systemStreamPartitionsToReset(taskName))
+      }
+    }
   }
 
   /**
-   * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
+   * Returns a map from each TaskName to the set of its SystemStreamPartitions in lastProcessedOffsets that need to be reset
    */
-  private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
-    systemStreamPartitions
-      .filter(systemStreamPartition => {
-        val systemStream = systemStreamPartition.getSystemStream
-        offsetSettings
-          .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
-          .resetOffset
-      }).toSet
+  private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: Map[TaskName, Map[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
+    taskNameTosystemStreamPartitions.map {
+      case (taskName, sspToOffsets) => {
+        taskName -> (sspToOffsets.filter {
+          case (systemStreamPartition, offset) => {
+            val systemStream = systemStreamPartition.getSystemStream
+            offsetSettings
+              .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
+              .resetOffset
+          }
+        }.keys.toSet)
+      }
+    }
   }
 
   /**
@@ -307,16 +347,18 @@ class OffsetManager(
    * SystemStreamPartition, and populate startingOffsets.
    */
   private def loadStartingOffsets {
-    startingOffsets ++= lastProcessedOffsets
-      // Group offset map according to systemName.
-      .groupBy(_._1.getSystem)
-      // Get next offsets for each system.
-      .flatMap {
-        case (systemName, systemStreamPartitionOffsets) =>
-          systemAdmins
-            .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
-            .getOffsetsAfter(systemStreamPartitionOffsets)
+    startingOffsets = lastProcessedOffsets.map {
+      case (taskName, sspToOffsets) => {
+        taskName -> {
+          sspToOffsets.groupBy(_._1.getSystem).flatMap {
+            case (systemName, systemStreamPartitionOffsets) =>
+              systemAdmins
+                .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
+                .getOffsetsAfter(systemStreamPartitionOffsets)
+          }
+        }
       }
+    }
   }
 
   /**
@@ -324,42 +366,47 @@ class OffsetManager(
    * that was registered, but has no offset.
    */
   private def loadDefaults {
-    val allSSPs: Set[SystemStreamPartition] = systemStreamPartitions
-      .values
-      .flatten
-      .toSet
-
-    allSSPs.foreach(systemStreamPartition => {
-      if (!startingOffsets.contains(systemStreamPartition)) {
-        val systemStream = systemStreamPartition.getSystemStream
-        val partition = systemStreamPartition.getPartition
-        val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
-        val systemStreamMetadata = offsetSetting.metadata
-        val offsetType = offsetSetting.defaultOffset
-
-        debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
-
-        val systemStreamPartitionMetadata = systemStreamMetadata
-          .getSystemStreamPartitionMetadata
-          .get(partition)
-
-        if (systemStreamPartitionMetadata != null) {
-          val nextOffset = {
-            val requested = systemStreamPartitionMetadata.getOffset(offsetType)
-
-            if (requested == null) {
-              warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition))
-              systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
-            } else requested
+    val taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]] = systemStreamPartitions
+
+    taskNameToSSPs.foreach {
+      case (taskName, systemStreamPartitions) => {
+        systemStreamPartitions.foreach { systemStreamPartition =>
+          if (!startingOffsets.contains(taskName) || !startingOffsets(taskName).contains(systemStreamPartition)) {
+            val systemStream = systemStreamPartition.getSystemStream
+            val partition = systemStreamPartition.getPartition
+            val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
+            val systemStreamMetadata = offsetSetting.metadata
+            val offsetType = offsetSetting.defaultOffset
+
+            debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
+
+            val systemStreamPartitionMetadata = systemStreamMetadata
+              .getSystemStreamPartitionMetadata
+              .get(partition)
+
+            if (systemStreamPartitionMetadata != null) {
+              val nextOffset = {
+                val requested = systemStreamPartitionMetadata.getOffset(offsetType)
+
+                if (requested == null) {
+                  warn("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset." format (offsetType, systemStreamPartition))
+                  systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
+                } else requested
+              }
+
+              debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
+
+              startingOffsets.get(taskName) match {
+                case Some(sspToOffsets) => startingOffsets += taskName -> (sspToOffsets + (systemStreamPartition -> nextOffset))
+                case None => startingOffsets += taskName -> Map(systemStreamPartition -> nextOffset)
+              }
+
+            } else {
+              throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
+            }
           }
-
-          debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
-
-          startingOffsets += systemStreamPartition -> nextOffset
-        } else {
-          throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
         }
       }
-    })
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index e4b14f4..6d73bb9 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,10 +19,10 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.util.Logging
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 
 object JobConfig {
   // job config constants

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 24da35f..6916c5c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -51,14 +51,17 @@ class RunLoop(
 
   // Messages come from the chooser with no connection to the TaskInstance they're bound for.
   // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
-  val systemStreamPartitionToTaskInstance: Map[SystemStreamPartition, TaskInstance] = {
+  val systemStreamPartitionToTaskInstances = getSystemStreamPartitionToTaskInstancesMapping
+
+  def getSystemStreamPartitionToTaskInstancesMapping: Map[SystemStreamPartition, List[TaskInstance]] = {
     // We could just pass in the SystemStreamPartitionMap during construction, but it's safer and cleaner to derive the information directly
     def getSystemStreamPartitionToTaskInstance(taskInstance: TaskInstance) = taskInstance.systemStreamPartitions.map(_ -> taskInstance).toMap
 
-    taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
+    taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.groupBy(_._1).map {
+      case (ssp, ssp2taskInstance) => ssp -> ssp2taskInstance.map(_._2).toList
+    }
   }
 
-
   /**
    * Starts the run loop. Blocks until either the tasks request shutdown, or an
    * unhandled exception is thrown.
@@ -111,11 +114,15 @@ class RunLoop(
         trace("Processing incoming message envelope for SSP %s." format ssp)
         metrics.envelopes.inc
 
-        val taskInstance = systemStreamPartitionToTaskInstance(ssp)
-        val coordinator = new ReadableCoordinator(taskInstance.taskName)
-
-        taskInstance.process(envelope, coordinator)
-        checkCoordinator(coordinator)
+        val taskInstances = systemStreamPartitionToTaskInstances(ssp)
+        taskInstances.foreach {
+          taskInstance =>
+            {
+              val coordinator = new ReadableCoordinator(taskInstance.taskName)
+              taskInstance.process(envelope, coordinator)
+              checkCoordinator(coordinator)
+            }
+        }
       } else {
         trace("No incoming message envelope was available.")
         metrics.nullEnvelopes.inc

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 85b012b..61e228b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -335,8 +335,8 @@ object SamzaContainer extends Logging {
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val combinedOffsets: Map[SystemStreamPartition, String] =
-      containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+    val combinedOffsets: Map[TaskName, Map[SystemStreamPartition, String]] =
+      containerModel.getTasks.map{case (taskName, taskModel) => taskName -> mapAsScalaMap(taskModel.getCheckpointedOffsets).toMap }.toMap
 
     val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
 
@@ -508,6 +508,7 @@ object SamzaContainer extends Logging {
         taskName = taskName,
         config = config,
         metrics = taskInstanceMetrics,
+        systemAdmins = systemAdmins,
         consumerMultiplexer = consumerMultiplexer,
         collector = collector,
         containerContext = containerContext,

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c5a5ea5..d32a929 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -36,14 +36,15 @@ import org.apache.samza.task.StreamTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.Logging
-
 import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
 
 class TaskInstance(
   task: StreamTask,
   val taskName: TaskName,
   config: Config,
   metrics: TaskInstanceMetrics,
+  systemAdmins: Map[String, SystemAdmin],
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
   containerContext: SamzaContainerContext,
@@ -69,9 +70,15 @@ class TaskInstance(
     def getSamzaContainerContext = containerContext
 
     override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
-      offsetManager.startingOffsets += (ssp -> offset)
+      val startingOffsets = offsetManager.startingOffsets
+      offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
     }
   }
+  // Stores the (ssp -> whether this ssp is caught up) mapping. "Caught up"
+  // means the same ssp in other taskInstances has the same offset as
+  // the one here.
+  var ssp2catchedupMapping: scala.collection.mutable.Map[SystemStreamPartition, Boolean] = scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
+  systemStreamPartitions.foreach(ssp2catchedupMapping += _ -> false)
 
   def registerMetrics {
     debug("Registering metrics for taskName: %s" format taskName)
@@ -115,13 +122,13 @@ class TaskInstance(
     debug("Registering consumers for taskName: %s" format taskName)
 
     systemStreamPartitions.foreach(systemStreamPartition => {
-      val offset = offsetManager.getStartingOffset(systemStreamPartition)
-        .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
+      val offset = offsetManager.getStartingOffset(taskName, systemStreamPartition)
+      .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format systemStreamPartition))
       consumerMultiplexer.register(systemStreamPartition, offset)
       metrics.addOffsetGauge(systemStreamPartition, () => {
         offsetManager
-          .getLastProcessedOffset(systemStreamPartition)
-          .getOrElse(null)
+          .getLastProcessedOffset(taskName, systemStreamPartition)
+          .orNull
       })
     })
   }
@@ -129,15 +136,24 @@ class TaskInstance(
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
     metrics.processes.inc
 
-    trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
-
-    exceptionHandler.maybeHandle {
-      task.process(envelope, collector, coordinator)
+    if (!ssp2catchedupMapping.getOrElse(envelope.getSystemStreamPartition,
+      throw new SamzaException(envelope.getSystemStreamPartition + " is not registered!"))) {
+      checkCaughtUp(envelope)
     }
 
-    trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
+    if (ssp2catchedupMapping(envelope.getSystemStreamPartition)) {
+      metrics.messagesActuallyProcessed.inc
+
+      trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition))
+
+      exceptionHandler.maybeHandle {
+        task.process(envelope, collector, coordinator)
+      }
+
+      trace("Updating offset map for taskName, SSP and offset: %s, %s, %s" format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
-    offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
+      offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
+    }
   }
 
   def window(coordinator: ReadableCoordinator) {
@@ -193,4 +209,35 @@ class TaskInstance(
   override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName)
 
   def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s]" format (taskName, isWindowableTask, isClosableTask)
+
+  /**
+   * From the envelope, check if this SSP has caught up with the starting offset of the SSP
+   * in this TaskInstance. If the offsets are not comparable, default to true, which means
+   * it's already caught up.
+   */
+  private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
+    systemAdmins match {
+      case null => {
+        warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
+        ssp2catchedupMapping(envelope.getSystemStreamPartition) = true
+      }
+      case others => {
+        val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
+            .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
+        val system = envelope.getSystemStreamPartition.getSystem
+        others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+          case null => {
+            info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
+            ssp2catchedupMapping(envelope.getSystemStreamPartition) = true // not comparable
+          }
+          case result => {
+            if (result >= 0) {
+              info(envelope.getSystemStreamPartition.toString + " is catched up.")
+              ssp2catchedupMapping(envelope.getSystemStreamPartition) = true
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
index 9dc7051..8b86388 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
@@ -31,6 +31,7 @@ class TaskInstanceMetrics(
   val commits = newCounter("commit-calls")
   val windows = newCounter("window-calls")
   val processes = newCounter("process-calls")
+  val messagesActuallyProcessed = newCounter("messages-actually-processed")
   val sends = newCounter("send-calls")
   val flushes = newCounter("flush-calls")
   val messagesSent = newCounter("messages-sent")

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
deleted file mode 100644
index 44e95fc..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by their Partition, with the key being
- * the string representation of the Partition.
- */
-class GroupByPartition extends SystemStreamPartitionGrouper {
-  override def group(ssps: util.Set[SystemStreamPartition]) = {
-    ssps.groupBy( s => new TaskName("Partition " + s.getPartition.getPartitionId) )
-      .map(r => r._1 -> r._2.asJava)
-  }
-}
-
-class GroupByPartitionFactory extends SystemStreamPartitionGrouperFactory {
-  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupByPartition
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala b/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
deleted file mode 100644
index 3c0acad..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.container.grouper.stream
-
-import org.apache.samza.container.TaskName
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-import org.apache.samza.config.Config
-
-/**
- * Group the {@link org.apache.samza.system.SystemStreamPartition}s by themselves, effectively putting each
- * SystemStreamPartition into its own group, with the key being the string representation of the SystemStringPartition
- */
-class GroupBySystemStreamPartition extends SystemStreamPartitionGrouper {
-  override def group(ssps: util.Set[SystemStreamPartition]) = ssps.groupBy({ s=> new TaskName(s.toString)}).map(r => r._1 -> r._2.asJava)
-}
-
-class GroupBySystemStreamPartitionFactory extends SystemStreamPartitionGrouperFactory {
-  override def getSystemStreamPartitionGrouper(config: Config): SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index c29853d..460d11c 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -142,4 +142,8 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
   def createCoordinatorStream(streamName: String) {
     throw new UnsupportedOperationException("Method not implemented.")
   }
+
+  override def offsetComparator(offset1: String , offset2: String) = {
+    null
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
new file mode 100644
index 0000000..2d6060e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+public class TestTaskConfigJava {
+
+  @Test
+  public void testGetBroadcastSystemStreamPartitions() {
+    HashMap<String, String> map = new HashMap<String, String>();
+    map.put("task.broadcast.inputs", "kafka.foo#4, kafka.boo#5, kafka.z-o-o#[12-14]");
+    Config config = new MapConfig(map);
+
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    Set<SystemStreamPartition> systemStreamPartitionSet = taskConfig.getBroadcastSystemStreamPartitions();
+
+    HashSet<SystemStreamPartition> expected = new HashSet<SystemStreamPartition>();
+    expected.add(new SystemStreamPartition("kafka", "foo", new Partition(4)));
+    expected.add(new SystemStreamPartition("kafka", "boo", new Partition(5)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(12)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(13)));
+    expected.add(new SystemStreamPartition("kafka", "z-o-o", new Partition(14)));
+    assertEquals(expected, systemStreamPartitionSet);
+
+    map.put("task.broadcast.inputs", "kafka.foo");
+    taskConfig = new TaskConfigJava(new MapConfig(map));
+    boolean catchCorrectException = false;
+    try {
+      taskConfig.getBroadcastSystemStreamPartitions();
+    } catch (IllegalArgumentException e) {
+      catchCorrectException = true;
+    }
+    assertTrue(catchCorrectException);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
new file mode 100644
index 0000000..2a8d447
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+public class TestGroupByPartition {
+  SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+  SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+  SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+  SystemStreamPartition ab1 = new SystemStreamPartition("SystemA", "StreamB", new Partition(1));
+  SystemStreamPartition ab2 = new SystemStreamPartition("SystemA", "StreamB", new Partition(2));
+  SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+
+  @Test
+  public void testLocalStreamsGroupedCorrectly() {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+    GroupByPartition grouper = new GroupByPartition();
+    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
+    // empty SSP set gets empty groups
+    assertTrue(emptyResult.isEmpty());
+
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
+    partition0.add(aa0);
+    partition0.add(ac0);
+    expectedResult.put(new TaskName("Partition 0"), partition0);
+
+    HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
+    partition1.add(aa1);
+    partition1.add(ab1);
+    expectedResult.put(new TaskName("Partition 1"), partition1);
+
+    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+    partition2.add(aa2);
+    partition2.add(ab2);
+    expectedResult.put(new TaskName("Partition 2"), partition2);
+
+    assertEquals(expectedResult, result);
+  }
+
+  @Test
+  public void testBroadcastStreamsGroupedCorrectly() {
+    HashMap<String, String> configMap = new HashMap<String, String>();
+    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
+    Config config = new MapConfig(configMap);
+    GroupByPartition grouper = new GroupByPartition(config);
+
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ab1, ab2, ac0);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    HashSet<SystemStreamPartition> partition0 = new HashSet<SystemStreamPartition>();
+    partition0.add(aa0); // broadcast stream
+    partition0.add(ac0);
+    partition0.add(ab1); // broadcast stream
+    expectedResult.put(new TaskName("Partition 0"), partition0);
+
+    HashSet<SystemStreamPartition> partition1 = new HashSet<SystemStreamPartition>();
+    partition1.add(aa1);
+    partition1.add(ab1); // broadcast stream
+    partition1.add(aa0); // broadcast stream
+    expectedResult.put(new TaskName("Partition 1"), partition1);
+
+    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+    partition2.add(aa2);
+    partition2.add(ab2);
+    partition2.add(aa0); // broadcast stream
+    partition2.add(ab1); // broadcast stream
+    expectedResult.put(new TaskName("Partition 2"), partition2);
+
+    assertEquals(expectedResult, result);
+  }
+
+  @Test
+  public void testNoTaskOnlyContainsBroadcastStreams() {
+    HashMap<String, String> configMap = new HashMap<String, String>();
+    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0, SystemA.StreamB#1");
+    Config config = new MapConfig(configMap);
+    GroupByPartition grouper = new GroupByPartition(config);
+
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+    Collections.addAll(allSSPs, aa0, ab1, ab2);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+    HashSet<SystemStreamPartition> partition2 = new HashSet<SystemStreamPartition>();
+    partition2.add(aa0); // broadcast stream
+    partition2.add(ab1); // broadcast stream
+    partition2.add(ab2);
+    expectedResult.put(new TaskName("Partition 2"), partition2);
+
+    assertEquals(expectedResult, result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
new file mode 100644
index 0000000..1bd14a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.grouper.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+public class TestGroupBySystemStreamPartition {
+  SystemStreamPartition aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0));
+  SystemStreamPartition aa1 = new SystemStreamPartition("SystemA", "StreamA", new Partition(1));
+  SystemStreamPartition aa2 = new SystemStreamPartition("SystemA", "StreamA", new Partition(2));
+  SystemStreamPartition ac0 = new SystemStreamPartition("SystemA", "StreamB", new Partition(0));
+
+  @Test
+  public void testLocalStreamGroupedCorrectly() {
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+
+    GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition();
+    Map<TaskName, Set<SystemStreamPartition>> emptyResult = grouper.group(allSSPs);
+    assertTrue(emptyResult.isEmpty());
+
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    HashSet<SystemStreamPartition> partitionaa0 = new HashSet<SystemStreamPartition>();
+    partitionaa0.add(aa0);
+    expectedResult.put(new TaskName(aa0.toString()), partitionaa0);
+
+    HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
+    partitionaa1.add(aa1);
+    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+
+    HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
+    partitionaa2.add(aa2);
+    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+
+    HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>();
+    partitionac0.add(ac0);
+    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+
+    assertEquals(expectedResult, result);
+  }
+
+  @Test
+  public void testBroadcastStreamGroupedCorrectly() {
+    HashMap<String, String> configMap = new HashMap<String, String>();
+    configMap.put("task.broadcast.inputs", "SystemA.StreamA#0");
+    Config config = new MapConfig(configMap);
+
+    HashSet<SystemStreamPartition> allSSPs = new HashSet<SystemStreamPartition>();
+    Collections.addAll(allSSPs, aa0, aa1, aa2, ac0);
+    GroupBySystemStreamPartition grouper = new GroupBySystemStreamPartition(config);
+    Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
+
+    Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<TaskName, Set<SystemStreamPartition>>();
+
+    HashSet<SystemStreamPartition> partitionaa1 = new HashSet<SystemStreamPartition>();
+    partitionaa1.add(aa1);
+    partitionaa1.add(aa0);
+    expectedResult.put(new TaskName(aa1.toString()), partitionaa1);
+
+    HashSet<SystemStreamPartition> partitionaa2 = new HashSet<SystemStreamPartition>();
+    partitionaa2.add(aa2);
+    partitionaa2.add(aa0);
+    expectedResult.put(new TaskName(aa2.toString()), partitionaa2);
+
+    HashSet<SystemStreamPartition> partitionac0 = new HashSet<SystemStreamPartition>();
+    partitionac0.add(ac0);
+    partitionac0.add(aa0);
+    expectedResult.put(new TaskName(ac0.toString()), partitionac0);
+
+    assertEquals(expectedResult, result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index daa5eab..c00ef91 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -49,9 +49,9 @@ class TestOffsetManager {
     val offsetManager = OffsetManager(systemStreamMetadata, config)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
-    assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
-    assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
-    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+    assertFalse(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).isDefined)
+    assertTrue(offsetManager.getStartingOffset(taskName, systemStreamPartition).isDefined)
+    assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
   }
 
   @Test
@@ -73,14 +73,14 @@ class TestOffsetManager {
     assertEquals(taskName, checkpointManager.registered.head)
     assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
-    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
-    assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
-    offsetManager.update(systemStreamPartition, "46")
-    assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
-    offsetManager.update(systemStreamPartition, "47")
-    assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+    assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
+    assertEquals("45", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
+    offsetManager.update(taskName, systemStreamPartition, "46")
+    assertEquals("46", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
+    offsetManager.update(taskName, systemStreamPartition, "47")
+    assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
     // Should never update starting offset.
-    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+    assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
     offsetManager.checkpoint(taskName)
     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
@@ -103,11 +103,11 @@ class TestOffsetManager {
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     offsetManager.checkpoint(taskName)
     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
-    offsetManager.update(systemStreamPartition, "46")
-    offsetManager.update(systemStreamPartition, "47")
+    offsetManager.update(taskName, systemStreamPartition, "46")
+    offsetManager.update(taskName, systemStreamPartition, "47")
     offsetManager.checkpoint(taskName)
     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
-    offsetManager.update(systemStreamPartition, "48")
+    offsetManager.update(taskName, systemStreamPartition, "48")
     offsetManager.checkpoint(taskName)
     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
   }
@@ -133,7 +133,7 @@ class TestOffsetManager {
     assertEquals(taskName, checkpointManager.registered.head)
     assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
     // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
-    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+    assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
   }
 
   @Test
@@ -223,7 +223,7 @@ class TestOffsetManager {
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
-    assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
+    assertNull(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition1).getOrElse(null))
   }
 
   @Test
@@ -236,7 +236,7 @@ class TestOffsetManager {
     val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings))
     offsetManager.register(taskName, Set(ssp))
     offsetManager.start
-    assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
+    assertEquals(Some("13"), offsetManager.getStartingOffset(taskName, ssp))
   }
 
 
@@ -255,10 +255,7 @@ class TestOffsetManager {
       override def stop { isStopped = true }
 
       // Only for testing purposes - not present in actual checkpoint manager
-      def getOffets: util.Map[SystemStreamPartition, String] =
-      {
-        checkpoint.getOffsets()
-      }
+      def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap)
     }
   }
 
@@ -281,6 +278,8 @@ class TestOffsetManager {
       override def createCoordinatorStream(streamName: String) {
         new UnsupportedOperationException("Method not implemented.")
       }
+
+      override def offsetComparator(offset1: String, offset2: String) = null
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index b9d9e73..b4d6f35 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.container
 
 import org.junit.Test
+import org.junit.Assert._
 import org.mockito.Matchers
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
@@ -243,4 +244,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
     testMetrics.commits.getCount should equal(1L)
     testMetrics.windows.getCount should equal(1L)
   }
+
+  @Test
+  def testGetSystemStreamPartitionToTaskInstancesMapping {
+    val ti0 = mock[TaskInstance]
+    val ti1 = mock[TaskInstance]
+    val ti2 = mock[TaskInstance]
+    when(ti0.systemStreamPartitions).thenReturn(Set(ssp0))
+    when(ti1.systemStreamPartitions).thenReturn(Set(ssp1))
+    when(ti2.systemStreamPartitions).thenReturn(Set(ssp1))
+
+    val mockTaskInstances = Map(taskName0 -> ti0, taskName1 -> ti1, new TaskName("2") -> ti2)
+    val runLoop = new RunLoop(mockTaskInstances, null, new SamzaContainerMetrics)
+    val expected = Map(ssp0 -> List(ti0), ssp1 -> List(ti1, ti2))
+    assertEquals(expected, runLoop.getSystemStreamPartitionToTaskInstancesMapping)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 4db6d5c..6de8710 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -152,6 +152,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
       taskName,
       config,
       new TaskInstanceMetrics,
+      null,
       consumerMultiplexer,
       collector,
       containerContext

http://git-wip-us.apache.org/repos/asf/samza/blob/1f77f8b9/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 7caad28..5457f0e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -20,7 +20,6 @@
 package org.apache.samza.container
 
 import java.util.concurrent.ConcurrentHashMap
-
 import org.apache.samza.SamzaException
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.OffsetManager
@@ -44,8 +43,9 @@ import org.apache.samza.task._
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.Assertions.intercept
-
 import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
+import scala.collection.mutable.ListBuffer
 
 class TestTaskInstance {
   @Test
@@ -64,6 +64,7 @@ class TestTaskInstance {
       new SerdeManager)
     val systemStream = new SystemStream("test-system", "test-stream")
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
     val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -75,15 +76,17 @@ class TestTaskInstance {
       taskName,
       config,
       new TaskInstanceMetrics,
+      null,
       consumerMultiplexer,
       collector,
       containerContext,
-      offsetManager)
+      offsetManager,
+      systemStreamPartitions = systemStreamPartitions)
     // Pretend we got a message with offset 2 and next offset 3.
     val coordinator = new ReadableCoordinator(taskName)
     taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
     // Check to see if the offset manager has been properly updated with offset 3.
-    val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
+    val lastProcessedOffset = offsetManager.getLastProcessedOffset(taskName, systemStreamPartition)
     assertTrue(lastProcessedOffset.isDefined)
     assertEquals("2", lastProcessedOffset.get)
   }
@@ -156,6 +159,7 @@ class TestTaskInstance {
       new SerdeManager)
     val systemStream = new SystemStream("test-system", "test-stream")
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
     val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -170,10 +174,12 @@ class TestTaskInstance {
       taskName,
       config,
       taskMetrics,
+      null,
       consumerMultiplexer,
       collector,
       containerContext,
       offsetManager,
+      systemStreamPartitions = systemStreamPartitions,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
     val coordinator = new ReadableCoordinator(taskName)
@@ -210,6 +216,7 @@ class TestTaskInstance {
       new SerdeManager)
     val systemStream = new SystemStream("test-system", "test-stream")
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
     val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
@@ -224,10 +231,12 @@ class TestTaskInstance {
       taskName,
       config,
       taskMetrics,
+      null,
       consumerMultiplexer,
       collector,
       containerContext,
       offsetManager,
+      systemStreamPartitions = systemStreamPartitions,
       exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
 
     val coordinator = new ReadableCoordinator(taskName)
@@ -242,7 +251,6 @@ class TestTaskInstance {
     assertEquals(1L, getCount(group, classOf[FatalException].getName))
   }
 
-
   /**
    * Tests that the init() method of task can override the existing offset
    * assignment.
@@ -258,8 +266,7 @@ class TestTaskInstance {
       override def init(config: Config, context: TaskContext): Unit = {
 
         assertTrue("Can only update offsets for assigned partition",
-          context.getSystemStreamPartitions.contains(partition1)
-        )
+          context.getSystemStreamPartitions.contains(partition1))
 
         context.setStartingOffset(partition1, "10")
       }
@@ -278,22 +285,85 @@ class TestTaskInstance {
 
     val offsetManager = new OffsetManager()
 
-    offsetManager.startingOffsets ++= Map(partition0 -> "0", partition1 -> "0")
+    offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "0")
 
     val taskInstance = new TaskInstance(
       task,
       taskName,
       config,
       metrics,
+      null,
       consumers,
       collector,
       containerContext,
       offsetManager,
-      systemStreamPartitions = Set(partition0, partition1) )
+      systemStreamPartitions = Set(partition0, partition1))
 
     taskInstance.initTask
 
-    assertEquals(Some("0"), offsetManager.getStartingOffset(partition0))
-    assertEquals(Some("10"), offsetManager.getStartingOffset(partition1))
+    assertEquals(Some("0"), offsetManager.getStartingOffset(taskName, partition0))
+    assertEquals(Some("10"), offsetManager.getStartingOffset(taskName, partition1))
+  }
+
+  @Test
+  def testIgnoreMessagesOlderThanStartingOffsets {
+    val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
+    val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
+    val config = new MapConfig()
+    val chooser = new RoundRobinChooser()
+    val consumers = new SystemConsumers(chooser, consumers = Map.empty)
+    val producers = new SystemProducers(Map.empty, new SerdeManager())
+    val metrics = new TaskInstanceMetrics()
+    val taskName = new TaskName("testing")
+    val collector = new TaskInstanceCollector(producers)
+    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val offsetManager = new OffsetManager()
+    offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
+    val systemAdmins = Map("system" -> new MockSystemAdmin)
+    val result = new ListBuffer[IncomingMessageEnvelope] // envelopes handed to the task's process(), in arrival order
+
+    val task = new StreamTask {
+      def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
+        result += envelope
+      }
+    }
+
+    val taskInstance = new TaskInstance(
+      task,
+      taskName,
+      config,
+      metrics,
+      systemAdmins,
+      consumers,
+      collector,
+      containerContext,
+      offsetManager,
+      systemStreamPartitions = Set(partition0, partition1))
+
+    val coordinator = new ReadableCoordinator(taskName)
+    val envelope1 = new IncomingMessageEnvelope(partition0, "1", null, null)
+    val envelope2 = new IncomingMessageEnvelope(partition0, "2", null, null)
+    val envelope3 = new IncomingMessageEnvelope(partition1, "1", null, null) // offset "1" is below partition1's starting offset "100"
+    val envelope4 = new IncomingMessageEnvelope(partition1, "102", null, null)
+
+    taskInstance.process(envelope1, coordinator)
+    taskInstance.process(envelope2, coordinator)
+    taskInstance.process(envelope3, coordinator)
+    taskInstance.process(envelope4, coordinator)
+
+    val expected = List(envelope1, envelope2, envelope4) // envelope3 must not reach the task
+    assertEquals(expected, result.toList)
+  }
+}
+
+class MockSystemAdmin extends SystemAdmin {
+  override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { offsets }
+  override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null
+  override def createCoordinatorStream(stream: String) = {}
+  override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {}
+  override def validateChangelogStream(topicName: String, numOfPartitions: Int) = {}
+
+  override def offsetComparator(offset1: String, offset2: String) = {
+    offset1.toLong compare offset2.toLong
   }
 }