You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/13 22:09:43 UTC

[1/2] git commit: SAMZA-182; use HADOOP_YARN_HOME instead of YARN_HOME in run-class.sh.

Repository: incubator-samza
Updated Branches:
  refs/heads/master c932c5029 -> 3a6f2555a


SAMZA-182; use HADOOP_YARN_HOME instead of YARN_HOME in run-class.sh.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/782a0923
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/782a0923
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/782a0923

Branch: refs/heads/master
Commit: 782a0923652b712af84d81dc6688a3880ab040b7
Parents: c932c50
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Mar 13 14:06:14 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 13 14:06:14 2014 -0700

----------------------------------------------------------------------
 samza-shell/src/main/bash/run-class.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/782a0923/samza-shell/src/main/bash/run-class.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index 2fa2acf..5aa34cd 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -33,8 +33,9 @@ if [ ! -d "$base_dir/lib" ]; then
   exit 1
 fi
 
-YARN_HOME="${YARN_HOME:-$HOME/.samza}"
-CLASSPATH=$YARN_HOME/conf
+HADOOP_YARN_HOME="${HADOOP_YARN_HOME:-$HOME/.samza}"
+HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_YARN_HOME/conf}"
+CLASSPATH=$HADOOP_CONF_DIR
 
 for file in $base_dir/lib/*.[jw]ar;
 do


[2/2] git commit: SAMZA-157; add samza.offset.default configuration for streams and systems.

Posted by cr...@apache.org.
SAMZA-157; add samza.offset.default configuration for streams and systems.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3a6f2555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3a6f2555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3a6f2555

Branch: refs/heads/master
Commit: 3a6f2555aa92d47dd85b54f3f1503719c08d31a7
Parents: 782a092
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Mar 13 14:08:46 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Mar 13 14:08:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemAdmin.java    |  14 +-
 .../samza/system/SystemStreamMetadata.java      |  50 +++
 ...inglePartitionWithoutOffsetsSystemAdmin.java |  12 +
 .../apache/samza/checkpoint/OffsetManager.scala | 351 +++++++++++++++++++
 .../org/apache/samza/config/StreamConfig.scala  |   4 +
 .../org/apache/samza/config/SystemConfig.scala  |   3 +
 .../apache/samza/container/SamzaContainer.scala |  47 +--
 .../apache/samza/container/TaskInstance.scala   |  72 ++--
 .../system/chooser/BootstrappingChooser.scala   |  19 +-
 .../samza/checkpoint/TestOffsetManager.scala    | 188 ++++++++++
 .../samza/container/TestTaskInstance.scala      |  28 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  16 +-
 .../system/kafka/TestKafkaSystemAdmin.scala     |  19 +-
 .../samza/system/mock/MockSystemAdmin.java      |  12 +
 .../test/integration/TestStatefulTask.scala     |  20 +-
 15 files changed, 737 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 437bfb2..3976253 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -29,9 +29,21 @@ import java.util.Set;
  * system.
  */
 public interface SystemAdmin {
+
+  /**
+   * Fetches the offsets for the messages immediately after the supplied offsets
+   * for a group of SystemStreamPartitions.
+   * 
+   * @param offsets
+   *          Map from SystemStreamPartition to current offsets.
+   * @return Map from SystemStreamPartition to offsets immediately after the
+   *         current offsets.
+   */
+  Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets);
+
   /**
    * Fetch metadata from a system for a set of streams.
-   *
+   * 
    * @param streamNames
    *          The streams to to fetch metadata for.
    * @return A map from stream name to SystemStreamMetadata for each stream

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
index 32e142a..817c557 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
@@ -21,7 +21,9 @@ package org.apache.samza.system;
 
 import java.util.Collections;
 import java.util.Map;
+
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 
 /**
  * SystemAdmins use this class to return useful metadata about a stream's offset
@@ -139,6 +141,23 @@ public class SystemStreamMetadata {
       return upcomingOffset;
     }
 
+    /**
+     * @param offsetType
+     *          The type of offset to get. Either oldest, newest, or upcoming.
+     * @return The corresponding offset for the offset type requested.
+     */
+    public String getOffset(OffsetType offsetType) {
+      if (offsetType.equals(OffsetType.OLDEST)) {
+        return getOldestOffset();
+      } else if (offsetType.equals(OffsetType.NEWEST)) {
+        return getNewestOffset();
+      } else if (offsetType.equals(OffsetType.UPCOMING)) {
+        return getUpcomingOffset();
+      } else {
+        throw new SamzaException("Invalid offset type defined " + offsetType + ".");
+      }
+    }
+
     @Override
     public int hashCode() {
       final int prime = 31;
@@ -181,4 +200,35 @@ public class SystemStreamMetadata {
       return "SystemStreamPartitionMetadata [oldestOffset=" + oldestOffset + ", newestOffset=" + newestOffset + ", upcomingOffset=" + upcomingOffset + "]";
     }
   }
