You are viewing a plain-text version of this content. The canonical link to the original archived message is not preserved in this rendering.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/14 01:11:32 UTC

git commit: Seeing issues when running with SAMZA-157, so reverting.

Repository: incubator-samza
Updated Branches:
  refs/heads/master 3a6f2555a -> 2811c2a75


Seeing issues when running with SAMZA-157, so reverting.

This reverts commit 3a6f2555aa92d47dd85b54f3f1503719c08d31a7.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/2811c2a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/2811c2a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/2811c2a7

Branch: refs/heads/master
Commit: 2811c2a758dd2d387ed4ca25c757516dfcece70b
Parents: 3a6f255
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu Mar 13 17:11:07 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Mar 13 17:11:07 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemAdmin.java    |  14 +-
 .../samza/system/SystemStreamMetadata.java      |  50 ---
 ...inglePartitionWithoutOffsetsSystemAdmin.java |  12 -
 .../apache/samza/checkpoint/OffsetManager.scala | 351 -------------------
 .../org/apache/samza/config/StreamConfig.scala  |   4 -
 .../org/apache/samza/config/SystemConfig.scala  |   3 -
 .../apache/samza/container/SamzaContainer.scala |  47 ++-
 .../apache/samza/container/TaskInstance.scala   |  72 ++--
 .../system/chooser/BootstrappingChooser.scala   |  19 +-
 .../samza/checkpoint/TestOffsetManager.scala    | 188 ----------
 .../samza/container/TestTaskInstance.scala      |  28 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  16 +-
 .../system/kafka/TestKafkaSystemAdmin.scala     |  19 +-
 .../samza/system/mock/MockSystemAdmin.java      |  12 -
 .../test/integration/TestStatefulTask.scala     |  20 +-
 15 files changed, 118 insertions(+), 737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 3976253..437bfb2 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -29,21 +29,9 @@ import java.util.Set;
  * system.
  */
 public interface SystemAdmin {
-
-  /**
-   * Fetches the offsets for the messages immediately after the supplied offsets
-   * for a group of SystemStreamPartitions.
-   * 
-   * @param offsets
-   *          Map from SystemStreamPartition to current offsets.
-   * @return Map from SystemStreamPartition to offsets immediately after the
-   *         current offsets.
-   */
-  Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets);
-
   /**
    * Fetch metadata from a system for a set of streams.
-   * 
+   *
    * @param streamNames
    *          The streams to to fetch metadata for.
    * @return A map from stream name to SystemStreamMetadata for each stream

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
index 817c557..32e142a 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamMetadata.java
@@ -21,9 +21,7 @@ package org.apache.samza.system;
 
 import java.util.Collections;
 import java.util.Map;
-
 import org.apache.samza.Partition;
-import org.apache.samza.SamzaException;
 
 /**
  * SystemAdmins use this class to return useful metadata about a stream's offset
@@ -141,23 +139,6 @@ public class SystemStreamMetadata {
       return upcomingOffset;
     }
 
-    /**
-     * @param offsetType
-     *          The type of offset to get. Either oldest, newest, or upcoming.
-     * @return The corresponding offset for the offset type requested.
-     */
-    public String getOffset(OffsetType offsetType) {
-      if (offsetType.equals(OffsetType.OLDEST)) {
-        return getOldestOffset();
-      } else if (offsetType.equals(OffsetType.NEWEST)) {
-        return getNewestOffset();
-      } else if (offsetType.equals(OffsetType.UPCOMING)) {
-        return getUpcomingOffset();
-      } else {
-        throw new SamzaException("Invalid offset type defined " + offsetType + ".");
-      }
-    }
-
     @Override
     public int hashCode() {
       final int prime = 31;
@@ -200,35 +181,4 @@ public class SystemStreamMetadata {
       return "SystemStreamPartitionMetadata [oldestOffset=" + oldestOffset + ", newestOffset=" + newestOffset + ", upcomingOffset=" + upcomingOffset + "]";
     }
   }
