You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/21 22:11:15 UTC

[1/2] samza git commit: SAMZA-1985: Startpoint and StartpointManager implementation.

Repository: samza
Updated Branches:
  refs/heads/master 091caef99 -> 0937c1dc0


http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 7a16099..31ff9cf 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -20,14 +20,17 @@
 package org.apache.samza.checkpoint
 
 import java.util
+
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
-import org.apache.samza.Partition
-import org.apache.samza.system._
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory
+import org.apache.samza.startpoint.{StartpointManager, StartpointOldest, StartpointUpcoming}
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
+import org.apache.samza.system._
+import org.apache.samza.util.NoOpMetricsRegistry
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.SamzaException
-import org.apache.samza.config.MapConfig
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.Assertions.intercept
 
@@ -61,10 +64,13 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, getStartpointManager(), systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(1, checkpointManager.registered.size)
@@ -82,9 +88,41 @@ class TestOffsetManager {
     // Should not update null offset
     offsetManager.update(taskName, systemStreamPartition, null)
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should delete after checkpoint commit
     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47").asJava)
     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
   }
+  // Verifies the two ways a startpoint can reach the OffsetManager:
+  //  - taskName1: written through the StartpointManager before offsetManager.start
+  //  - taskName2: set directly on the OffsetManager after start, which must not reach the StartpointManager
+  @Test
+  def testGetAndSetStartpoint {
+    val taskName1 = new TaskName("c")
+    val taskName2 = new TaskName("d")
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManager = getStartpointManager()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    // Hand the OffsetManager the same StartpointManager the assertions use. Building a second manager here only
+    // worked because InMemoryMetadataStoreFactory keeps its stores in a static map shared by every instance.
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, startpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    val startpoint1 = new StartpointOldest
+    startpointManager.writeStartpoint(systemStreamPartition, taskName1, startpoint1)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName1))
+    offsetManager.start
+    val startpoint2 = new StartpointUpcoming
+    offsetManager.setStartpoint(taskName2, systemStreamPartition, startpoint2)
+
+    assertEquals(Option(startpoint1), offsetManager.getStartpoint(taskName1, systemStreamPartition))
+    assertEquals(Option(startpoint2), offsetManager.getStartpoint(taskName2, systemStreamPartition))
+
+    assertEquals(startpoint1, startpointManager.readStartpoint(systemStreamPartition, taskName1))
+    // Startpoint written to offset manager, but not directly to startpoint manager.
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, taskName2))
+  }
 
   @Test
   def testGetCheckpointedOffsetMetric{
@@ -96,20 +134,33 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, getStartpointManager(), systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+
+    // Pre-populate startpoint
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should delete after checkpoint commit
     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     offsetManager.update(taskName, systemStreamPartition, "46")
+
     offsetManager.update(taskName, systemStreamPartition, "47")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+
     offsetManager.update(taskName, systemStreamPartition, "48")
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
   }
 
@@ -153,18 +204,22 @@ class TestOffsetManager {
     val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45").asJava)
     // Checkpoint manager only has partition 1.
     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
+    val startpointManager = getStartpointManager()
     val config = new MapConfig
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, getStartpointManager(), systemAdmins)
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
     offsetManager.register(taskName1, Set(systemStreamPartition1))
     offsetManager.register(taskName2, Set(systemStreamPartition2))
+    startpointManager.writeStartpoint(systemStreamPartition1, taskName1, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition1, taskName1))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
     assertEquals(2, checkpointManager.registered.size)
     assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
     assertNull(checkpointManager.readLastCheckpoint(taskName2))
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition1, taskName1)) // no checkpoint commit so this should still be there
   }
 
   @Test
@@ -256,6 +311,7 @@ class TestOffsetManager {
     val checkpointManager = getCheckpointManager1(systemStreamPartition,
                                                  new Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> "100").asJava),
                                                  taskName)
+    val startpointManager = getStartpointManager()
     val consumer = new SystemConsumerWithCheckpointCallback
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin(systemName)).thenReturn(getSystemAdmin)
@@ -266,13 +322,16 @@ class TestOffsetManager {
     else
       Map()
 
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins,
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, getStartpointManager(), systemAdmins,
       checkpointListeners, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2))
 
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     offsetManager.start
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint be deleted at first checkpoint
     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -281,7 +340,10 @@ class TestOffsetManager {
 
     offsetManager.update(taskName, systemStreamPartition, "46")
     offsetManager.update(taskName, systemStreamPartition, "47")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("47", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -289,7 +351,10 @@ class TestOffsetManager {
 
     offsetManager.update(taskName, systemStreamPartition, "48")
     offsetManager.update(taskName, systemStreamPartition2, "101")
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("101", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("48", consumer.recentCheckpoint.get(systemStreamPartition))
@@ -312,27 +377,35 @@ class TestOffsetManager {
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
+    val startpointManager = getStartpointManager()
     val systemAdmins = mock(classOf[SystemAdmins])
     when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, getStartpointManager(), systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
     offsetManager.start
 
     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName))
     checkpoint(offsetManager, taskName)
+    assertNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint be deleted at first checkpoint
     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
 
+    startpointManager.writeStartpoint(systemStreamPartition, taskName, new StartpointOldest)
+
     offsetManager.update(taskName, systemStreamPartition, "46")
     // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit()
     val checkpoint46 = offsetManager.buildCheckpoint(taskName)
     offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint
     offsetManager.writeCheckpoint(taskName, checkpoint46)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartition))
     assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
 
     // Now write the checkpoint for the latest offset
     val checkpoint47 = offsetManager.buildCheckpoint(taskName)
     offsetManager.writeCheckpoint(taskName, checkpoint47)
+    assertNotNull(startpointManager.readStartpoint(systemStreamPartition, taskName)) // Startpoint should only be deleted at first checkpoint
     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartition))
     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
   }
@@ -380,6 +453,12 @@ class TestOffsetManager {
     }
   }
 