+
+  /**
+   * OffsetType is an enum used to define which offset should be used when
+   * reading from a SystemStreamPartition for the first time.
+   */
+  public enum OffsetType {
+
+    /**
+     * Signals the offset of the oldest message in a SystemStreamPartition.
+     */
+    OLDEST("oldest"),
+
+    /**
+     * Signals the offset of the newest message in a SystemStreamPartition.
+     */
+    NEWEST("newest"),
+
+    /**
+     * Signals the offset of the next message to be written into a
+     * SystemStreamPartition. If the offset of the most recent message written
+     * to a SystemStreamPartition is 7, then upcoming would signal offset 8
+     * (assuming the offsets were incremental).
+     */
+    UPCOMING("upcoming");
+
+    private final String offsetType;
+
+    private OffsetType(String offsetType) {
+      this.offsetType = offsetType;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 44fd82a..38e313f 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -26,6 +26,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
 
 /**
  * A simple helper admin class that defines a single partition (partition 0) for
@@ -51,4 +52,15 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
 
     return metadata;
   }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
+
+    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+      offsetsAfter.put(systemStreamPartition, null);
+    }
+
+    return offsetsAfter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
new file mode 100644
index 0000000..80341c8
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.apache.samza.system.SystemStream
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
+import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
+import grizzled.slf4j.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.system.SystemAdmin
+
+/**
+ * OffsetSetting encapsulates a SystemStream's metadata, default offset, and
+ * reset offset settings. It's just a convenience class to make OffsetManager
+ * easier to work with.
+ */
+case class OffsetSetting(
+  /**
+   * The metadata for the SystemStream.
+   */
+  metadata: SystemStreamMetadata,
+
+  /**
+   * The default offset (oldest, newest, or upcoming) for the SystemStream.
+   * This setting is used when no checkpoint is available for a SystemStream
+   * if the job is starting for the first time, or the SystemStream has been
+   * reset (see resetOffsets, below).
+   */
+  defaultOffset: OffsetType,
+
+  /**
+   * Whether the SystemStream's offset should be reset or not. Determines
+   * whether an offset should be ignored at initialization time, even if a
+   * checkpoint is available. This is useful for jobs that wish to restart
+   * reading from a stream at a different position than where they last
+   * checkpointed. If this is true, then defaultOffset will be used to find
+   * the new starting position in the stream.
+   */
+  resetOffset: Boolean)
+
+/**
+ * OffsetManager object is a helper that does wiring to build an OffsetManager
+ * from a config object.
+ */
+object OffsetManager extends Logging {
+  def apply(
+    systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
+    config: Config,
+    checkpointManager: CheckpointManager = null,
+    systemAdmins: Map[String, SystemAdmin] = Map()) = {
+
+    debug("Building offset manager for %s." format systemStreamMetadata)
+
+    val offsetSettings = systemStreamMetadata
+      .map {
+        case (systemStream, systemStreamMetadata) =>
+          // Get default offset.
+          val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
+          val systemDefaultOffset = config.getDefaultSystemOffset(systemStream.getSystem)
+          val defaultOffsetType = if (streamDefaultOffset.isDefined) {
+            OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
+          } else if (systemDefaultOffset.isDefined) {
+            OffsetType.valueOf(systemDefaultOffset.get.toUpperCase)
+          } else {
+            info("No default offset for %s defined. Using newest." format systemStream)
+            OffsetType.UPCOMING
+          }
+          debug("Using default offset %s for %s." format (defaultOffsetType, systemStream))
+
+          // Get reset offset.
+          val resetOffset = config.getResetOffset(systemStream)
+          debug("Using reset offset %s for %s." format (resetOffset, systemStream))
+
+          // Build OffsetSetting so we can create a map for OffsetManager.
+          (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
+      }.toMap
+    new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
+  }
+}
+
+/**
+ * OffsetManager does several things:
+ *
+ * <ul>
+ * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
+ * SamzaContainer.</li>
+ * <li>Uses last checkpointed offset to figure out the next offset to start
+ * reading from for each input SystemStreamPartition in a SamzaContainer</li>
+ * <li>Keep track of the last processed offset for each SystemStreamPartitions
+ * in a SamzaContainer.</li>
+ * <li>Checkpoints the last processed offset for each SystemStreamPartitions
+ * in a SamzaContainer periodically to the CheckpointManager.</li>
+ * </ul>
+ *
+ * All partitions must be registered before start is called, and start must be
+ * called before get/update/checkpoint/stop are called.
+ */
+class OffsetManager(
+
+  /**
+   * Offset settings for all streams that the OffsetManager is managing.
+   */
+  val offsetSettings: Map[SystemStream, OffsetSetting] = Map(),
+
+  /**
+   * Optional checkpoint manager for checkpointing offsets whenever
+   * checkpoint is called.
+   */
+  val checkpointManager: CheckpointManager = null,
+
+  /**
+   * SystemAdmins that are used to get next offsets from last checkpointed
+   * offsets. Map is from system name to SystemAdmin class for the system.
+   */
+  val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
+
+  /**
+   * Last offsets processed for each SystemStreamPartition.
+   */
+  var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+
+  /**
+   * Offsets to start reading from for each SystemStreamPartition. This
+   * variable is populated after all checkpoints have been restored.
+   */
+  var startingOffsets = Map[SystemStreamPartition, String]()
+
+  /**
+   * The set of system stream partitions that have been registered with the
+   * OffsetManager. These are the SSPs that will be tracked within the offset
+   * manager.
+   */
+  var systemStreamPartitions = Set[SystemStreamPartition]()
+
+  def register(systemStreamPartition: SystemStreamPartition) {
+    systemStreamPartitions += systemStreamPartition
+  }
+
+  def start {
+    registerCheckpointManager
+    loadOffsetsFromCheckpointManager
+    stripResetStreams
+    loadStartingOffsets
+    loadDefaults
+
+    info("Successfully loaded last processed offsets: %s" format lastProcessedOffsets)
+    info("Successfully loaded starting offsets: %s" format startingOffsets)
+  }
+
+  /**
+   * Set the last processed offset for a given SystemStreamPartition.
+   */
+  def update(systemStreamPartition: SystemStreamPartition, offset: String) {
+    lastProcessedOffsets += systemStreamPartition -> offset
+  }
+
+  /**
+   * Get the last processed offset for a SystemStreamPartition.
+   */
+  def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
+    lastProcessedOffsets.get(systemStreamPartition)
+  }
+
+  /**
+   * Get the starting offset for a SystemStreamPartition. This is the offset 
+   * where a SamzaContainer begins reading from when it starts up.
+   */
+  def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
+    startingOffsets.get(systemStreamPartition)
+  }
+
+  /**
+   * Checkpoint all offsets for a given partition using the CheckpointManager.
+   */
+  def checkpoint(partition: Partition) {
+    if (checkpointManager != null) {
+      debug("Checkpointing offsets for partition %s." format partition)
+
+      val partitionOffsets = lastProcessedOffsets
+        .filterKeys(_.getPartition.equals(partition))
+        .map { case (systemStreamPartition, offset) => (systemStreamPartition.getSystemStream, offset) }
+        .toMap
+
+      checkpointManager.writeCheckpoint(partition, new Checkpoint(partitionOffsets))
+    } else {
+      debug("Skipping checkpointing for partition %s because no checkpoint manager is defined." format partition)
+    }
+  }
+
+  def stop {
+    if (checkpointManager != null) {
+      debug("Shutting down checkpoint manager.")
+
+      checkpointManager.stop
+    } else {
+      debug("Skipping checkpoint manager shutdown because no checkpoint manager is defined.")
+    }
+  }
+
+  /**
+   * Returns a set of partitions that have been registered with this offset
+   * manager.
+   */
+  private def getPartitions = {
+    systemStreamPartitions
+      .map(_.getPartition)
+      .toSet
+  }
+
+  /**
+   * Register all partitions with the CheckpointManager.
+   */
+  private def registerCheckpointManager {
+    if (checkpointManager != null) {
+      debug("Registering checkpoint manager.")
+
+      getPartitions.foreach(checkpointManager.register)
+    } else {
+      debug("Skipping checkpoint manager registration because no manager was defined.")
+    }
+  }
+
+  /**
+   * Loads last processed offsets from checkpoint manager for all registered
+   * partitions.
+   */
+  private def loadOffsetsFromCheckpointManager {
+    if (checkpointManager != null) {
+      debug("Loading offsets from checkpoint manager.")
+
+      checkpointManager.start
+
+      lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
+    } else {
+      debug("Skipping offset load from checkpoint manager because no manager was defined.")
+    }
+  }
+
+  /**
+   * Loads last processed offsets for a single partition.
+   */
+  private def restoreOffsetsFromCheckpoint(partition: Partition): Map[SystemStreamPartition, String] = {
+    debug("Loading checkpoints for partition: %s." format partition)
+
+    checkpointManager
+      .readLastCheckpoint(partition)
+      .getOffsets
+      .map { case (systemStream, offset) => (new SystemStreamPartition(systemStream, partition), offset) }
+      .toMap
+  }
+
+  /**
+   * Removes offset settings for all SystemStreams that are to be forcibly
+   * reset using resetOffsets.
+   */
+  private def stripResetStreams {
+    val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
+
+    systemStreamPartitionsToReset.foreach(systemStreamPartition => {
+      val offset = lastProcessedOffsets(systemStreamPartition)
+      info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
+    })
+
+    lastProcessedOffsets --= systemStreamPartitionsToReset
+  }
+
+  /**
+   * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
+   */
+  private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
+    systemStreamPartitions
+      .filter(systemStreamPartition => {
+        val systemStream = systemStreamPartition.getSystemStream
+        offsetSettings
+          .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
+          .resetOffset
+      }).toSet
+  }
+
+  /**
+   * Use last processed offsets to get next available offset for each
+   * SystemStreamPartition, and populate startingOffsets.
+   */
+  private def loadStartingOffsets {
+    startingOffsets ++= lastProcessedOffsets
+      // Group offset map according to systemName.
+      .groupBy(_._1.getSystem)
+      // Get next offsets for each system.
+      .flatMap {
+        case (systemName, systemStreamPartitionOffsets) =>
+          systemAdmins
+            .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
+            .getOffsetsAfter(systemStreamPartitionOffsets)
+      }
+  }
+
+  /**
+   * Use defaultOffsets to get a next offset for every SystemStreamPartition
+   * that was registered, but has no offset.
+   */
+  private def loadDefaults {
+    systemStreamPartitions.foreach(systemStreamPartition => {
+      if (!startingOffsets.contains(systemStreamPartition)) {
+        val systemStream = systemStreamPartition.getSystemStream
+        val partition = systemStreamPartition.getPartition
+        val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
+        val systemStreamMetadata = offsetSetting.metadata
+        val offsetType = offsetSetting.defaultOffset
+
+        debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
+
+        val systemStreamPartitionMetadata = systemStreamMetadata
+          .getSystemStreamPartitionMetadata
+          .get(partition)
+
+        if (systemStreamPartitionMetadata != null) {
+          val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType)
+
+          debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
+
+          startingOffsets += systemStreamPartition -> nextOffset
+        } else {
+          throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
+        }
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 517e9ae..d71ead1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -31,6 +31,7 @@ object StreamConfig {
   val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
   val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
   val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
+  val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
@@ -64,6 +65,9 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
       case _ => false
     }
 
+  def getDefaultStreamOffset(systemStream: SystemStream) =
+    getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
+
   /**
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index ce63a8a..5bb17c7 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -29,6 +29,7 @@ object SystemConfig {
   val SYSTEM_FACTORY = "systems.%s.samza.factory"
   val KEY_SERDE = "systems.%s.samza.key.serde"
   val MSG_SERDE = "systems.%s.samza.msg.serde"
+  val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
 
   implicit def Config2System(config: Config) = new SystemConfig(config)
 }
@@ -40,6 +41,8 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name)
 
+  def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
+
   /**
    * Returns a list of all system names from the config file. Useful for
    * getting individual systems.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 9c23244..c101b59 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -62,6 +62,7 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.checkpoint.OffsetManager
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -100,12 +101,6 @@ object SamzaContainer extends Logging {
 
     info("Got system names: %s" format systemNames)
 
-    val resetInputStreams = systemNames.flatMap(systemName => {
-      config.getResetOffsetMap(systemName)
-    }).toMap
-
-    info("Got input stream resets: %s" format resetInputStreams)
-
     val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
 
     debug("Got serde streams: %s" format serdeStreams)
@@ -279,6 +274,10 @@ object SamzaContainer extends Logging {
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
+
+    info("Got offset manager: %s" format offsetManager)
+
     val consumerMultiplexer = new SystemConsumers(
       // TODO add config values for no new message timeout and max msgs per stream partition
       chooser = chooser,
@@ -420,12 +419,11 @@ object SamzaContainer extends Logging {
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
         producerMultiplexer = producerMultiplexer,
+        offsetManager = offsetManager,
         storageManager = storageManager,
-        checkpointManager = checkpointManager,
         reporters = reporters,
         listeners = listeners,
         inputStreams = inputStreamsForThisPartition,
-        resetInputStreams = resetInputStreams,
         windowMs = taskWindowMs,
         commitMs = taskCommitMs,
         collector = collector)
@@ -440,8 +438,8 @@ object SamzaContainer extends Logging {
       config = config,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
+      offsetManager = offsetManager,
       metrics = samzaContainerMetrics,
-      checkpointManager = checkpointManager,
       reporters = reporters,
       jvm = jvm)
   }
@@ -482,7 +480,7 @@ class SamzaContainer(
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
-  checkpointManager: CheckpointManager = null,
+  offsetManager: OffsetManager = new OffsetManager,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null) extends Runnable with Logging {
 
@@ -491,7 +489,7 @@ class SamzaContainer(
       info("Starting container.")
 
       startMetrics
-      startCheckpoints
+      startOffsetManager
       startStores
       startTask
       startProducers
@@ -524,7 +522,7 @@ class SamzaContainer(
       shutdownProducers
       shutdownTask
       shutdownStores
-      shutdownCheckpoints
+      shutdownOffsetManager
       shutdownMetrics
 
       info("Shutdown complete.")
@@ -550,18 +548,14 @@ class SamzaContainer(
     })
   }
 
-  def startCheckpoints {
-    info("Registering task instances with checkpoints.")
+  def startOffsetManager {
+    info("Registering task instances with offsets.")
 
-    taskInstances.values.foreach(_.registerCheckpoints)
+    taskInstances.values.foreach(_.registerOffsets)
 
-    if (checkpointManager != null) {
-      info("Registering checkpoint manager.")
+    info("Starting offset manager.")
 
-      checkpointManager.start
-    } else {
-      warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
-    }
+    offsetManager.start
   }
 
   def startStores {
@@ -666,13 +660,10 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownStores)
   }
 
-  def shutdownCheckpoints {
-    if (checkpointManager != null) {
-      info("Shutting down checkpoint manager.")
-      checkpointManager.stop
-    } else {
-      info("No checkpoint manager defined, so skipping checkpoint manager stop.")
-    }
+  def shutdownOffsetManager {
+    info("Shutting down offset manager.")
+
+    offsetManager.stop
   }
 
   def shutdownMetrics {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5127595..c4b135c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -1,5 +1,4 @@
 /*
-
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -43,6 +42,8 @@ import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.metrics.Gauge
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.SamzaException
 
 class TaskInstance(
   task: StreamTask,
@@ -51,18 +52,16 @@ class TaskInstance(
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
+  offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
-  checkpointManager: CheckpointManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
   inputStreams: Set[SystemStream] = Set(),
-  resetInputStreams: Map[SystemStream, Boolean] = Map(),
   windowMs: Long = -1,
   commitMs: Long = 60000,
   clock: () => Long = { System.currentTimeMillis },
   collector: ReadableCollector = new ReadableCollector) extends Logging {
 
-  var offsets = Map[SystemStream, String]()
   var lastWindowMs = 0L
   var lastCommitMs = 0L
   val isInitableTask = task.isInstanceOf[InitableTask]
@@ -86,14 +85,12 @@ class TaskInstance(
     reporters.values.foreach(_.register(metrics.source, metrics.registry))
   }
 
-  def registerCheckpoints {
-    if (checkpointManager != null) {
-      debug("Registering checkpoint manager for partition: %s." format partition)
+  def registerOffsets {
+    debug("Registering offsets for partition: %s." format partition)
 
-      checkpointManager.register(partition)
-    } else {
-      debug("Skipping checkpoint manager registration for partition: %s." format partition)
-    }
+    inputStreams.foreach(systemStream => {
+      offsetManager.register(new SystemStreamPartition(systemStream, partition))
+    })
   }
 
   def startStores {
@@ -127,34 +124,19 @@ class TaskInstance(
   }
 
   def registerConsumers {
-    if (checkpointManager != null) {
-      debug("Loading checkpoints for partition: %s." format partition)
-
-      val checkpoint = checkpointManager.readLastCheckpoint(partition)
-
-      if (checkpoint != null) {
-        for ((systemStream, offset) <- checkpoint.getOffsets) {
-          if (!resetInputStreams.getOrElse(systemStream, false)) {
-            offsets += systemStream -> offset
-
-            metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
-          } else {
-            info("Got offset %s for %s and partition %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream, partition))
-          }
-        }
-
-        info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
-      } else {
-        warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
-      }
-    }
-
     debug("Registering consumers for partition: %s." format partition)
 
-    inputStreams.foreach(stream =>
-      consumerMultiplexer.register(
-        new SystemStreamPartition(stream, partition),
-        offsets.get(stream).getOrElse(null)))
+    inputStreams.foreach(systemStream => {
+      val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+      val offset = offsetManager.getStartingOffset(systemStreamPartition)
+        .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream)))
+      consumerMultiplexer.register(systemStreamPartition, offset)
+      metrics.addOffsetGauge(systemStream, () => {
+        offsetManager
+          .getLastProcessedOffset(systemStreamPartition)
+          .getOrElse(null)
+      })
+    })
   }
 
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
@@ -170,7 +152,7 @@ class TaskInstance(
 
     trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
 
-    offsets += envelope.getSystemStreamPartition.getSystemStream -> envelope.getOffset
+    offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
   }
 
   def window(coordinator: ReadableCoordinator) {
@@ -221,11 +203,9 @@ class TaskInstance(
 
       producerMultiplexer.flush(metrics.source)
 
-      if (checkpointManager != null) {
-        trace("Committing checkpoint manager for partition: %s" format partition)
+      trace("Committing offset manager for partition: %s" format partition)
 
-        checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
-      }
+      offsetManager.checkpoint(partition)
 
       lastCommitMs = clock()
     } else {
@@ -258,9 +238,9 @@ class TaskInstance(
       debug("Skipping storage manager shutdown for partition: %s" format partition)
     }
   }
-  
+
   override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition)
- 
-  def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size) 
-  
+
+  def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index f7d3c8b..91c1813 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -30,6 +30,7 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamMetadata
 import scala.collection.JavaConversions._
 import org.apache.samza.SamzaException
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
 
 /**
  * BootstrappingChooser is a composable MessageChooser that only chooses
@@ -114,7 +115,7 @@ class BootstrappingChooser(
     // offset for this system stream partition, then we've already read all
     // messages in the stream, and we're at head for this system stream 
     // partition.
-    checkOffset(systemStreamPartition, offset, Upcoming)
+    checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
 
     wrapped.register(systemStreamPartition, offset)
   }
@@ -165,7 +166,7 @@ class BootstrappingChooser(
         // If the offset we just read is the same as the offset for the last 
         // message (newest) in this system stream partition, then we have read 
         // all messages, and can mark this SSP as bootstrapped.
-        checkOffset(systemStreamPartition, offset, Newest)
+        checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
       }
 
       envelope
@@ -207,7 +208,7 @@ class BootstrappingChooser(
    *                         Upcoming is useful during the registration phase,
    *                         and newest is useful during the choosing phase.
    */
-  private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, newestOrUpcoming: OffsetType) {
+  private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
     val systemStream = systemStreamPartition.getSystemStream
     val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
     // Metadata for system/stream, and system/stream/partition are allowed to 
@@ -224,15 +225,11 @@ class BootstrappingChooser(
       // null. A null partition metadata implies that the stream is not a 
       // bootstrap stream, and therefore, there is no need to check its offset.
       null
-    } else if (Newest.equals(newestOrUpcoming)) {
-      systemStreamPartitionMetadata.getNewestOffset
-    } else if (Upcoming.equals(newestOrUpcoming)) {
-      systemStreamPartitionMetadata.getUpcomingOffset
     } else {
-      throw new SamzaException("Got unknown offset type %s" format newestOrUpcoming)
+      systemStreamPartitionMetadata.getOffset(offsetType)
     }
 
-    trace("Check %s offset %s against %s for %s." format (newestOrUpcoming.getClass.getSimpleName, offset, offsetToCheck, systemStreamPartition))
+    trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
 
     // The SSP is no longer lagging if the envelope's offset equals the 
     // latest offset. 
@@ -272,7 +269,3 @@ class BootstrappingChooserMetrics(val registry: MetricsRegistry = new MetricsReg
     newGauge("%s-%s-lagging-partitions" format (systemStream.getSystem, systemStream.getStream), getValue)
   }
 }
-
-private sealed abstract class OffsetType
-private object Upcoming extends OffsetType
-private object Newest extends OffsetType

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
new file mode 100644
index 0000000..5844695
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import scala.collection.JavaConversions._
+import org.apache.samza.Partition
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.OffsetType
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+import org.apache.samza.SamzaException
+import org.apache.samza.util.TestUtil._
+import org.apache.samza.config.MapConfig
+import org.apache.samza.system.SystemAdmin
+
+class TestOffsetManager {
+  @Test
+  def testSystemShouldUseDefaults {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
+    val offsetManager = OffsetManager(systemStreamMetadata, config)
+    offsetManager.register(systemStreamPartition)
+    offsetManager.start
+    assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
+    assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
+    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+  }
+
+  @Test
+  def testShouldLoadFromAndSaveWithCheckpointManager {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val systemAdmins = Map("test-system" -> getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    offsetManager.register(systemStreamPartition)
+    offsetManager.start
+    assertTrue(checkpointManager.isStarted)
+    assertEquals(1, checkpointManager.registered.size)
+    assertEquals(partition, checkpointManager.registered.head)
+    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
+    // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
+    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+    assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+    offsetManager.update(systemStreamPartition, "46")
+    assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+    offsetManager.update(systemStreamPartition, "47")
+    assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
+    // Should never update starting offset.
+    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
+    offsetManager.checkpoint(partition)
+    val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
+    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
+  }
+
+  @Test
+  def testShouldResetStreams {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
+    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
+    val checkpointManager = getCheckpointManager(systemStreamPartition)
+    val config = new MapConfig(Map(
+      "systems.test-system.samza.offset.default" -> "oldest",
+      "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
+    offsetManager.register(systemStreamPartition)
+    offsetManager.start
+    assertTrue(checkpointManager.isStarted)
+    assertEquals(1, checkpointManager.registered.size)
+    assertEquals(partition, checkpointManager.registered.head)
+    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
+    // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
+    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
+  }
+
+  @Test
+  def testShouldFailWhenMissingMetadata {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val offsetManager = new OffsetManager
+    offsetManager.register(systemStreamPartition)
+
+    expect(classOf[SamzaException], Some("Attempting to load defaults for stream SystemStream [system=test-system, stream=test-stream], which has no offset settings.")) {
+      offsetManager.start
+    }
+  }
+
+  @Test
+  def testShouldFailWhenMissingDefault {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
+    offsetManager.register(systemStreamPartition)
+
+    expect(classOf[SamzaException], Some("No default offeset defined for SystemStream [system=test-system, stream=test-stream]. Unable to load a default.")) {
+      offsetManager.start
+    }
+  }
+
+  @Test
+  def testDefaultSystemShouldFailWhenFailIsSpecified {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
+    expect(classOf[IllegalArgumentException]) {
+      OffsetManager(systemStreamMetadata, config)
+    }
+  }
+
+  @Test
+  def testDefaultStreamShouldFailWhenFailIsSpecified {
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
+    expect(classOf[IllegalArgumentException]) {
+      OffsetManager(systemStreamMetadata, config)
+    }
+  }
+
+  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
+    val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
+
+    new CheckpointManager {
+      var isStarted = false
+      var isStopped = false
+      var registered = Set[Partition]()
+      var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
+      def start { isStarted = true }
+      def register(partition: Partition) { registered += partition }
+      def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
+      def readLastCheckpoint(partition: Partition) = checkpoints(partition)
+      def stop { isStopped = true }
+    }
+  }
+
+  private def getSystemAdmin = {
+    new SystemAdmin {
+      def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
+        offsets.mapValues(offset => (offset.toLong + 1).toString)
+
+      def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
+        Map[String, SystemStreamMetadata]()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 5e9dd07..27b4ca5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -36,6 +36,10 @@ import org.apache.samza.system.SystemConsumer
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import scala.collection.JavaConversions._
 
 class TestTaskInstance {
   @Test
@@ -53,20 +57,24 @@ class TestTaskInstance {
     val producerMultiplexer = new SystemProducers(
       Map[String, SystemProducer](),
       new SerdeManager)
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    // Pretend our last checkpointed (next) offset was 2.
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       partition,
       config,
       new TaskInstanceMetrics,
-      consumerMultiplexer: SystemConsumers,
-      producerMultiplexer: SystemProducers)
-    val systemStream = new SystemStream("test-system", "test-stream")
-    // Pretend our last checkpointed offset was 1.
-    taskInstance.offsets += systemStream -> "1"
-    // Pretend we got a message with offset 2.
-    taskInstance.process(new IncomingMessageEnvelope(new SystemStreamPartition("test-system", "test-stream", new Partition(0)), "2", null, null), new ReadableCoordinator)
-    // Check to see if the offset map has been properly updated with offset 2.
-    assertEquals(1, taskInstance.offsets.size)
-    assertEquals("2", taskInstance.offsets(systemStream))
+      consumerMultiplexer,
+      producerMultiplexer,
+      offsetManager)
+    // Pretend we got a message with offset 2.
+    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), new ReadableCoordinator)
+    // Check to see if the offset manager has been properly updated with last processed offset 2.
+    val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
+    assertTrue(lastProcessedOffset.isDefined)
+    assertEquals("2", lastProcessedOffset.get)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5325549..dafc980 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -37,6 +37,7 @@ import grizzled.slf4j.Logging
 import java.util.UUID
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import kafka.consumer.ConsumerConfig
 
 object KafkaSystemAdmin extends Logging {
   /**
@@ -99,7 +100,7 @@ class KafkaSystemAdmin(
    * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
    * configuration.
    */
-  bufferSize: Int = 1024000,
+  bufferSize: Int = ConsumerConfig.SocketBufferSize,
 
   /**
    * The client ID to use for the simple consumer when fetching metadata from
@@ -109,6 +110,17 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
+  /**
+   * Returns the offset for the message after the specified offset for each
+   * SystemStreamPartition that was passed in.
+   */
+  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+    // This is safe to do with Kafka, even if a topic is key-deduped. If the 
+    // offset doesn't exist on a compacted topic, Kafka will return the first 
+    // message AFTER the offset that was specified in the fetch request.
+    offsets.mapValues(offset => (offset.toLong + 1).toString)
+  }
+
   def getSystemStreamMetadata(streams: java.util.Set[String]) =
     getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
 
@@ -127,7 +139,7 @@ class KafkaSystemAdmin(
     var done = false
     var consumer: SimpleConsumer = null
 
-    debug("Fetching offsets for: %s" format streams)
+    debug("Fetching system stream metadata for: %s" format streams)
 
     while (!done) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index eaa9e53..cd9d926 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -274,12 +274,23 @@ class TestKafkaSystemAdmin {
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
-    )))
+      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0"))))
+  }
+
+  @Test
+  def testOffsetsAfter {
+    val systemAdmin = new KafkaSystemAdmin("test", brokers)
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+    val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
+      ssp1 -> "1",
+      ssp2 -> "2"))
+    assertEquals("2", offsetsAfter(ssp1))
+    assertEquals("3", offsetsAfter(ssp2))
   }
 
   class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
-    import kafka.api.{TopicMetadata, TopicMetadataResponse}
+    import kafka.api.{ TopicMetadata, TopicMetadataResponse }
 
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
@@ -292,7 +303,7 @@ class TestKafkaSystemAdmin {
 
   class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
     var countCalls = 0
-  
+
     override def sleep() = {
       if (countCalls >= maxCalls) throw new CallLimitReached
       countCalls += 1

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index 2abe1c8..fa1d51b 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 
 /**
@@ -51,4 +52,15 @@ public class MockSystemAdmin implements SystemAdmin {
 
     return metadata;
   }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
+
+    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+      offsetsAfter.put(systemStreamPartition, null);
+    }
+
+    return offsetsAfter;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3a6f2555/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 7e81387..8177cbf 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -211,24 +211,16 @@ class TestStatefulTask {
     "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
-    "stores.mystore.changelog" -> "kafka-state.mystore",
-
-    // TODO we don't need two different systems once SAMZA-157 is committed. 
-    // We will be able to do per-stream offset defaults.
+    "stores.mystore.changelog" -> "kafka.mystore",
 
     // Use smallest reset for input streams, so we can fix SAMZA-166.
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
-    "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka.consumer.auto.offset.reset" -> "smallest",
-    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    "systems.kafka.samza.offset.default" -> "oldest",
     "systems.kafka.samza.msg.serde" -> "string",
-
-    // Use largest offset for changelog stream, so we can test SAMZA-142.
-    "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
-    "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect,
-    "systems.kafka-state.consumer.auto.offset.reset" -> "smallest",
-    "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1),
-    "systems.kafka-state.samza.msg.serde" -> "string")
+    "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
+    // Use largest offset for reset, so we can test SAMZA-142.
+    "systems.kafka.consumer.auto.offset.reset" -> "largest",
+    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
 
   @Test
   def testShouldStartAndRestore {