-
-  /**
-   * OffsetType is an enum used to define which offset should be used when
-   * reading from a SystemStreamPartition for the first time.
-   */
-  public enum OffsetType {
-
-    /**
-     * Signals the offset of the oldest message in a SystemStreamPartition.
-     */
-    OLDEST("oldest"),
-
-    /**
-     * Signals the offset of the newest message in a SystemStreamPartition.
-     */
-    NEWEST("newest"),
-
-    /**
-     * Signals the offset of the next message to be written into a
-     * SystemStreamPartition. If the offset of the most recent message written
-     * to a SystemStreamPartition is 7, then upcoming would signal offset 8
-     * (assuming the offsets were incremental).
-     */
-    UPCOMING("upcoming");
-
-    private final String offsetType;
-
-    private OffsetType(String offsetType) {
-      this.offsetType = offsetType;
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 38e313f..44fd82a 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -26,7 +26,6 @@ import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
-import org.apache.samza.system.SystemStreamPartition;
 
 /**
  * A simple helper admin class that defines a single partition (partition 0) for
@@ -52,15 +51,4 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
 
     return metadata;
   }
-
-  @Override
-  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
-    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
-
-    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
-      offsetsAfter.put(systemStreamPartition, null);
-    }
-
-    return offsetsAfter;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
deleted file mode 100644
index 80341c8..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint
-
-import org.apache.samza.system.SystemStream
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
-import grizzled.slf4j.Logging
-import org.apache.samza.config.Config
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.system.SystemAdmin
-
-/**
- * OffsetSetting encapsulates a SystemStream's metadata, default offset, and
- * reset offset settings. It's just a convenience class to make OffsetManager
- * easier to work with.
- */
-case class OffsetSetting(
-  /**
-   * The metadata for the SystemStream.
-   */
-  metadata: SystemStreamMetadata,
-
-  /**
-   * The default offset (oldest, newest, or upcoming) for the SystemStream.
-   * This setting is used when no checkpoint is available for a SystemStream
-   * if the job is starting for the first time, or the SystemStream has been
-   * reset (see resetOffsets, below).
-   */
-  defaultOffset: OffsetType,
-
-  /**
-   * Whether the SystemStream's offset should be reset or not. Determines
-   * whether an offset should be ignored at initialization time, even if a
-   * checkpoint is available. This is useful for jobs that wish to restart
-   * reading from a stream at a different position than where they last
-   * checkpointed. If this is true, then defaultOffset will be used to find
-   * the new starting position in the stream.
-   */
-  resetOffset: Boolean)
-
-/**
- * OffsetManager object is a helper that does wiring to build an OffsetManager
- * from a config object.
- */
-object OffsetManager extends Logging {
-  def apply(
-    systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
-    config: Config,
-    checkpointManager: CheckpointManager = null,
-    systemAdmins: Map[String, SystemAdmin] = Map()) = {
-
-    debug("Building offset manager for %s." format systemStreamMetadata)
-
-    val offsetSettings = systemStreamMetadata
-      .map {
-        case (systemStream, systemStreamMetadata) =>
-          // Get default offset.
-          val streamDefaultOffset = config.getDefaultStreamOffset(systemStream)
-          val systemDefaultOffset = config.getDefaultSystemOffset(systemStream.getSystem)
-          val defaultOffsetType = if (streamDefaultOffset.isDefined) {
-            OffsetType.valueOf(streamDefaultOffset.get.toUpperCase)
-          } else if (systemDefaultOffset.isDefined) {
-            OffsetType.valueOf(systemDefaultOffset.get.toUpperCase)
-          } else {
-            info("No default offset for %s defined. Using newest." format systemStream)
-            OffsetType.UPCOMING
-          }
-          debug("Using default offset %s for %s." format (defaultOffsetType, systemStream))
-
-          // Get reset offset.
-          val resetOffset = config.getResetOffset(systemStream)
-          debug("Using reset offset %s for %s." format (resetOffset, systemStream))
-
-          // Build OffsetSetting so we can create a map for OffsetManager.
-          (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
-      }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
-  }
-}
-
-/**
- * OffsetManager does several things:
- *
- * <ul>
- * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
- * SamzaContainer.</li>
- * <li>Uses last checkpointed offset to figure out the next offset to start
- * reading from for each input SystemStreamPartition in a SamzaContainer</li>
- * <li>Keep track of the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer.</li>
- * <li>Checkpoints the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer periodically to the CheckpointManager.</li>
- * </ul>
- *
- * All partitions must be registered before start is called, and start must be
- * called before get/update/checkpoint/stop are called.
- */
-class OffsetManager(
-
-  /**
-   * Offset settings for all streams that the OffsetManager is managing.
-   */
-  val offsetSettings: Map[SystemStream, OffsetSetting] = Map(),
-
-  /**
-   * Optional checkpoint manager for checkpointing offsets whenever
-   * checkpoint is called.
-   */
-  val checkpointManager: CheckpointManager = null,
-
-  /**
-   * SystemAdmins that are used to get next offsets from last checkpointed
-   * offsets. Map is from system name to SystemAdmin class for the system.
-   */
-  val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
-
-  /**
-   * Last offsets processed for each SystemStreamPartition.
-   */
-  var lastProcessedOffsets = Map[SystemStreamPartition, String]()
-
-  /**
-   * Offsets to start reading from for each SystemStreamPartition. This
-   * variable is populated after all checkpoints have been restored.
-   */
-  var startingOffsets = Map[SystemStreamPartition, String]()
-
-  /**
-   * The set of system stream partitions that have been registered with the
-   * OffsetManager. These are the SSPs that will be tracked within the offset
-   * manager.
-   */
-  var systemStreamPartitions = Set[SystemStreamPartition]()
-
-  def register(systemStreamPartition: SystemStreamPartition) {
-    systemStreamPartitions += systemStreamPartition
-  }
-
-  def start {
-    registerCheckpointManager
-    loadOffsetsFromCheckpointManager
-    stripResetStreams
-    loadStartingOffsets
-    loadDefaults
-
-    info("Successfully loaded last processed offsets: %s" format lastProcessedOffsets)
-    info("Successfully loaded starting offsets: %s" format startingOffsets)
-  }
-
-  /**
-   * Set the last processed offset for a given SystemStreamPartition.
-   */
-  def update(systemStreamPartition: SystemStreamPartition, offset: String) {
-    lastProcessedOffsets += systemStreamPartition -> offset
-  }
-
-  /**
-   * Get the last processed offset for a SystemStreamPartition.
-   */
-  def getLastProcessedOffset(systemStreamPartition: SystemStreamPartition) = {
-    lastProcessedOffsets.get(systemStreamPartition)
-  }
-
-  /**
-   * Get the starting offset for a SystemStreamPartition. This is the offset 
-   * where a SamzaContainer begins reading from when it starts up.
-   */
-  def getStartingOffset(systemStreamPartition: SystemStreamPartition) = {
-    startingOffsets.get(systemStreamPartition)
-  }
-
-  /**
-   * Checkpoint all offsets for a given partition using the CheckpointManager.
-   */
-  def checkpoint(partition: Partition) {
-    if (checkpointManager != null) {
-      debug("Checkpointing offsets for partition %s." format partition)
-
-      val partitionOffsets = lastProcessedOffsets
-        .filterKeys(_.getPartition.equals(partition))
-        .map { case (systemStreamPartition, offset) => (systemStreamPartition.getSystemStream, offset) }
-        .toMap
-
-      checkpointManager.writeCheckpoint(partition, new Checkpoint(partitionOffsets))
-    } else {
-      debug("Skipping checkpointing for partition %s because no checkpoint manager is defined." format partition)
-    }
-  }
-
-  def stop {
-    if (checkpointManager != null) {
-      debug("Shutting down checkpoint manager.")
-
-      checkpointManager.stop
-    } else {
-      debug("Skipping checkpoint manager shutdown because no checkpoint manager is defined.")
-    }
-  }
-
-  /**
-   * Returns a set of partitions that have been registered with this offset
-   * manager.
-   */
-  private def getPartitions = {
-    systemStreamPartitions
-      .map(_.getPartition)
-      .toSet
-  }
-
-  /**
-   * Register all partitions with the CheckpointManager.
-   */
-  private def registerCheckpointManager {
-    if (checkpointManager != null) {
-      debug("Registering checkpoint manager.")
-
-      getPartitions.foreach(checkpointManager.register)
-    } else {
-      debug("Skipping checkpoint manager registration because no manager was defined.")
-    }
-  }
-
-  /**
-   * Loads last processed offsets from checkpoint manager for all registered
-   * partitions.
-   */
-  private def loadOffsetsFromCheckpointManager {
-    if (checkpointManager != null) {
-      debug("Loading offsets from checkpoint manager.")
-
-      checkpointManager.start
-
-      lastProcessedOffsets ++= getPartitions.flatMap(restoreOffsetsFromCheckpoint(_))
-    } else {
-      debug("Skipping offset load from checkpoint manager because no manager was defined.")
-    }
-  }
-
-  /**
-   * Loads last processed offsets for a single partition.
-   */
-  private def restoreOffsetsFromCheckpoint(partition: Partition): Map[SystemStreamPartition, String] = {
-    debug("Loading checkpoints for partition: %s." format partition)
-
-    checkpointManager
-      .readLastCheckpoint(partition)
-      .getOffsets
-      .map { case (systemStream, offset) => (new SystemStreamPartition(systemStream, partition), offset) }
-      .toMap
-  }
-
-  /**
-   * Removes offset settings for all SystemStreams that are to be forcibly
-   * reset using resetOffsets.
-   */
-  private def stripResetStreams {
-    val systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets.keys)
-
-    systemStreamPartitionsToReset.foreach(systemStreamPartition => {
-      val offset = lastProcessedOffsets(systemStreamPartition)
-      info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
-    })
-
-    lastProcessedOffsets --= systemStreamPartitionsToReset
-  }
-
-  /**
-   * Returns a set of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
-   */
-  private def getSystemStreamPartitionsToReset(systemStreamPartitions: Iterable[SystemStreamPartition]): Set[SystemStreamPartition] = {
-    systemStreamPartitions
-      .filter(systemStreamPartition => {
-        val systemStream = systemStreamPartition.getSystemStream
-        offsetSettings
-          .getOrElse(systemStream, throw new SamzaException("Attempting to reset a stream that doesn't have offset settings %s." format systemStream))
-          .resetOffset
-      }).toSet
-  }
-
-  /**
-   * Use last processed offsets to get next available offset for each
-   * SystemStreamPartition, and populate startingOffsets.
-   */
-  private def loadStartingOffsets {
-    startingOffsets ++= lastProcessedOffsets
-      // Group offset map according to systemName.
-      .groupBy(_._1.getSystem)
-      // Get next offsets for each system.
-      .flatMap {
-        case (systemName, systemStreamPartitionOffsets) =>
-          systemAdmins
-            .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
-            .getOffsetsAfter(systemStreamPartitionOffsets)
-      }
-  }
-
-  /**
-   * Use defaultOffsets to get a next offset for every SystemStreamPartition
-   * that was registered, but has no offset.
-   */
-  private def loadDefaults {
-    systemStreamPartitions.foreach(systemStreamPartition => {
-      if (!startingOffsets.contains(systemStreamPartition)) {
-        val systemStream = systemStreamPartition.getSystemStream
-        val partition = systemStreamPartition.getPartition
-        val offsetSetting = offsetSettings.getOrElse(systemStream, throw new SamzaException("Attempting to load defaults for stream %s, which has no offset settings." format systemStream))
-        val systemStreamMetadata = offsetSetting.metadata
-        val offsetType = offsetSetting.defaultOffset
-
-        debug("Got default offset type %s for %s" format (offsetType, systemStreamPartition))
-
-        val systemStreamPartitionMetadata = systemStreamMetadata
-          .getSystemStreamPartitionMetadata
-          .get(partition)
-
-        if (systemStreamPartitionMetadata != null) {
-          val nextOffset = systemStreamPartitionMetadata.getOffset(offsetType)
-
-          debug("Got next default offset %s for %s" format (nextOffset, systemStreamPartition))
-
-          startingOffsets += systemStreamPartition -> nextOffset
-        } else {
-          throw new SamzaException("No metadata available for partition %s." format systemStreamPartitionMetadata)
-        }
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index d71ead1..517e9ae 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -31,7 +31,6 @@ object StreamConfig {
   val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
   val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
   val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
-  val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
@@ -65,9 +64,6 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
       case _ => false
     }
 
-  def getDefaultStreamOffset(systemStream: SystemStream) =
-    getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
-
   /**
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 5bb17c7..ce63a8a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -29,7 +29,6 @@ object SystemConfig {
   val SYSTEM_FACTORY = "systems.%s.samza.factory"
   val KEY_SERDE = "systems.%s.samza.key.serde"
   val MSG_SERDE = "systems.%s.samza.msg.serde"
-  val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default"
 
   implicit def Config2System(config: Config) = new SystemConfig(config)
 }
@@ -41,8 +40,6 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name)
 
-  def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
-
   /**
    * Returns a list of all system names from the config file. Useful for
    * getting individual systems.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c101b59..9c23244 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -62,7 +62,6 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.checkpoint.OffsetManager
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -101,6 +100,12 @@ object SamzaContainer extends Logging {
 
     info("Got system names: %s" format systemNames)
 
+    val resetInputStreams = systemNames.flatMap(systemName => {
+      config.getResetOffsetMap(systemName)
+    }).toMap
+
+    info("Got input stream resets: %s" format resetInputStreams)
+
     val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_))
 
     debug("Got serde streams: %s" format serdeStreams)
@@ -274,10 +279,6 @@ object SamzaContainer extends Logging {
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
-
-    info("Got offset manager: %s" format offsetManager)
-
     val consumerMultiplexer = new SystemConsumers(
       // TODO add config values for no new message timeout and max msgs per stream partition
       chooser = chooser,
@@ -419,11 +420,12 @@ object SamzaContainer extends Logging {
         metrics = taskInstanceMetrics,
         consumerMultiplexer = consumerMultiplexer,
         producerMultiplexer = producerMultiplexer,
-        offsetManager = offsetManager,
         storageManager = storageManager,
+        checkpointManager = checkpointManager,
         reporters = reporters,
         listeners = listeners,
         inputStreams = inputStreamsForThisPartition,
+        resetInputStreams = resetInputStreams,
         windowMs = taskWindowMs,
         commitMs = taskCommitMs,
         collector = collector)
@@ -438,8 +440,8 @@ object SamzaContainer extends Logging {
       config = config,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      offsetManager = offsetManager,
       metrics = samzaContainerMetrics,
+      checkpointManager = checkpointManager,
       reporters = reporters,
       jvm = jvm)
   }
@@ -480,7 +482,7 @@ class SamzaContainer(
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
-  offsetManager: OffsetManager = new OffsetManager,
+  checkpointManager: CheckpointManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null) extends Runnable with Logging {
 
@@ -489,7 +491,7 @@ class SamzaContainer(
       info("Starting container.")
 
       startMetrics
-      startOffsetManager
+      startCheckpoints
       startStores
       startTask
       startProducers
@@ -522,7 +524,7 @@ class SamzaContainer(
       shutdownProducers
       shutdownTask
       shutdownStores
-      shutdownOffsetManager
+      shutdownCheckpoints
       shutdownMetrics
 
       info("Shutdown complete.")
@@ -548,14 +550,18 @@ class SamzaContainer(
     })
   }
 
-  def startOffsetManager {
-    info("Registering task instances with offsets.")
+  def startCheckpoints {
+    info("Registering task instances with checkpoints.")
 
-    taskInstances.values.foreach(_.registerOffsets)
+    taskInstances.values.foreach(_.registerCheckpoints)
 
-    info("Starting offset manager.")
+    if (checkpointManager != null) {
+      info("Registering checkpoint manager.")
 
-    offsetManager.start
+      checkpointManager.start
+    } else {
+      warn("No checkpoint manager defined. No consumer offsets will be maintained for this job.")
+    }
   }
 
   def startStores {
@@ -660,10 +666,13 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownStores)
   }
 
-  def shutdownOffsetManager {
-    info("Shutting down offset manager.")
-
-    offsetManager.stop
+  def shutdownCheckpoints {
+    if (checkpointManager != null) {
+      info("Shutting down checkpoint manager.")
+      checkpointManager.stop
+    } else {
+      info("No checkpoint manager defined, so skipping checkpoint manager stop.")
+    }
   }
 
   def shutdownMetrics {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index c4b135c..5127595 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -1,4 +1,5 @@
 /*
+
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -42,8 +43,6 @@ import org.apache.samza.system.SystemConsumers
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.metrics.Gauge
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.SamzaException
 
 class TaskInstance(
   task: StreamTask,
@@ -52,16 +51,18 @@ class TaskInstance(
   metrics: TaskInstanceMetrics,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
-  offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
+  checkpointManager: CheckpointManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   listeners: Seq[TaskLifecycleListener] = Seq(),
   inputStreams: Set[SystemStream] = Set(),
+  resetInputStreams: Map[SystemStream, Boolean] = Map(),
   windowMs: Long = -1,
   commitMs: Long = 60000,
   clock: () => Long = { System.currentTimeMillis },
   collector: ReadableCollector = new ReadableCollector) extends Logging {
 
+  var offsets = Map[SystemStream, String]()
   var lastWindowMs = 0L
   var lastCommitMs = 0L
   val isInitableTask = task.isInstanceOf[InitableTask]
@@ -85,12 +86,14 @@ class TaskInstance(
     reporters.values.foreach(_.register(metrics.source, metrics.registry))
   }
 
-  def registerOffsets {
-    debug("Registering offsets for partition: %s." format partition)
+  def registerCheckpoints {
+    if (checkpointManager != null) {
+      debug("Registering checkpoint manager for partition: %s." format partition)
 
-    inputStreams.foreach(systemStream => {
-      offsetManager.register(new SystemStreamPartition(systemStream, partition))
-    })
+      checkpointManager.register(partition)
+    } else {
+      debug("Skipping checkpoint manager registration for partition: %s." format partition)
+    }
   }
 
   def startStores {
@@ -124,19 +127,34 @@ class TaskInstance(
   }
 
   def registerConsumers {
+    if (checkpointManager != null) {
+      debug("Loading checkpoints for partition: %s." format partition)
+
+      val checkpoint = checkpointManager.readLastCheckpoint(partition)
+
+      if (checkpoint != null) {
+        for ((systemStream, offset) <- checkpoint.getOffsets) {
+          if (!resetInputStreams.getOrElse(systemStream, false)) {
+            offsets += systemStream -> offset
+
+            metrics.addOffsetGauge(systemStream, () => offsets(systemStream))
+          } else {
+            info("Got offset %s for %s and partition %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStream, partition))
+          }
+        }
+
+        info("Successfully loaded offsets for partition: %s, %s" format (partition, offsets))
+      } else {
+        warn("No checkpoint found for partition: %s. This is allowed if this is your first time running the job, but if it's not, you've probably lost data." format partition)
+      }
+    }
+
     debug("Registering consumers for partition: %s." format partition)
 
-    inputStreams.foreach(systemStream => {
-      val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-      val offset = offsetManager.getStartingOffset(systemStreamPartition)
-        .getOrElse(throw new SamzaException("No offset defined for partition %s: %s" format (partition, systemStream)))
-      consumerMultiplexer.register(systemStreamPartition, offset)
-      metrics.addOffsetGauge(systemStream, () => {
-        offsetManager
-          .getLastProcessedOffset(systemStreamPartition)
-          .getOrElse(null)
-      })
-    })
+    inputStreams.foreach(stream =>
+      consumerMultiplexer.register(
+        new SystemStreamPartition(stream, partition),
+        offsets.get(stream).getOrElse(null)))
   }
 
   def process(envelope: IncomingMessageEnvelope, coordinator: ReadableCoordinator) {
@@ -152,7 +170,7 @@ class TaskInstance(
 
     trace("Updating offset map for partition: %s, %s, %s" format (partition, envelope.getSystemStreamPartition, envelope.getOffset))
 
-    offsetManager.update(envelope.getSystemStreamPartition, envelope.getOffset)
+    offsets += envelope.getSystemStreamPartition.getSystemStream -> envelope.getOffset
   }
 
   def window(coordinator: ReadableCoordinator) {
@@ -203,9 +221,11 @@ class TaskInstance(
 
       producerMultiplexer.flush(metrics.source)
 
-      trace("Committing offset manager for partition: %s" format partition)
+      if (checkpointManager != null) {
+        trace("Committing checkpoint manager for partition: %s" format partition)
 
-      offsetManager.checkpoint(partition)
+        checkpointManager.writeCheckpoint(partition, new Checkpoint(offsets))
+      }
 
       lastCommitMs = clock()
     } else {
@@ -238,9 +258,9 @@ class TaskInstance(
       debug("Skipping storage manager shutdown for partition: %s" format partition)
     }
   }
-
+  
   override def toString() = "TaskInstance for class %s and partition %s." format (task.getClass.getName, partition)
-
-  def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size)
-
+ 
+  def toDetailedString() = "TaskInstance [windowable=%s, window_time=%s, commit_time=%s, closable=%s, collector_size=%s]" format (isWindowableTask, lastWindowMs, lastCommitMs, isClosableTask, collector.envelopes.size) 
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 91c1813..f7d3c8b 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -30,7 +30,6 @@ import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamMetadata
 import scala.collection.JavaConversions._
 import org.apache.samza.SamzaException
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
 
 /**
  * BootstrappingChooser is a composable MessageChooser that only chooses
@@ -115,7 +114,7 @@ class BootstrappingChooser(
     // offset for this system stream partition, then we've already read all
     // messages in the stream, and we're at head for this system stream 
     // partition.
-    checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
+    checkOffset(systemStreamPartition, offset, Upcoming)
 
     wrapped.register(systemStreamPartition, offset)
   }
@@ -166,7 +165,7 @@ class BootstrappingChooser(
         // If the offset we just read is the same as the offset for the last 
         // message (newest) in this system stream partition, then we have read 
         // all messages, and can mark this SSP as bootstrapped.
-        checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+        checkOffset(systemStreamPartition, offset, Newest)
       }
 
       envelope
@@ -208,7 +207,7 @@ class BootstrappingChooser(
    *                         Upcoming is useful during the registration phase,
    *                         and newest is useful during the choosing phase.
    */
-  private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
+  private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, newestOrUpcoming: OffsetType) {
     val systemStream = systemStreamPartition.getSystemStream
     val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
     // Metadata for system/stream, and system/stream/partition are allowed to 
@@ -225,11 +224,15 @@ class BootstrappingChooser(
       // null. A null partition metadata implies that the stream is not a 
       // bootstrap stream, and therefore, there is no need to check its offset.
       null
+    } else if (Newest.equals(newestOrUpcoming)) {
+      systemStreamPartitionMetadata.getNewestOffset
+    } else if (Upcoming.equals(newestOrUpcoming)) {
+      systemStreamPartitionMetadata.getUpcomingOffset
     } else {
-      systemStreamPartitionMetadata.getOffset(offsetType)
+      throw new SamzaException("Got unknown offset type %s" format newestOrUpcoming)
     }
 
-    trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
+    trace("Check %s offset %s against %s for %s." format (newestOrUpcoming.getClass.getSimpleName, offset, offsetToCheck, systemStreamPartition))
 
     // The SSP is no longer lagging if the envelope's offset equals the 
     // latest offset. 
@@ -269,3 +272,7 @@ class BootstrappingChooserMetrics(val registry: MetricsRegistry = new MetricsReg
     newGauge("%s-%s-lagging-partitions" format (systemStream.getSystem, systemStream.getStream), getValue)
   }
 }
+
+private sealed abstract class OffsetType
+private object Upcoming extends OffsetType
+private object Newest extends OffsetType

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
deleted file mode 100644
index 5844695..0000000
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint
-
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import org.junit.Test
-import org.apache.samza.SamzaException
-import org.apache.samza.util.TestUtil._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.system.SystemAdmin
-
-class TestOffsetManager {
-  @Test
-  def testSystemShouldUseDefaults {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
-    val offsetManager = OffsetManager(systemStreamMetadata, config)
-    offsetManager.register(systemStreamPartition)
-    offsetManager.start
-    assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
-    assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
-    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
-  }
-
-  @Test
-  def testShouldLoadFromAndSaveWithCheckpointManager {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
-    offsetManager.register(systemStreamPartition)
-    offsetManager.start
-    assertTrue(checkpointManager.isStarted)
-    assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(partition))
-    // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
-    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
-    assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
-    offsetManager.update(systemStreamPartition, "46")
-    assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
-    offsetManager.update(systemStreamPartition, "47")
-    assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
-    // Should never update starting offset.
-    assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
-    offsetManager.checkpoint(partition)
-    val expectedCheckpoint = new Checkpoint(Map(systemStream -> "47"))
-    assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(partition))
-  }
-
-  @Test
-  def testShouldResetStreams {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val defaultOffsets = Map(systemStream -> OffsetType.OLDEST)
-    val checkpoint = new Checkpoint(Map(systemStream -> "45"))
-    val checkpointManager = getCheckpointManager(systemStreamPartition)
-    val config = new MapConfig(Map(
-      "systems.test-system.samza.offset.default" -> "oldest",
-      "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
-    offsetManager.register(systemStreamPartition)
-    offsetManager.start
-    assertTrue(checkpointManager.isStarted)
-    assertEquals(1, checkpointManager.registered.size)
-    assertEquals(partition, checkpointManager.registered.head)
-    assertEquals(checkpoint, checkpointManager.readLastCheckpoint(partition))
-    // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
-    assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
-  }
-
-  @Test
-  def testShouldFailWhenMissingMetadata {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val offsetManager = new OffsetManager
-    offsetManager.register(systemStreamPartition)
-
-    expect(classOf[SamzaException], Some("Attempting to load defaults for stream SystemStream [system=test-system, stream=test-stream], which has no offset settings.")) {
-      offsetManager.start
-    }
-  }
-
-  @Test
-  def testShouldFailWhenMissingDefault {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
-    offsetManager.register(systemStreamPartition)
-
-    expect(classOf[SamzaException], Some("No default offeset defined for SystemStream [system=test-system, stream=test-stream]. Unable to load a default.")) {
-      offsetManager.start
-    }
-  }
-
-  @Test
-  def testDefaultSystemShouldFailWhenFailIsSpecified {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
-    expect(classOf[IllegalArgumentException]) {
-      OffsetManager(systemStreamMetadata, config)
-    }
-  }
-
-  @Test
-  def testDefaultStreamShouldFailWhenFailIsSpecified {
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
-    expect(classOf[IllegalArgumentException]) {
-      OffsetManager(systemStreamMetadata, config)
-    }
-  }
-
-  private def getCheckpointManager(systemStreamPartition: SystemStreamPartition) = {
-    val checkpoint = new Checkpoint(Map(systemStreamPartition.getSystemStream -> "45"))
-
-    new CheckpointManager {
-      var isStarted = false
-      var isStopped = false
-      var registered = Set[Partition]()
-      var checkpoints = Map(systemStreamPartition.getPartition -> checkpoint)
-      def start { isStarted = true }
-      def register(partition: Partition) { registered += partition }
-      def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) { checkpoints += partition -> checkpoint }
-      def readLastCheckpoint(partition: Partition) = checkpoints(partition)
-      def stop { isStopped = true }
-    }
-  }
-
-  private def getSystemAdmin = {
-    new SystemAdmin {
-      def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
-        offsets.mapValues(offset => (offset.toLong + 1).toString)
-
-      def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
-        Map[String, SystemStreamMetadata]()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 27b4ca5..5e9dd07 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -36,10 +36,6 @@ import org.apache.samza.system.SystemConsumer
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
 
 class TestTaskInstance {
   @Test
@@ -57,24 +53,20 @@ class TestTaskInstance {
     val producerMultiplexer = new SystemProducers(
       Map[String, SystemProducer](),
       new SerdeManager)
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    // Pretend our last checkpointed (next) offset was 2.
-    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       partition,
       config,
       new TaskInstanceMetrics,
-      consumerMultiplexer,
-      producerMultiplexer,
-      offsetManager)
-    // Pretend we got a message with offset 2 and next offset 3.
-    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), new ReadableCoordinator)
-    // Check to see if the offset manager has been properly updated with offset 3.
-    val lastProcessedOffset = offsetManager.getLastProcessedOffset(systemStreamPartition)
-    assertTrue(lastProcessedOffset.isDefined)
-    assertEquals("2", lastProcessedOffset.get)
+      consumerMultiplexer: SystemConsumers,
+      producerMultiplexer: SystemProducers)
+    val systemStream = new SystemStream("test-system", "test-stream")
+    // Pretend our last checkpointed offset was 1.
+    taskInstance.offsets += systemStream -> "1"
+    // Pretend we got a message with offset 2.
+    taskInstance.process(new IncomingMessageEnvelope(new SystemStreamPartition("test-system", "test-stream", new Partition(0)), "2", null, null), new ReadableCoordinator)
+    // Check to see if the offset map has been properly updated with offset 2.
+    assertEquals(1, taskInstance.offsets.size)
+    assertEquals("2", taskInstance.offsets(systemStream))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index dafc980..5325549 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -37,7 +37,6 @@ import grizzled.slf4j.Logging
 import java.util.UUID
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import kafka.consumer.ConsumerConfig
 
 object KafkaSystemAdmin extends Logging {
   /**
@@ -100,7 +99,7 @@ class KafkaSystemAdmin(
    * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
    * configuration.
    */
-  bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  bufferSize: Int = 1024000,
 
   /**
    * The client ID to use for the simple consumer when fetching metadata from
@@ -110,17 +109,6 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
-  /**
-   * Returns the offset for the message after the specified offset for each
-   * SystemStreamPartition that was passed in.
-   */
-  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    // This is safe to do with Kafka, even if a topic is key-deduped. If the 
-    // offset doesn't exist on a compacted topic, Kafka will return the first 
-    // message AFTER the offset that was specified in the fetch request.
-    offsets.mapValues(offset => (offset.toLong + 1).toString)
-  }
-
   def getSystemStreamMetadata(streams: java.util.Set[String]) =
     getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
 
@@ -139,7 +127,7 @@ class KafkaSystemAdmin(
     var done = false
     var consumer: SimpleConsumer = null
 
-    debug("Fetching system stream metadata for: %s" format streams)
+    debug("Fetching offsets for: %s" format streams)
 
     while (!done) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index cd9d926..eaa9e53 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -274,23 +274,12 @@ class TestKafkaSystemAdmin {
     val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
     val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0"))))
-  }
-
-  @Test
-  def testOffsetsAfter {
-    val systemAdmin = new KafkaSystemAdmin("test", brokers)
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
-    val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
-      ssp1 -> "1",
-      ssp2 -> "2"))
-    assertEquals("2", offsetsAfter(ssp1))
-    assertEquals("3", offsetsAfter(ssp2))
+      new Partition(0) -> new SystemStreamPartitionMetadata(null, null, "0")
+    )))
   }
 
   class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) {
-    import kafka.api.{ TopicMetadata, TopicMetadataResponse }
+    import kafka.api.{TopicMetadata, TopicMetadataResponse}
 
     // Simulate Kafka telling us that the leader for the topic is not available
     override def getTopicMetadata(topics: Set[String]) = {
@@ -303,7 +292,7 @@ class TestKafkaSystemAdmin {
 
   class MockSleepStrategy(maxCalls: Int) extends ExponentialSleepStrategy {
     var countCalls = 0
-
+  
     override def sleep() = {
       if (countCalls >= maxCalls) throw new CallLimitReached
       countCalls += 1

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index fa1d51b..2abe1c8 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -25,7 +25,6 @@ import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 
 /**
@@ -52,15 +51,4 @@ public class MockSystemAdmin implements SystemAdmin {
 
     return metadata;
   }
-
-  @Override
-  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
-    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
-
-    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
-      offsetsAfter.put(systemStreamPartition, null);
-    }
-
-    return offsetsAfter;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/2811c2a7/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 8177cbf..7e81387 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -211,16 +211,24 @@ class TestStatefulTask {
     "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory",
     "stores.mystore.key.serde" -> "string",
     "stores.mystore.msg.serde" -> "string",
-    "stores.mystore.changelog" -> "kafka.mystore",
+    "stores.mystore.changelog" -> "kafka-state.mystore",
+
+    // TODO we don't need two different systems once SAMZA-157 is committed. 
+    // We will be able to do per-stream offset defaults.
 
     // Use smallest reset for input streams, so we can fix SAMZA-166.
     "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
-    "systems.kafka.samza.offset.default" -> "oldest",
-    "systems.kafka.samza.msg.serde" -> "string",
     "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
-    // Use largest offset for reset, so we can test SAMZA-142.
-    "systems.kafka.consumer.auto.offset.reset" -> "largest",
-    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
+    "systems.kafka.consumer.auto.offset.reset" -> "smallest",
+    "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    "systems.kafka.samza.msg.serde" -> "string",
+
+    // Use largest offset for changelog stream, so we can test SAMZA-142.
+    "systems.kafka-state.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
+    "systems.kafka-state.consumer.zookeeper.connect" -> zkConnect,
+    "systems.kafka-state.consumer.auto.offset.reset" -> "smallest",
+    "systems.kafka-state.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+    "systems.kafka-state.samza.msg.serde" -> "string")
 
   @Test
   def testShouldStartAndRestore {