+  /**
+   * Builds a StartpointManager on top of InMemoryMetadataStoreFactory, with an empty MapConfig and a
+   * NoOpMetricsRegistry, and starts it before handing it back.
+   *
+   * NOTE(review): several tests call this helper twice and expect both managers to see the same data. That
+   * presumably holds because the in-memory factory keeps its stores in a static map -- confirm that
+   * StartpointManager asks for the same namespace on every construction.
+   */
+  private def getStartpointManager(): StartpointManager = {
+    val manager = new StartpointManager(new InMemoryMetadataStoreFactory, new MapConfig, new NoOpMetricsRegistry)
+    manager.start()
+    manager
+  }
+
   private def getSystemAdmin: SystemAdmin = {
     new SystemAdmin {
       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index e1ca2e6..bfea961 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -45,6 +45,7 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
 import org.apache.samza.operators.KV;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
@@ -106,6 +107,7 @@ public class TestRunner {
     configs.put(JobConfig.JOB_NAME(), JOB_NAME);
     configs.put(JobConfig.PROCESSOR_ID(), "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY(), InMemoryMetadataStoreFactory.class.getCanonicalName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
     // Changing the base directory for non-changelog stores used by Samza application to separate the
     // on-disk store locations for concurrently executing tests


[2/2] samza git commit: SAMZA-1985: Startpoint and StartpointManager implementation.

Posted by ja...@apache.org.
SAMZA-1985: Startpoint and StartpointManager implementation.

This is the first PR for [SEP-18](https://cwiki.apache.org/confluence/display/SAMZA/SEP-18%3A+Startpoints+-+Manipulating+Starting+Offsets+for+Input+Streams). Please see updated SEP-18 for details.
This PR implements the StartpointManager and Startpoint model and the initial integration with the OffsetManager. The OffsetManager manages the deletion of Startpoints when the initial checkpoint commits happen per task after start-up.

The immediate follow-ons to this PR are:
1. Have the various `JobCoordinators` re-map the Startpoints appropriately to each task by utilizing the `StartpointManager#groupStartpointsPerTask(SystemStreamPartition, JobModel)` method implemented in this PR. SEP-18 describes this in more detail.
2. Implement `StartpointConsumerVisitor` for each of the provided `SystemConsumer`s.

Author: Daniel Nishimura <dn...@linkedin.com>

Reviewers: Jake Maes<jm...@linkedin.com>, Cameron L<ca...@linkedin.com>

Closes #860 from dnishimura/samza-1985-startpoint-manager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0937c1dc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0937c1dc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0937c1dc

Branch: refs/heads/master
Commit: 0937c1dc040cabcf54846862bbb764cde83a6379
Parents: 091caef
Author: Daniel Nishimura <dn...@linkedin.com>
Authored: Fri Dec 21 14:11:07 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Dec 21 14:11:07 2018 -0800

----------------------------------------------------------------------
 .../metadatastore/InMemoryMetadataStore.java    |  61 +++++
 .../InMemoryMetadataStoreFactory.java           |  39 +++
 .../org/apache/samza/startpoint/Startpoint.java |  80 ++++++
 .../samza/startpoint/StartpointCustom.java      |  43 ++++
 .../samza/startpoint/StartpointOldest.java      |  46 ++++
 .../samza/startpoint/StartpointSpecific.java    |  83 ++++++
 .../samza/startpoint/StartpointTimestamp.java   |  83 ++++++
 .../samza/startpoint/StartpointUpcoming.java    |  46 ++++
 .../samza/startpoint/StartpointVisitor.java     |  71 +++++
 .../samza/startpoint/MockStartpointCustom.java  |  50 ++++
 .../apache/samza/startpoint/TestStartpoint.java | 112 ++++++++
 .../apache/samza/startpoint/StartpointKey.java  |  72 ++++++
 .../samza/startpoint/StartpointManager.java     | 241 +++++++++++++++++
 .../samza/startpoint/StartpointSerde.java       |  59 +++++
 .../apache/samza/checkpoint/OffsetManager.scala | 114 +++++++-
 .../org/apache/samza/config/JobConfig.scala     |   3 +
 .../apache/samza/container/SamzaContainer.scala |  18 +-
 .../samza/startpoint/TestStartpointKey.java     |  64 +++++
 .../samza/startpoint/TestStartpointManager.java | 257 +++++++++++++++++++
 .../samza/startpoint/TestStartpointSerde.java   |  76 ++++++
 .../samza/checkpoint/TestOffsetManager.scala    |  97 ++++++-
 .../apache/samza/test/framework/TestRunner.java |   2 +
 22 files changed, 1698 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStore.java
new file mode 100644
index 0000000..eadc712
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metadatastore;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * In-memory {@link MetadataStore} with no persistence on disk.
+ */
+public class InMemoryMetadataStore implements MetadataStore {
+
+  private final ConcurrentHashMap<String, byte[]> memStore = new ConcurrentHashMap<>();
+
+  @Override
+  public void init() { }
+
+  @Override
+  public byte[] get(String key) {
+    return memStore.get(key);
+  }
+
+  @Override
+  public void put(String key, byte[] value) {
+    memStore.put(key, value);
+  }
+
+  @Override
+  public void delete(String key) {
+    memStore.remove(key);
+  }
+
+  @Override
+  public Map<String, byte[]> all() {
+    return ImmutableMap.copyOf(memStore);
+  }
+
+  @Override
+  public void flush() { }
+
+  @Override
+  public void close() { }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStoreFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStoreFactory.java b/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStoreFactory.java
new file mode 100644
index 0000000..6d07287
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/InMemoryMetadataStoreFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metadatastore;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Factory for an in-memory {@link MetadataStore}. Data is not persisted outside of memory and will be cleared on
+ * subsequent restarts.
+ *
+ * <p>Stores are held in a static map keyed by namespace, so every factory instance in the same JVM returns the
+ * same store for a given namespace.
+ */
+public class InMemoryMetadataStoreFactory implements MetadataStoreFactory {
+
+  private static final ConcurrentHashMap<String, InMemoryMetadataStore> NAMESPACED_STORES = new ConcurrentHashMap<>();
+
+  /**
+   * Returns the store registered for {@code namespace}, creating it on first use.
+   *
+   * @param namespace key identifying the store; must not be {@code null} (the backing map rejects null keys).
+   * @param config not used by this implementation.
+   * @param metricsRegistry not used by this implementation.
+   * @return the single {@link InMemoryMetadataStore} shared by all callers that pass the same namespace.
+   */
+  @Override
+  public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
+    // One atomic lookup-or-create: unlike putIfAbsent(new ...) followed by get(), a store is only allocated
+    // when the namespace has no entry yet.
+    return NAMESPACED_STORES.computeIfAbsent(namespace, ns -> new InMemoryMetadataStore());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
new file mode 100644
index 0000000..e305023
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import java.time.Instant;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Startpoint represents a position in a stream partition.
+ */
+@InterfaceStability.Unstable
+public abstract class Startpoint {
+
+  private final long creationTimestamp;
+
+  Startpoint() {
+    this(Instant.now().toEpochMilli());
+  }
+
+  Startpoint(long creationTimestamp) {
+    this.creationTimestamp = creationTimestamp;
+  }
+
+  /**
+   * The timestamp when this {@link Startpoint} was written to the storage layer.
+   * @return a timestamp in epoch milliseconds.
+   */
+  public long getCreationTimestamp() {
+    return creationTimestamp;
+  }
+
+  /**
+   * Apply the visitor {@link StartpointVisitor}'s register methods to the instance of this {@link Startpoint}
+   * class.
+   * @param systemStreamPartition The {@link SystemStreamPartition} needed to register with the {@link StartpointVisitor}
+   * @param startpointVisitor The visitor to register with.
+   */
+  public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Startpoint that = (Startpoint) o;
+    return Objects.equal(creationTimestamp, that.creationTimestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(creationTimestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
new file mode 100644
index 0000000..a52c974
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
+ * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
+ * and it is recommended to maintain the subclass of this {@link StartpointCustom} as a simple POJO.
+ *
+ * <p>The constructors are {@code protected} so that subclasses can be declared outside this package; with
+ * package-private constructors only classes inside {@code org.apache.samza.startpoint} could extend this type.
+ */
+public abstract class StartpointCustom extends Startpoint {
+
+  /** Creates a custom startpoint whose creation timestamp is set by the {@link Startpoint} no-arg constructor. */
+  protected StartpointCustom() {
+    super();
+  }
+
+  /**
+   * Creates a custom startpoint with an explicit creation timestamp.
+   * @param creationTimestamp timestamp in epoch milliseconds, passed through to {@link Startpoint}.
+   */
+  protected StartpointCustom(long creationTimestamp) {
+    super(creationTimestamp);
+  }
+
+  /**
+   * Passes this instance, statically typed as {@link StartpointCustom}, to the visitor.
+   * @param systemStreamPartition the partition this startpoint applies to.
+   * @param startpointVisitor the visitor to invoke.
+   */
+  @Override
+  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
+    startpointVisitor.visit(systemStreamPartition, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
new file mode 100644
index 0000000..1e5662c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link Startpoint} that represents the earliest offset in a stream partition.
+ */
+public final class StartpointOldest extends Startpoint {
+
+  /**
+   * Creates a {@link Startpoint} standing for the earliest offset of a stream partition.
+   */
+  public StartpointOldest() {
+    // No state of its own; the implicit super() call runs the base Startpoint construction.
+  }
+
+  @Override
+  public String toString() {
+    // No fields are registered with the helper, so the result carries only the class name.
+    return Objects.toStringHelper(this).toString();
+  }
+
+  /**
+   * Hands this startpoint to the visitor overload that accepts a {@link StartpointOldest}.
+   */
+  @Override
+  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
+    startpointVisitor.visit(systemStreamPartition, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
new file mode 100644
index 0000000..b4e1d4f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link Startpoint} that represents a specific offset in a stream partition.
+ */
+public final class StartpointSpecific extends Startpoint {
+
+  private final String specificOffset;
+
+  // Default constructor needed by serde.
+  private StartpointSpecific() {
+    this(null);
+  }
+
+  /**
+   * Constructs a {@link Startpoint} that represents a specific offset in a stream partition.
+   * @param specificOffset Specific offset in a stream partition.
+   */
+  public StartpointSpecific(String specificOffset) {
+    super();
+    this.specificOffset = specificOffset;
+  }
+
+  /**
+   * Getter for the specific offset.
+   * @return the specific offset.
+   */
+  public String getSpecificOffset() {
+    return specificOffset;
+  }
+
+  @Override
+  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
+    startpointVisitor.visit(systemStreamPartition, this);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).add("specificOffset", specificOffset).toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    StartpointSpecific that = (StartpointSpecific) o;
+    return Objects.equal(specificOffset, that.specificOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(super.hashCode(), specificOffset);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
new file mode 100644
index 0000000..33020bf
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link Startpoint} that represents a timestamp offset in a stream partition.
+ */
+public final class StartpointTimestamp extends Startpoint {
+
+  private final Long timestampOffset;
+
+  // Default constructor needed by serde.
+  private StartpointTimestamp() {
+    this(null);
+  }
+
+  /**
+   * Constructs a {@link Startpoint} that represents a timestamp offset in a stream partition.
+   * @param timestampOffset in a stream partition in milliseconds.
+   */
+  public StartpointTimestamp(Long timestampOffset) {
+    super();
+    this.timestampOffset = timestampOffset;
+  }
+
+  /**
+   * Getter for the timestamp offset.
+   * @return the timestamp offset in milliseconds.
+   */
+  public Long getTimestampOffset() {
+    return timestampOffset;
+  }
+
+  @Override
+  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
+    startpointVisitor.visit(systemStreamPartition, this);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).add("timestampOffset", timestampOffset).toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    StartpointTimestamp that = (StartpointTimestamp) o;
+    return Objects.equal(timestampOffset, that.timestampOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(super.hashCode(), timestampOffset);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
new file mode 100644
index 0000000..5d02068
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * A {@link Startpoint} that represents the latest offset in a stream partition.
+ */
+public final class StartpointUpcoming extends Startpoint {
+
+  /**
+   * Creates a {@link Startpoint} standing for the latest offset of a stream partition.
+   */
+  public StartpointUpcoming() {
+    // No state of its own; the implicit super() call runs the base Startpoint construction.
+  }
+
+  @Override
+  public String toString() {
+    // No fields are registered with the helper, so the result carries only the class name.
+    return Objects.toStringHelper(this).toString();
+  }
+
+  /**
+   * Hands this startpoint to the visitor overload that accepts a {@link StartpointUpcoming}.
+   */
+  @Override
+  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
+    startpointVisitor.visit(systemStreamPartition, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
new file mode 100644
index 0000000..99151ab
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Visitor interface for system consumers to implement to support {@link Startpoint}s.
+ */
+public interface StartpointVisitor {
+
+  /**
+   * Seek to the specific offset represented by {@link StartpointSpecific}. This is the only visit method
+   * without a default implementation, so every implementation has to provide it.
+   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
+   * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
+   */
+  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);
+
+  /**
+   * Seek to the timestamp offset represented by {@link StartpointTimestamp}.
+   * The default implementation throws {@link UnsupportedOperationException}.
+   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
+   * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
+   */
+  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+    throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
+  }
+
+  /**
+   * Seek to the earliest offset represented by {@link StartpointOldest}.
+   * The default implementation throws {@link UnsupportedOperationException}.
+   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
+   * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
+   */
+  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+    throw new UnsupportedOperationException("StartpointOldest is not supported.");
+  }
+
+  /**
+   * Seek to the latest offset represented by {@link StartpointUpcoming}.
+   * The default implementation throws {@link UnsupportedOperationException}.
+   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
+   * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
+   */
+  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+    throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
+  }
+
+  /**
+   * Handle a custom start position represented by a {@link StartpointCustom} subclass.
+   * The default implementation throws {@link UnsupportedOperationException}, naming the concrete
+   * subclass in the message.
+   * @param systemStreamPartition The {@link SystemStreamPartition} the startpoint applies to.
+   * @param startpointCustom The {@link Startpoint} that carries the custom start position.
+   */
+  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
+    throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java b/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
new file mode 100644
index 0000000..de7b18a
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.time.Instant;
+
+
+/**
+ * Test-only {@link StartpointCustom} implementation that carries two payload fields.
+ */
+public class MockStartpointCustom extends StartpointCustom {
+  private final String testInfo1;
+  private final long testInfo2;
+
+  // Default constructor needed for serde; delegates with a null string and 0.
+  private MockStartpointCustom() {
+    this(null, 0);
+  }
+
+  /**
+   * Creates an instance whose creation timestamp is the current time in epoch milliseconds.
+   * @param testInfo1 first payload value.
+   * @param testInfo2 second payload value.
+   */
+  public MockStartpointCustom(String testInfo1, long testInfo2) {
+    this(testInfo1, testInfo2, Instant.now().toEpochMilli());
+  }
+
+  /**
+   * Creates an instance with an explicit creation timestamp, which is passed to the parent constructor.
+   * @param testInfo1 first payload value.
+   * @param testInfo2 second payload value.
+   * @param creationTimestamp creation timestamp handed to {@link StartpointCustom}.
+   */
+  public MockStartpointCustom(String testInfo1, long testInfo2, long creationTimestamp) {
+    super(creationTimestamp);
+    this.testInfo1 = testInfo1;
+    this.testInfo2 = testInfo2;
+  }
+
+  /**
+   * @return the first payload value.
+   */
+  public String getTestInfo1() {
+    return testInfo1;
+  }
+
+  /**
+   * @return the second payload value.
+   */
+  public long getTestInfo2() {
+    return testInfo2;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
new file mode 100644
index 0000000..b54e569
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.time.Instant;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestStartpoint {
+
+  @Test
+  public void testStartpointSpecific() {
+    StartpointSpecific specific = new StartpointSpecific("123");
+    Assert.assertEquals("123", specific.getSpecificOffset());
+    assertCreatedNoLaterThanNow(specific);
+    Assert.assertEquals(StartpointSpecific.class, applyAndGetVisitedClass(specific));
+  }
+
+  @Test
+  public void testStartpointTimestamp() {
+    StartpointTimestamp timestamp = new StartpointTimestamp(2222222L);
+    Assert.assertEquals(2222222L, timestamp.getTimestampOffset().longValue());
+    assertCreatedNoLaterThanNow(timestamp);
+    Assert.assertEquals(StartpointTimestamp.class, applyAndGetVisitedClass(timestamp));
+  }
+
+  @Test
+  public void testStartpointEarliest() {
+    StartpointOldest oldest = new StartpointOldest();
+    assertCreatedNoLaterThanNow(oldest);
+    Assert.assertEquals(StartpointOldest.class, applyAndGetVisitedClass(oldest));
+  }
+
+  @Test
+  public void testStartpointLatest() {
+    StartpointUpcoming upcoming = new StartpointUpcoming();
+    assertCreatedNoLaterThanNow(upcoming);
+    Assert.assertEquals(StartpointUpcoming.class, applyAndGetVisitedClass(upcoming));
+  }
+
+  @Test
+  public void testStartpointCustom() {
+    MockStartpointCustom custom = new MockStartpointCustom("test12345", 12345);
+    Assert.assertEquals("test12345", custom.getTestInfo1());
+    Assert.assertEquals(12345, custom.getTestInfo2());
+    assertCreatedNoLaterThanNow(custom);
+    Assert.assertEquals(MockStartpointCustom.class, applyAndGetVisitedClass(custom));
+  }
+
+  // Asserts that the startpoint's creation timestamp is not later than the current time.
+  private static void assertCreatedNoLaterThanNow(Startpoint startpoint) {
+    Assert.assertTrue(startpoint.getCreationTimestamp() <= Instant.now().toEpochMilli());
+  }
+
+  // Applies the startpoint to a fresh recording visitor and reports which class the visitor saw.
+  private static Class<? extends Startpoint> applyAndGetVisitedClass(Startpoint startpoint) {
+    MockStartpointVisitor recordingVisitor = new MockStartpointVisitor();
+    startpoint.apply(new SystemStreamPartition("sys", "stream", new Partition(1)), recordingVisitor);
+    return recordingVisitor.visitedClass;
+  }
+
+  /**
+   * Visitor that records the runtime class of whichever startpoint it was last handed.
+   */
+  static class MockStartpointVisitor implements StartpointVisitor {
+    Class<? extends Startpoint> visitedClass;
+
+    // Shared bookkeeping for every visit overload.
+    private void record(Startpoint visited) {
+      visitedClass = visited.getClass();
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      record(startpointSpecific);
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      record(startpointTimestamp);
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+      record(startpointOldest);
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+      record(startpointUpcoming);
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
+      record(startpointCustom);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
new file mode 100644
index 0000000..9c4e49c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.base.Objects;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+
+
+/**
+ * Key identifying a startpoint entry: a {@link SystemStreamPartition} plus an optional {@link TaskName}.
+ * Fields are exposed to Jackson through field-level auto-detection, so their names are kept as they are.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+class StartpointKey {
+  private final SystemStreamPartition systemStreamPartition;
+  private final TaskName taskName;
+
+  // Constructs a startpoint key with SSP. This means the key will apply to all tasks that are mapped to this SSP
+  StartpointKey(SystemStreamPartition systemStreamPartition) {
+    this(systemStreamPartition, null);
+  }
+
+  // Constructs a startpoint key with SSP and a task.
+  StartpointKey(SystemStreamPartition systemStreamPartition, TaskName taskName) {
+    this.systemStreamPartition = systemStreamPartition;
+    this.taskName = taskName;
+  }
+
+  /**
+   * @return the {@link SystemStreamPartition} component of this key.
+   */
+  SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+
+  /**
+   * @return the {@link TaskName} component of this key; null when the key was built from an SSP only.
+   */
+  TaskName getTaskName() {
+    return taskName;
+  }
+
+  @Override
+  public String toString() {
+    // Render both key components; a helper with no fields would print every key identically.
+    return Objects.toStringHelper(this)
+        .add("systemStreamPartition", systemStreamPartition)
+        .add("taskName", taskName)
+        .toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StartpointKey that = (StartpointKey) o;
+    return Objects.equal(systemStreamPartition, that.systemStreamPartition) && Objects.equal(taskName, that.taskName);
+  }
+
+  @Override
+  public int hashCode() {
+    // Uses the same two fields as equals().
+    return Objects.hashCode(systemStreamPartition, taskName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
new file mode 100644
index 0000000..6005c92
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The StartpointManager reads and writes {@link Startpoint} to the {@link MetadataStore} defined by
+ * the configuration task.startpoint.metadata.store.factory.
+ *
+ * Startpoints are keyed in the MetadataStore by two different formats:
+ * 1) Only by {@link SystemStreamPartition}
+ * 2) A combination of {@link SystemStreamPartition} and {@link TaskName}
+ *
+ * The intention for the StartpointManager is to maintain a strong contract between the caller
+ * and how Startpoints are stored in the underlying MetadataStore.
+ */
+public class StartpointManager {
+  private static final Logger LOG = LoggerFactory.getLogger(StartpointManager.class);
+  // Namespace handed to the MetadataStoreFactory when the underlying store is created.
+  private static final String NAMESPACE = "samza-startpoint-v1";
+
+  // Startpoints whose creation timestamp is older than this are treated as stale by readStartpoint.
+  static final Duration DEFAULT_EXPIRATION_DURATION = Duration.ofHours(12);
+
+  private final MetadataStore metadataStore;
+  // Converts Startpoints to and from the byte[] values kept in the metadata store.
+  private final StartpointSerde startpointSerde = new StartpointSerde();
+
+  // Set by start(); the read/write/delete methods check it before touching the store.
+  private boolean started = false;
+
+  /**
+   * Constructs a {@link StartpointManager} instance with the provided {@link MetadataStoreFactory}
+   * @param metadataStoreFactory {@link MetadataStoreFactory} used to construct the underlying store.
+   * @param config {@link Config} required for the underlying store.
+   * @param metricsRegistry {@link MetricsRegistry} to hook into the underlying store.
+   */
+  public StartpointManager(MetadataStoreFactory metadataStoreFactory, Config config, MetricsRegistry metricsRegistry) {
+    // Reject missing collaborators up front, before the factory is asked for a store.
+    Preconditions.checkNotNull(metadataStoreFactory, "MetadataStoreFactory cannot be null");
+    Preconditions.checkNotNull(config, "Config cannot be null");
+    Preconditions.checkNotNull(metricsRegistry, "MetricsRegistry cannot be null");
+
+    // The store is only created here, scoped to the startpoint namespace; start() initializes it.
+    MetadataStore store = metadataStoreFactory.getMetadataStore(NAMESPACE, config, metricsRegistry);
+    LOG.info("StartpointManager created with metadata store: {}", store.getClass().getCanonicalName());
+    this.metadataStore = store;
+  }
+
+  /**
+   * Starts the underlying {@link MetadataStore}
+   */
+  public void start() {
+    if (started) {
+      // Repeated calls are tolerated: warn and leave the store alone.
+      LOG.warn("StartpointManager already started");
+      return;
+    }
+    metadataStore.init();
+    started = true;
+  }
+
+  /**
+   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
+   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
+   * @param startpoint Reference to a Startpoint object.
+   */
+  public void writeStartpoint(SystemStreamPartition ssp, Startpoint startpoint) {
+    // Delegate without a task name so the startpoint is mapped against the SSP alone.
+    final TaskName noTask = null;
+    writeStartpoint(ssp, noTask, startpoint);
+  }
+
+  /**
+   * Writes a {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
+   * @param ssp The {@link SystemStreamPartition} to map the {@link Startpoint} against.
+   * @param taskName The {@link TaskName} to map the {@link Startpoint} against.
+   * @param startpoint Reference to a Startpoint object.
+   */
+  public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startpoint startpoint) {
+    Preconditions.checkState(started, "Underlying metadata store not available");
+    Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
+    Preconditions.checkNotNull(startpoint, "Startpoint cannot be null");
+
+    try {
+      metadataStore.put(toStoreKey(ssp, taskName), startpointSerde.toBytes(startpoint));
+    } catch (Exception ex) {
+      throw new SamzaException(String.format(
+          "Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex);
+    }
+  }
+
+  /**
+   * Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition}.
+   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
+   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist or if it is too stale
+   */
+  public Startpoint readStartpoint(SystemStreamPartition ssp) {
+    // Delegate without a task name so the SSP-only entry is looked up.
+    final TaskName noTask = null;
+    return readStartpoint(ssp, noTask);
+  }
+
+  /**
+   * Returns the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
+   * @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
+   * @param taskName The {@link TaskName} to fetch the {@link Startpoint} for.
+   * @return {@link Startpoint} for the {@link SystemStreamPartition}, or null if it does not exist or if it is too stale.
+   */
+  public Startpoint readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+    Preconditions.checkState(started, "Underlying metadata store not available");
+    Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
+
+    byte[] serialized = metadataStore.get(toStoreKey(ssp, taskName));
+    if (serialized == null) {
+      // Nothing is stored under this key.
+      return null;
+    }
+
+    Startpoint startpoint = startpointSerde.fromBytes(serialized);
+    // A startpoint is returned only if it was created after (now - expiration duration).
+    Instant expirationCutoff = Instant.now().minus(DEFAULT_EXPIRATION_DURATION);
+    Instant createdAt = Instant.ofEpochMilli(startpoint.getCreationTimestamp());
+    if (!expirationCutoff.isBefore(createdAt)) {
+      LOG.warn("Stale Startpoint: {} was read. Ignoring.", startpoint);
+      return null;
+    }
+    return startpoint;
+  }
+
+  /**
+   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
+   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
+   */
+  public void deleteStartpoint(SystemStreamPartition ssp) {
+    // Delegate without a task name so the SSP-only entry is removed.
+    final TaskName noTask = null;
+    deleteStartpoint(ssp, noTask);
+  }
+
+  /**
+   * Deletes the {@link Startpoint} for a {@link SystemStreamPartition} and {@link TaskName}.
+   * @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
+   * @param taskName The {@link TaskName} to delete the {@link Startpoint} for.
+   */
+  public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
+    Preconditions.checkState(started, "Underlying metadata store not available");
+    Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");
+
+    metadataStore.delete(toStoreKey(ssp, taskName));
+  }
+
+  /**
+   * For {@link Startpoint}s keyed only by {@link SystemStreamPartition}, this method re-maps the Startpoints from
+   * SystemStreamPartition to SystemStreamPartition+{@link TaskName} for all tasks provided by the {@link JobModel}.
+   * An existing SystemStreamPartition+TaskName Startpoint is only overwritten when it was created before the
+   * SSP-only Startpoint. SSP-only keys are deleted after all tasks have been visited, and only for SSPs that were
+   * re-mapped to at least one task.
+   * This method is not atomic or thread-safe. The intent is for the Samza Processor's coordinator to use this
+   * method to assign the Startpoints to the appropriate tasks.
+   * @param jobModel The {@link JobModel} is used to determine which {@link TaskName} each {@link SystemStreamPartition} maps to.
+   * @return The set of {@link SystemStreamPartition}s that were fanned out to SystemStreamPartition+TaskName.
+   */
+  public Set<SystemStreamPartition> fanOutStartpointsToTasks(JobModel jobModel) {
+    Preconditions.checkState(started, "Underlying metadata store not available");
+    Preconditions.checkNotNull(jobModel, "JobModel cannot be null");
+
+    HashSet<SystemStreamPartition> sspsToDelete = new HashSet<>();
+
+    // Inspect the job model for TaskName-to-SSPs mapping and re-map startpoints from SSP-only keys to SSP+TaskName keys.
+    for (ContainerModel containerModel: jobModel.getContainers().values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        TaskName taskName = taskModel.getTaskName();
+        for (SystemStreamPartition ssp : taskModel.getSystemStreamPartitions()) {
+          Startpoint startpoint = readStartpoint(ssp); // Read SSP-only key
+          if (startpoint == null) {
+            LOG.debug("No Startpoint for SSP: {} in task: {}", ssp, taskName);
+            continue;
+          }
+
+          LOG.info("Grouping Startpoint keyed on SSP: {} to tasks determined by the job model.", ssp);
+          Startpoint startpointForTask = readStartpoint(ssp, taskName);
+          if (startpointForTask == null || startpointForTask.getCreationTimestamp() < startpoint.getCreationTimestamp()) {
+            writeStartpoint(ssp, taskName, startpoint);
+            sspsToDelete.add(ssp); // Mark for deletion
+            LOG.info("Startpoint for SSP: {} remapped with task: {}.", ssp, taskName);
+          } else {
+            LOG.info("Startpoint for SSP: {} and task: {} already exists and will not be overwritten.", ssp, taskName);
+          }
+        }
+      }
+    }
+
+    // Delete SSP-only keys. Deferred until after the loops because the same SSP may belong to several tasks.
+    sspsToDelete.forEach(ssp -> {
+        deleteStartpoint(ssp);
+        // The SSP must be passed as the argument for the "{}" placeholder, otherwise it is never printed.
+        LOG.info("All Startpoints for SSP: {} have been grouped to the appropriate tasks and the SSP was deleted.", ssp);
+      });
+
+    return ImmutableSet.copyOf(sspsToDelete);
+  }
+
+  /**
+   * Closes the underlying {@link MetadataStore} and marks this manager as no longer started.
+   * Calling this when the manager is not started only logs a warning.
+   */
+  public void stop() {
+    if (!started) {
+      LOG.warn("StartpointManager already stopped.");
+      return;
+    }
+
+    metadataStore.close();
+    started = false;
+  }
+
+  /**
+   * Exposes the underlying {@link MetadataStore}. Package-private and annotated for test use only.
+   */
+  @VisibleForTesting
+  MetadataStore getMetadataStore() {
+    return metadataStore;
+  }
+
+  /**
+   * Builds the metadata store key for a {@link SystemStreamPartition} and an optional {@link TaskName} by
+   * JSON-serializing a {@link StartpointKey}.
+   * @param ssp The {@link SystemStreamPartition} part of the key.
+   * @param taskName The {@link TaskName} part of the key; null for SSP-only keys.
+   * @return The serialized key as a String.
+   */
+  private static String toStoreKey(SystemStreamPartition ssp, TaskName taskName) {
+    // Decode with an explicit charset: new String(byte[]) would use the platform default, so the same key could
+    // map to different strings on differently-configured JVMs.
+    // NOTE(review): assumes JsonSerdeV2 emits UTF-8 encoded JSON - confirm against JsonSerdeV2#toBytes.
+    return new String(new JsonSerdeV2<>().toBytes(new StartpointKey(ssp, taskName)),
+        java.nio.charset.StandardCharsets.UTF_8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/java/org/apache/samza/startpoint/StartpointSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointSerde.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointSerde.java
new file mode 100644
index 0000000..361aef3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointSerde.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.Serde;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * {@link Serde} for {@link Startpoint}s. A Startpoint is written as a JSON object with two string fields: the
+ * name of the concrete Startpoint class and the JSON form of the Startpoint itself. The class name is used on
+ * read to pick the type to deserialize into.
+ */
+class StartpointSerde implements Serde<Startpoint> {
+  private static final String STARTPOINT_CLASS = "startpointClass";
+  private static final String STARTPOINT_OBJ = "startpointObj";
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Deserializes a {@link Startpoint} previously written by {@link #toBytes(Startpoint)}.
+   * @param bytes The serialized envelope.
+   * @return The deserialized {@link Startpoint}.
+   * @throws SamzaException if the bytes cannot be parsed, the class cannot be loaded, or the class is not a
+   *         {@link Startpoint}.
+   */
+  @Override
+  public Startpoint fromBytes(byte[] bytes) {
+    try {
+      LinkedHashMap<String, String> deserialized = mapper.readValue(bytes, LinkedHashMap.class);
+      // asSubclass rejects a non-Startpoint class before any instance of it is created, instead of relying on an
+      // unchecked cast that only fails after deserialization.
+      Class<? extends Startpoint> startpointClass =
+          Class.forName(deserialized.get(STARTPOINT_CLASS)).asSubclass(Startpoint.class);
+      return mapper.readValue(deserialized.get(STARTPOINT_OBJ), startpointClass);
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception in de-serializing startpoint bytes: %s",
+          Arrays.toString(bytes)), e);
+    }
+  }
+
+  /**
+   * Serializes a {@link Startpoint} together with its concrete class name.
+   * @param startpoint The {@link Startpoint} to serialize.
+   * @return The serialized envelope.
+   * @throws SamzaException if serialization fails.
+   */
+  @Override
+  public byte[] toBytes(Startpoint startpoint) {
+    try {
+      ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+      // Class.forName in fromBytes needs the binary name. getCanonicalName() differs from it for nested classes
+      // (Outer.Inner vs Outer$Inner) and is null for anonymous/local classes; the two are equal for top-level
+      // classes, so previously written data stays readable.
+      mapBuilder.put(STARTPOINT_CLASS, startpoint.getClass().getName());
+      mapBuilder.put(STARTPOINT_OBJ, mapper.writeValueAsString(startpoint));
+      return mapper.writeValueAsBytes(mapBuilder.build());
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception in serializing: %s", startpoint), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 18d2b93..d47458d 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -19,20 +19,17 @@
 
 package org.apache.samza.checkpoint
 
-
-
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemAdmins
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, JavaSystemConfig}
+import org.apache.samza.annotation.InterfaceStability
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.{Config, JavaSystemConfig}
 import org.apache.samza.container.TaskName
+import org.apache.samza.startpoint.{Startpoint, StartpointManager}
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
-import org.apache.samza.system.{SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system._
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
@@ -76,6 +73,7 @@ object OffsetManager extends Logging {
     systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
     config: Config,
     checkpointManager: CheckpointManager = null,
+    startpointManager: StartpointManager = null,
     systemAdmins: SystemAdmins = SystemAdmins.empty(),
     checkpointListeners: Map[String, CheckpointListener] = Map(),
     offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
@@ -104,7 +102,7 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
+    new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
   }
 }
 
@@ -139,6 +137,11 @@ class OffsetManager(
   val checkpointManager: CheckpointManager = null,
 
   /**
+    * Optional startpoint manager for overridden offsets.
+    */
+  val startpointManager: StartpointManager = null,
+
+  /**
    * SystemAdmins that are used to get next offsets from last checkpointed
    * offsets. Map is from system name to SystemAdmin class for the system.
    */
@@ -167,6 +170,11 @@ class OffsetManager(
   var startingOffsets = Map[TaskName, Map[SystemStreamPartition, String]]()
 
   /**
+    * Startpoints loaded for each SystemStreamPartition.
+    */
+  var startpoints = Map[TaskName, Map[SystemStreamPartition, Startpoint]]()
+
+  /**
    * The set of system stream partitions that have been registered with the
    * OffsetManager, grouped by the taskName they belong to. These are the SSPs
    * that will be tracked within the offset manager.
@@ -184,6 +192,7 @@ class OffsetManager(
     loadOffsetsFromCheckpointManager
     stripResetStreams
     loadStartingOffsets
+    loadStartpoints
     loadDefaults
 
     info("Successfully loaded last processed offsets: %s" format lastProcessedOffsets)
@@ -223,6 +232,34 @@ class OffsetManager(
   }
 
   /**
+    * Gets Startpoints that are loaded into the OffsetManager after OffsetManager#start is called.
+    */
+  @InterfaceStability.Unstable
+  def getStartpoint(taskName: TaskName, systemStreamPartition: SystemStreamPartition): Option[Startpoint] = {
+    // Two-level lookup (task first, then SSP) over the startpoints held by this OffsetManager;
+    // None if either level is missing.
+    startpoints.get(taskName).flatMap(_.get(systemStreamPartition))
+  }
+
+  /**
+    * Sets the Startpoint into the OffsetManager. Does not write directly to the StartpointManager. This is to be
+    * used by the TaskContext only for setting Startpoints during processor initialization, similar to the
+    * OffsetManager#setStartingOffset method. To write Startpoints to the metadata store, use the StartpointManager.
+    */
+  @InterfaceStability.Unstable
+  def setStartpoint(taskName: TaskName, ssp: SystemStreamPartition, startpoint: Startpoint) = {
+    // Only the in-memory view held by this OffsetManager is updated; nothing is written to the StartpointManager.
+    val updatedForTask = startpoints.get(taskName) match {
+      case Some(existing) => existing + (ssp -> startpoint)
+      case None =>
+        // First startpoint for this task: start from a concurrent map holding just this entry.
+        val fresh = new ConcurrentHashMap[SystemStreamPartition, Startpoint]()
+        fresh.put(ssp, startpoint)
+        fresh.asScala
+    }
+    startpoints += taskName -> updatedForTask
+  }
+
+  /**
     * Gets a snapshot of all the current offsets for the specified task. This is useful to
     * ensure there are no concurrent updates to the offsets between when this method is
     * invoked and the corresponding call to [[OffsetManager.writeCheckpoint()]]
@@ -250,7 +287,7 @@ class OffsetManager(
   }
 
   /**
-    * Write the specified checkpoint for the given task.
+    * Write the specified checkpoint for the given task and delete the corresponding startpoint, if available.
     */
   def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
     if (checkpoint != null && (checkpointManager != null || checkpointListeners.nonEmpty)) {
@@ -274,6 +311,25 @@ class OffsetManager(
         }
       }
     }
+
+    // delete corresponding startpoints after checkpoint is supposed to be committed
+    if (startpointManager != null && startpoints.contains(taskName)) {
+      val sspStartpoints = checkpoint.getOffsets.keySet.asScala
+        .intersect(startpoints.getOrElse(taskName, Map.empty[SystemStreamPartition, Startpoint]).keySet)
+
+      // delete startpoints for this task and the intersection of SSPs between checkpoint and startpoint.
+      sspStartpoints.foreach(ssp => {
+        startpointManager.deleteStartpoint(ssp, taskName)
+        info("Deleted startpoint for SSP: %s and task: %s" format (ssp, taskName))
+      })
+      startpoints -= taskName
+
+      if (startpoints.isEmpty) {
+        // Stop startpoint manager after last startpoint is deleted
+        startpointManager.stop()
+        info("No more startpoints left to consume. Stopped the startpoint manager.")
+      }
+    }
   }
 
   def stop {
@@ -284,6 +340,14 @@ class OffsetManager(
     } else {
       debug("Skipping checkpoint manager shutdown because no checkpoint manager is defined.")
     }
+
+    if (startpointManager != null) {
+      debug("Ensuring startpoint manager has shut down.")
+
+      // The manager may already have been stopped once its last startpoint was consumed.
+      startpointManager.stop
+    } else {
+      debug("Skipping startpoint manager shutdown because no startpoint manager is defined.")
+    }
   }
 
   /**
@@ -405,6 +469,38 @@ class OffsetManager(
   }
 
   /**
+    * Load Startpoints for each SystemStreamPartition+TaskName
+    */
+  private def loadStartpoints: Unit = {
+    if (startpointManager != null) {
+      info("Starting startpoint manager.")
+      startpointManager.start
+      val taskNameToSSPs: Map[TaskName, Set[SystemStreamPartition]] = systemStreamPartitions
+
+      // For every registered task, keep only the SSPs that actually have a startpoint stored for that task.
+      for ((taskName, ssps) <- taskNameToSSPs) {
+        val loaded = ssps
+          .flatMap(ssp => Option(startpointManager.readStartpoint(ssp, taskName)).map(found => ssp -> found))
+          .toMap
+
+        if (loaded.nonEmpty) {
+          startpoints += taskName -> loaded
+        }
+      }
+
+      if (startpoints.isEmpty) {
+        // Nothing to consume, so the manager is not needed any further.
+        info("No startpoints to consume. Stopping startpoint manager.")
+        startpointManager.stop
+      } else {
+        for ((taskName, sspToStartpoint) <- startpoints; (ssp, startpoint) <- sspToStartpoint) {
+          info("Loaded startpoint: %s for SSP: %s and task: %s" format (startpoint, ssp, taskName))
+        }
+      }
+    }
+  }
+  /**
    * Use defaultOffsets to get a next offset for every SystemStreamPartition
    * that was registered, but has no offset.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 1b55fad..2120c49 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -93,6 +93,7 @@ object JobConfig {
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
   val METADATA_STORE_FACTORY = "metadata.store.factory"
+  val STARTPOINT_METADATA_STORE_FACTORY = "startpoint.metadata.store.factory"
   val LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory"
 
   // Processor Config Constants
@@ -252,6 +253,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamMetadataStoreFactory].getCanonicalName)
 
+  /**
+    * Metadata store factory class name used for startpoints. Reads "startpoint.metadata.store.factory" and falls
+    * back to getMetadataStoreFactory when that key is not set.
+    */
+  def getStartpointMetadataStoreFactory = getOption(JobConfig.STARTPOINT_METADATA_STORE_FACTORY).getOrElse(getMetadataStoreFactory)
+
   def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
 
   def getDiagnosticsAppenderClass = {

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 4aa2ed8..e38a451 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -45,9 +45,11 @@ import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.context._
 import org.apache.samza.job.model.{ContainerModel, JobModel}
+import org.apache.samza.metadatastore.MetadataStoreFactory
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage._
 import org.apache.samza.system._
 import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
@@ -418,12 +420,26 @@ object SamzaContainer extends Logging {
       .orNull
     info("Got checkpoint manager: %s" format checkpointManager)
 
+    val metadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
+      .map(Util.getObj(_, classOf[MetadataStoreFactory]))
+      .orNull
+    val startpointManager = {
+      try {
+        Option(new StartpointManager(metadataStoreFactory, config, samzaContainerMetrics.registry))
+      } catch {
+        case e: Exception => {
+          error("Unable to get an instance of the StartpointManager. Continuing without one.", e)
+          None
+        }
+      }
+    }
+
     // create a map of consumers with callbacks to pass to the OffsetManager
     val checkpointListeners = consumers.filter(_._2.isInstanceOf[CheckpointListener])
       .map { case (system, consumer) => (system, consumer.asInstanceOf[CheckpointListener])}
     info("Got checkpointListeners : %s" format checkpointListeners)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, startpointManager.getOrElse(null), systemAdmins, checkpointListeners, offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
     val dropDeserializationError = config.getDropDeserializationErrors

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
new file mode 100644
index 0000000..1bd7d8c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Verifies that {@link StartpointKey} equality and its JSON-serialized form agree: equal keys serialize to the
+ * same string, and keys differing in SSP or task name serialize to different strings.
+ */
+public class TestStartpointKey {
+  @Test
+  public void testStartpointKey() {
+    SystemStreamPartition ssp1 = new SystemStreamPartition("system", "stream", new Partition(2));
+    SystemStreamPartition ssp2 = new SystemStreamPartition("system", "stream", new Partition(3));
+
+    StartpointKey startpointKey1 = new StartpointKey(ssp1);
+    StartpointKey startpointKey2 = new StartpointKey(ssp1);
+    StartpointKey startpointKeyWithDifferentSSP = new StartpointKey(ssp2);
+    StartpointKey startpointKeyWithTask1 = new StartpointKey(ssp1, new TaskName("t1"));
+    StartpointKey startpointKeyWithTask2 = new StartpointKey(ssp1, new TaskName("t1"));
+    StartpointKey startpointKeyWithDifferentTask = new StartpointKey(ssp1, new TaskName("t2"));
+
+    // Same SSP (and same task, if any) => equal keys and identical serialized form.
+    Assert.assertEquals(startpointKey1, startpointKey2);
+    Assert.assertEquals(toJson(startpointKey1), toJson(startpointKey2));
+    Assert.assertEquals(startpointKeyWithTask1, startpointKeyWithTask2);
+    Assert.assertEquals(toJson(startpointKeyWithTask1), toJson(startpointKeyWithTask2));
+
+    // SSP-only key vs SSP+task key.
+    Assert.assertNotEquals(startpointKey1, startpointKeyWithTask1);
+    Assert.assertNotEquals(toJson(startpointKey1), toJson(startpointKeyWithTask1));
+
+    // Different SSP.
+    Assert.assertNotEquals(startpointKey1, startpointKeyWithDifferentSSP);
+    Assert.assertNotEquals(toJson(startpointKey1), toJson(startpointKeyWithDifferentSSP));
+
+    // Same SSP, different task.
+    Assert.assertNotEquals(startpointKeyWithTask1, startpointKeyWithDifferentTask);
+    Assert.assertNotEquals(toJson(startpointKeyWithTask1), toJson(startpointKeyWithDifferentTask));
+  }
+
+  /** Serializes a key through JsonSerdeV2 and returns the result as a String. */
+  private static String toJson(StartpointKey key) {
+    return new String(new JsonSerdeV2<>().toBytes(key));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
new file mode 100644
index 0000000..cb20148
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.InMemoryMetadataStore;
+import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestStartpointManager {
+
+  @Test
+  public void testDefaultMetadataStore() {
+    // The manager must be backed by the store type produced by the factory it was given.
+    StartpointManager manager =
+        new StartpointManager(new InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
+
+    Assert.assertNotNull(manager);
+    Assert.assertEquals(InMemoryMetadataStore.class, manager.getMetadataStore().getClass());
+  }
+
+  @Test
+  public void testNoLongerUsableAfterStop() {
+    StartpointManager manager =
+        new StartpointManager(new InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
+    manager.start();
+    SystemStreamPartition ssp =
+        new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
+    TaskName taskName = new TaskName("MockTask");
+    Startpoint startpoint = new StartpointOldest();
+
+    manager.stop();
+
+    // Every public operation must be rejected once the manager has been stopped.
+    assertThrowsIllegalState(() -> manager.writeStartpoint(ssp, startpoint));
+    assertThrowsIllegalState(() -> manager.writeStartpoint(ssp, taskName, startpoint));
+    assertThrowsIllegalState(() -> manager.readStartpoint(ssp));
+    assertThrowsIllegalState(() -> manager.readStartpoint(ssp, taskName));
+    assertThrowsIllegalState(() -> manager.deleteStartpoint(ssp));
+    assertThrowsIllegalState(() -> manager.deleteStartpoint(ssp, taskName));
+    assertThrowsIllegalState(() -> manager.fanOutStartpointsToTasks(new JobModel(new MapConfig(), new HashMap<>())));
+  }
+
+  /** Runs the action and fails the test unless it throws an {@link IllegalStateException}. */
+  private static void assertThrowsIllegalState(Runnable action) {
+    try {
+      action.run();
+      Assert.fail("Expected precondition exception.");
+    } catch (IllegalStateException ex) { }
+  }
+
+  @Test
+  public void testBasics() {
+    StartpointManager startpointManager = new StartpointManager(new InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
+    startpointManager.start();
+    SystemStreamPartition ssp =
+        new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
+    TaskName taskName = new TaskName("MockTask");
+    StartpointTimestamp startpoint1 = new StartpointTimestamp(111111111L);
+    StartpointTimestamp startpoint2 = new StartpointTimestamp(222222222L);
+    StartpointSpecific startpoint3 = new StartpointSpecific("1");
+    StartpointSpecific startpoint4 = new StartpointSpecific("2");
+
+    // The creation timestamp must be populated by default
+    Assert.assertNotNull(startpoint1.getCreationTimestamp());
+    Assert.assertNotNull(startpoint2.getCreationTimestamp());
+    Assert.assertNotNull(startpoint3.getCreationTimestamp());
+    Assert.assertNotNull(startpoint4.getCreationTimestamp());
+
+    // Reads on non-existent keys return null
+    Assert.assertNull(startpointManager.readStartpoint(ssp));
+    Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
+
+    // Writes: the SSP-only key and the SSP+TaskName key hold independent values
+    Startpoint startpointFromStore;
+    startpointManager.writeStartpoint(ssp, startpoint1);
+    startpointManager.writeStartpoint(ssp, taskName, startpoint2);
+    startpointFromStore = startpointManager.readStartpoint(ssp);
+    Assert.assertEquals(StartpointTimestamp.class, startpointFromStore.getClass());
+    Assert.assertEquals(startpoint1.getTimestampOffset(), ((StartpointTimestamp) startpointFromStore).getTimestampOffset());
+    Assert.assertTrue(startpointFromStore.getCreationTimestamp() <= Instant.now().toEpochMilli());
+    startpointFromStore = startpointManager.readStartpoint(ssp, taskName);
+    Assert.assertEquals(StartpointTimestamp.class, startpointFromStore.getClass());
+    Assert.assertEquals(startpoint2.getTimestampOffset(), ((StartpointTimestamp) startpointFromStore).getTimestampOffset());
+    Assert.assertTrue(startpointFromStore.getCreationTimestamp() <= Instant.now().toEpochMilli());
+
+    // Overwrites: a later write replaces the stored value, including its concrete Startpoint type
+    startpointManager.writeStartpoint(ssp, startpoint3);
+    startpointManager.writeStartpoint(ssp, taskName, startpoint4);
+    startpointFromStore = startpointManager.readStartpoint(ssp);
+    Assert.assertEquals(StartpointSpecific.class, startpointFromStore.getClass());
+    Assert.assertEquals(startpoint3.getSpecificOffset(), ((StartpointSpecific) startpointFromStore).getSpecificOffset());
+    Assert.assertTrue(startpointFromStore.getCreationTimestamp() <= Instant.now().toEpochMilli());
+    startpointFromStore = startpointManager.readStartpoint(ssp, taskName);
+    Assert.assertEquals(StartpointSpecific.class, startpointFromStore.getClass());
+    Assert.assertEquals(startpoint4.getSpecificOffset(), ((StartpointSpecific) startpointFromStore).getSpecificOffset());
+    Assert.assertTrue(startpointFromStore.getCreationTimestamp() <= Instant.now().toEpochMilli());
+
+    // Deleting the SSP-only key must not affect the SSP+TaskName key
+    startpointManager.deleteStartpoint(ssp);
+    Assert.assertNull(startpointManager.readStartpoint(ssp));
+    Assert.assertNotNull(startpointManager.readStartpoint(ssp, taskName));
+
+    // Deleting the SSP+TaskName key must not affect the SSP-only key
+    startpointManager.writeStartpoint(ssp, startpoint3);
+    startpointManager.deleteStartpoint(ssp, taskName);
+    Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
+    Assert.assertNotNull(startpointManager.readStartpoint(ssp));
+
+    startpointManager.stop();
+  }
+
+  @Test
+  public void testStaleStartpoints() {
+    StartpointManager startpointManager = new StartpointManager(new InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
+    SystemStreamPartition ssp =
+        new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
+    TaskName taskName = new TaskName("MockTask");
+
+    startpointManager.start();
+    // Creation timestamp just past the expiration window, so the startpoint must be treated as stale on read.
+    long staleTimestamp = Instant.now().toEpochMilli() - StartpointManager.DEFAULT_EXPIRATION_DURATION.toMillis() - 2;
+    MockStartpointCustom startpoint = new MockStartpointCustom("das boot", 42, staleTimestamp);
+    startpointManager.writeStartpoint(ssp, startpoint);
+    Assert.assertNull(startpointManager.readStartpoint(ssp));
+
+    startpointManager.writeStartpoint(ssp, taskName, startpoint);
+    Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
+
+    // Release the underlying store, as the other tests in this class do.
+    startpointManager.stop();
+  }
+
+  /**
+   * Checks how {@code fanOutStartpointsToTasks} moves startpoints written under SSP-only keys
+   * onto SSP+TaskName keys for the tasks of a job model.
+   *
+   * The job model built here has six tasks, t0..t5. Every task consumes both broadcast SSPs;
+   * only "t1" additionally consumes {@code sspSingle}. The test asserts that:
+   * - after the fan out, the SSP-only keys read back as null and the call returns those SSPs;
+   * - a startpoint is readable for each task that consumes the SSP, and for no other task;
+   * - startpoints already written for an SSP+TaskName key are kept rather than overwritten.
+   */
+  @Test
+  public void testGroupStartpointsPerTask() {
+    MapConfig config = new MapConfig();
+    StartpointManager startpointManager = new StartpointManager(new InMemoryMetadataStoreFactory(), new MapConfig(), new NoOpMetricsRegistry());
+    startpointManager.start();
+    SystemStreamPartition sspBroadcast =
+        new SystemStreamPartition("mockSystem1", "mockStream1", new Partition(2));
+    SystemStreamPartition sspBroadcast2 =
+        new SystemStreamPartition("mockSystem3", "mockStream3", new Partition(4));
+    SystemStreamPartition sspSingle =
+        new SystemStreamPartition("mockSystem2", "mockStream2", new Partition(3));
+
+    List<TaskName> tasks = ImmutableList.of(
+        new TaskName("t0"),
+        new TaskName("t1"),
+        new TaskName("t2"),
+        new TaskName("t3"),
+        new TaskName("t4"),
+        new TaskName("t5"));
+
+    // "t1" is the only task that also consumes sspSingle; all others see just the broadcast SSPs.
+    Set<SystemStreamPartition> broadcastOnly = ImmutableSet.of(sspBroadcast, sspBroadcast2);
+    Set<SystemStreamPartition> broadcastAndSingle = ImmutableSet.of(sspBroadcast, sspBroadcast2, sspSingle);
+
+    Map<TaskName, TaskModel> taskModelMap = tasks.stream()
+        .map(task -> {
+            Set<SystemStreamPartition> ssps = "t1".equals(task.getTaskName()) ? broadcastAndSingle : broadcastOnly;
+            return new TaskModel(task, ssps, new Partition(1));
+          })
+        .collect(Collectors.toMap(TaskModel::getTaskName, model -> model));
+    ContainerModel containerModel = new ContainerModel("container 0", taskModelMap);
+    JobModel jobModel = new JobModel(config, ImmutableMap.of(containerModel.getId(), containerModel));
+
+    StartpointSpecific startpoint42 = new StartpointSpecific("42");
+
+    startpointManager.writeStartpoint(sspBroadcast, startpoint42);
+    startpointManager.writeStartpoint(sspSingle, startpoint42);
+
+    // startpoint42 keyed by sspBroadcast should be remapped to every task + sspBroadcast
+    Set<SystemStreamPartition> fannedOutSsps = startpointManager.fanOutStartpointsToTasks(jobModel);
+    Assert.assertEquals(2, fannedOutSsps.size());
+    Assert.assertTrue(fannedOutSsps.containsAll(ImmutableSet.of(sspBroadcast, sspSingle)));
+
+    for (TaskName task : tasks) {
+      // Every task must now hold startpoint42 for sspBroadcast.
+      Startpoint broadcastStartpoint = startpointManager.readStartpoint(sspBroadcast, task);
+      Assert.assertEquals(StartpointSpecific.class, broadcastStartpoint.getClass());
+      Assert.assertEquals(startpoint42.getSpecificOffset(), ((StartpointSpecific) broadcastStartpoint).getSpecificOffset());
+
+      // Only task "t1" consumes sspSingle, so only "t1" may hold startpoint42 for it.
+      Startpoint singleStartpoint = startpointManager.readStartpoint(sspSingle, task);
+      if (!"t1".equals(task.getTaskName())) {
+        Assert.assertNull(singleStartpoint);
+      } else {
+        Assert.assertEquals(StartpointSpecific.class, singleStartpoint.getClass());
+        Assert.assertEquals(startpoint42.getSpecificOffset(), ((StartpointSpecific) singleStartpoint).getSpecificOffset());
+      }
+    }
+    // The SSP-only keys are gone once they have been fanned out.
+    Assert.assertNull(startpointManager.readStartpoint(sspBroadcast));
+    Assert.assertNull(startpointManager.readStartpoint(sspSingle));
+
+    // Test that startpoints explicitly assigned to an sspBroadcast2+TaskName key are not
+    // overwritten by fanOutStartpointsToTasks
+
+    StartpointSpecific startpoint1024 = new StartpointSpecific("1024");
+
+    startpointManager.writeStartpoint(sspBroadcast2, startpoint42);
+    startpointManager.writeStartpoint(sspBroadcast2, tasks.get(1), startpoint1024);
+    startpointManager.writeStartpoint(sspBroadcast2, tasks.get(3), startpoint1024);
+
+    Set<SystemStreamPartition> sspsDeleted = startpointManager.fanOutStartpointsToTasks(jobModel);
+    Assert.assertEquals(1, sspsDeleted.size());
+    Assert.assertTrue(sspsDeleted.contains(sspBroadcast2));
+
+    // Tasks at index 1 and 3 keep their explicit startpoint1024; all others receive startpoint42.
+    for (int i = 0; i < tasks.size(); i++) {
+      StartpointSpecific expected = (i == 1 || i == 3) ? startpoint1024 : startpoint42;
+      StartpointSpecific stored = (StartpointSpecific) startpointManager.readStartpoint(sspBroadcast2, tasks.get(i));
+      Assert.assertEquals(expected.getSpecificOffset(), stored.getSpecificOffset());
+    }
+    Assert.assertNull(startpointManager.readStartpoint(sspBroadcast2));
+
+    startpointManager.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/0937c1dc/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
new file mode 100644
index 0000000..c939680
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Round-trip tests for {@link StartpointSerde}: each test serializes one kind of
+ * {@link Startpoint} to bytes, deserializes it again, and checks that the concrete class,
+ * the creation timestamp and any type-specific fields are unchanged.
+ */
+public class TestStartpointSerde {
+  private final StartpointSerde startpointSerde = new StartpointSerde();
+
+  /** Serializes the given startpoint and immediately deserializes the resulting bytes. */
+  private Startpoint roundTrip(Startpoint original) {
+    return startpointSerde.fromBytes(startpointSerde.toBytes(original));
+  }
+
+  /** Asserts the checks shared by every test: same concrete class and same creation timestamp. */
+  private static void assertSameClassAndCreationTimestamp(Startpoint expected, Startpoint actual) {
+    Assert.assertEquals(expected.getClass(), actual.getClass());
+    Assert.assertEquals(expected.getCreationTimestamp(), actual.getCreationTimestamp());
+  }
+
+  @Test
+  public void testStartpointSpecificSerde() {
+    StartpointSpecific original = new StartpointSpecific("42");
+    Startpoint restored = roundTrip(original);
+
+    assertSameClassAndCreationTimestamp(original, restored);
+    StartpointSpecific restoredSpecific = (StartpointSpecific) restored;
+    Assert.assertEquals(original.getSpecificOffset(), restoredSpecific.getSpecificOffset());
+  }
+
+  @Test
+  public void testStartpointTimestampSerde() {
+    StartpointTimestamp original = new StartpointTimestamp(123456L);
+    Startpoint restored = roundTrip(original);
+
+    assertSameClassAndCreationTimestamp(original, restored);
+    StartpointTimestamp restoredTimestamp = (StartpointTimestamp) restored;
+    Assert.assertEquals(original.getTimestampOffset(), restoredTimestamp.getTimestampOffset());
+  }
+
+  @Test
+  public void testStartpointEarliestSerde() {
+    // StartpointOldest carries no extra fields, so only the shared checks apply.
+    StartpointOldest original = new StartpointOldest();
+    Startpoint restored = roundTrip(original);
+
+    assertSameClassAndCreationTimestamp(original, restored);
+  }
+
+  @Test
+  public void testStartpointLatestSerde() {
+    // StartpointUpcoming carries no extra fields, so only the shared checks apply.
+    StartpointUpcoming original = new StartpointUpcoming();
+    Startpoint restored = roundTrip(original);
+
+    assertSameClassAndCreationTimestamp(original, restored);
+  }
+
+  @Test
+  public void testStartpointCustomSerde() {
+    MockStartpointCustom original = new MockStartpointCustom("das boot", 42);
+    Startpoint restored = roundTrip(original);
+
+    assertSameClassAndCreationTimestamp(original, restored);
+    MockStartpointCustom restoredCustom = (MockStartpointCustom) restored;
+    Assert.assertEquals(original.getTestInfo1(), restoredCustom.getTestInfo1());
+    Assert.assertEquals(original.getTestInfo2(), restoredCustom.getTestInfo2());
+  }
+}