Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/08 16:50:06 UTC

[samza] branch master updated: SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface. (#1016)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d07cfd  SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface. (#1016)
2d07cfd is described below

commit 2d07cfdaf36a33aaeed08d13cdb48c28b4f30bc8
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Wed May 8 09:50:00 2019 -0700

    SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface. (#1016)
    
    * Move StartpointVisitor to SystemAdmin.
---
 .../org/apache/samza/startpoint/Startpoint.java    |  15 +-
 .../apache/samza/startpoint/StartpointCustom.java  |  43 ----
 .../apache/samza/startpoint/StartpointOldest.java  |   6 +-
 .../samza/startpoint/StartpointSpecific.java       |   6 +-
 .../samza/startpoint/StartpointTimestamp.java      |  13 +-
 .../samza/startpoint/StartpointUpcoming.java       |   6 +-
 .../apache/samza/startpoint/StartpointVisitor.java |  56 +++--
 .../java/org/apache/samza/system/SystemAdmin.java  |  11 +-
 .../samza/startpoint/MockStartpointCustom.java     |  50 ----
 .../apache/samza/startpoint/TestStartpoint.java    |  32 +--
 .../samza/startpoint/TestStartpointManager.java    |  31 +--
 .../samza/startpoint/TestStartpointSerde.java      |  12 -
 .../samza/system/kafka/KafkaSystemAdmin.java       | 251 +++++++++++++++------
 .../samza/system/kafka/KafkaSystemConsumer.java    | 199 ++++------------
 .../scala/org/apache/samza/util/KafkaUtil.scala    |  18 +-
 .../system/kafka/TestKafkaSystemAdminJava.java     | 102 ++++++++-
 .../system/kafka/TestKafkaSystemConsumer.java      | 199 +++-------------
 17 files changed, 460 insertions(+), 590 deletions(-)
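At the heart of this change is the new generic StartpointVisitor<IN, OUT> shown
in the diff below. As a hedged sketch of the resulting programming model (the
class name and returned offsets are illustrative, not part of this patch), a
system-specific resolver now looks like:

    import org.apache.samza.startpoint.StartpointOldest;
    import org.apache.samza.startpoint.StartpointSpecific;
    import org.apache.samza.startpoint.StartpointVisitor;
    import org.apache.samza.system.SystemStreamPartition;

    // Resolves startpoints to String offsets for a hypothetical system.
    class ExampleStartpointResolver implements StartpointVisitor<SystemStreamPartition, String> {
      @Override
      public String visit(SystemStreamPartition ssp, StartpointSpecific startpointSpecific) {
        // A specific startpoint already carries the desired offset.
        return startpointSpecific.getSpecificOffset();
      }

      @Override
      public String visit(SystemStreamPartition ssp, StartpointOldest startpointOldest) {
        // A real implementation would query the underlying system here.
        return "0";
      }

      // StartpointTimestamp and StartpointUpcoming fall through to the default
      // visit methods, which throw UnsupportedOperationException.
    }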

diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
index a65dacf..fcca792 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/Startpoint.java
@@ -22,13 +22,11 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import java.time.Instant;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStreamPartition;
-
 
 /**
  * Startpoint represents a position in a stream partition.
  */
-@InterfaceStability.Unstable
+@InterfaceStability.Evolving
 public abstract class Startpoint {
 
   private final long creationTimestamp;
@@ -50,12 +48,13 @@ public abstract class Startpoint {
   }
 
   /**
-   * Apply the visitor {@link StartpointVisitor}'s register methods to the instance of this {@link Startpoint}
-   * class.
-   * @param systemStreamPartition The {@link SystemStreamPartition} needed to register with the {@link StartpointVisitor}
-   * @param startpointVisitor The visitor to register with.
+   * Applies the matching visit method of the {@link StartpointVisitor} to this {@link Startpoint}
+   * and returns the result of that operation.
+   * @param input the metadata associated with the startpoint.
+   * @param startpointVisitor the visitor of the startpoint.
+   * @return the result of applying the visitor to this startpoint.
    */
-  public abstract void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor);
+  public abstract <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor);
 
   @Override
   public String toString() {
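The generic apply above performs classic double dispatch: each concrete
Startpoint subclass forwards to the matching visit overload and returns its
result. A hedged sketch of a call site (system and stream names are made up;
ExampleStartpointResolver is the illustrative visitor from the earlier sketch):

    import org.apache.samza.Partition;
    import org.apache.samza.startpoint.Startpoint;
    import org.apache.samza.startpoint.StartpointSpecific;
    import org.apache.samza.system.SystemStreamPartition;

    public class VisitorDispatchExample {
      public static void main(String[] args) {
        SystemStreamPartition ssp =
            new SystemStreamPartition("kafka", "example-stream", new Partition(0));
        Startpoint startpoint = new StartpointSpecific("42");

        // Dispatches to ExampleStartpointResolver.visit(ssp, (StartpointSpecific) startpoint).
        String offset = startpoint.apply(ssp, new ExampleStartpointResolver());
        System.out.println(offset); // prints 42
      }
    }
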
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
deleted file mode 100644
index a52c974..0000000
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointCustom.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.startpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link Startpoint} that represents a custom startpoint. This is for systems that have a non-generic option
- * for setting offsets. Startpoints are serialized to JSON in the {@link org.apache.samza.metadatastore.MetadataStore}
- * and it is recommended to maintain the subclass of this {@link StartpointCustom} as a simple POJO.
- */
-public abstract class StartpointCustom extends Startpoint {
-
-  StartpointCustom() {
-    super();
-  }
-
-  StartpointCustom(long creationTimestamp) {
-    super(creationTimestamp);
-  }
-
-  @Override
-  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
-    startpointVisitor.visit(systemStreamPartition, this);
-  }
-}
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
index 53872db..6f98723 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointOldest.java
@@ -19,8 +19,6 @@
 package org.apache.samza.startpoint;
 
 import com.google.common.base.MoreObjects;
-import org.apache.samza.system.SystemStreamPartition;
-
 
 /**
  * A {@link Startpoint} that represents the earliest offset in a stream partition.
@@ -35,8 +33,8 @@ public final class StartpointOldest extends Startpoint {
   }
 
   @Override
-  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
-    startpointVisitor.visit(systemStreamPartition, this);
+  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+    return startpointVisitor.visit(input, this);
   }
 
   @Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
index b544489..c56d8f0 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointSpecific.java
@@ -20,8 +20,6 @@ package org.apache.samza.startpoint;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import org.apache.samza.system.SystemStreamPartition;
-
 
 /**
  * A {@link Startpoint} that represents a specific offset in a stream partition.
@@ -53,8 +51,8 @@ public final class StartpointSpecific extends Startpoint {
   }
 
   @Override
-  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
-    startpointVisitor.visit(systemStreamPartition, this);
+  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+    return startpointVisitor.visit(input, this);
   }
 
   @Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
index 609015c..3792403 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointTimestamp.java
@@ -18,10 +18,9 @@
  */
 package org.apache.samza.startpoint;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
-import org.apache.samza.system.SystemStreamPartition;
-
 
 /**
  * A {@link Startpoint} that represents a timestamp offset in a stream partition.
@@ -44,6 +43,12 @@ public final class StartpointTimestamp extends Startpoint {
     this.timestampOffset = timestampOffset;
   }
 
+  @VisibleForTesting
+  StartpointTimestamp(Long timestampOffset, Long creationTimeStamp) {
+    super(creationTimeStamp);
+    this.timestampOffset = timestampOffset;
+  }
+
   /**
    * Getter for the timestamp offset.
    * @return the timestamp offset in milliseconds.
@@ -53,8 +58,8 @@ public final class StartpointTimestamp extends Startpoint {
   }
 
   @Override
-  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
-    startpointVisitor.visit(systemStreamPartition, this);
+  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+    return startpointVisitor.visit(input, this);
   }
 
   @Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
index c477931..26af2ae 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointUpcoming.java
@@ -19,8 +19,6 @@
 package org.apache.samza.startpoint;
 
 import com.google.common.base.MoreObjects;
-import org.apache.samza.system.SystemStreamPartition;
-
 
 /**
  * A {@link Startpoint} that represents the latest offset in a stream partition.
@@ -35,8 +33,8 @@ public final class StartpointUpcoming extends Startpoint {
   }
 
   @Override
-  public void apply(SystemStreamPartition systemStreamPartition, StartpointVisitor startpointVisitor) {
-    startpointVisitor.visit(systemStreamPartition, this);
+  public <IN, OUT> OUT apply(IN input, StartpointVisitor<IN, OUT> startpointVisitor) {
+    return startpointVisitor.visit(input, this);
   }
 
   @Override
diff --git a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
index 99151ab..72b1552 100644
--- a/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
+++ b/samza-api/src/main/java/org/apache/samza/startpoint/StartpointVisitor.java
@@ -18,54 +18,52 @@
  */
 package org.apache.samza.startpoint;
 
-import org.apache.samza.system.SystemStreamPartition;
-
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.system.SystemAdmin;
 
 /**
- * Visitor interface for system consumers to implement to support {@link Startpoint}s.
+ * A {@link SystemAdmin} should implement this abstraction to support {@link Startpoint}s.
  */
-public interface StartpointVisitor {
+@InterfaceStability.Evolving
+public interface StartpointVisitor<IN, OUT> {
 
   /**
-   * Seek to specific offset represented by {@link StartpointSpecific}
-   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
-   * @param startpointSpecific The {@link Startpoint} that represents the specific offset.
+   * Performs a sequence of operations using the input of type {@code IN} and the {@link StartpointSpecific}, and returns the result of the execution.
+   * @param input the input metadata about the startpoint.
+   * @param startpointSpecific the {@link Startpoint} that represents the specific offset.
+   * @return the result of executing the operations defined by the visit method.
    */
-  void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific);
+  default OUT visit(IN input, StartpointSpecific startpointSpecific) {
+    throw new UnsupportedOperationException("StartpointSpecific is not supported.");
+  }
 
   /**
-   * Seek to timestamp offset represented by {@link StartpointTimestamp}
-   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
-   * @param startpointTimestamp The {@link Startpoint} that represents the timestamp offset.
+   * Performs a sequence of operations using the input of type {@code IN} and the {@link StartpointTimestamp}, and returns the result of the execution.
+   * @param input the input metadata about the startpoint.
+   * @param startpointTimestamp the {@link Startpoint} that represents the timestamp.
+   * @return the result of executing the operations defined by the visit method.
    */
-  default void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+  default OUT visit(IN input, StartpointTimestamp startpointTimestamp) {
     throw new UnsupportedOperationException("StartpointTimestamp is not supported.");
   }
 
   /**
-   * Seek to earliest offset represented by {@link StartpointOldest}
-   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
-   * @param startpointOldest The {@link Startpoint} that represents the earliest offset.
+   * Performs a sequence of operations using the input of type {@code IN} and the {@link StartpointOldest}, and returns the result of the execution.
+   * @param input the input metadata about the startpoint.
+   * @param startpointOldest the {@link Startpoint} that represents the earliest offset.
+   * @return the result of executing the operations defined by the visit method.
    */
-  default void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+  default OUT visit(IN input, StartpointOldest startpointOldest) {
     throw new UnsupportedOperationException("StartpointOldest is not supported.");
   }
 
   /**
-   * Seek to latest offset represented by {@link StartpointUpcoming}
-   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
-   * @param startpointUpcoming The {@link Startpoint} that represents the latest offset.
+   * Performs a sequence of operations using the input of type {@code IN} and the {@link StartpointUpcoming}, and returns the result of the execution.
+   * @param input the input metadata about the startpoint.
+   * @param startpointUpcoming the {@link Startpoint} that represents the latest offset.
+   * @return the result of executing the operations defined by the visit method.
    */
-  default void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+  default OUT visit(IN input, StartpointUpcoming startpointUpcoming) {
     throw new UnsupportedOperationException("StartpointUpcoming is not supported.");
   }
-
-  /**
-   * Bootstrap signal represented by {@link StartpointCustom}
-   * @param systemStreamPartition The {@link SystemStreamPartition} to seek the offset to.
-   * @param startpointCustom The {@link Startpoint} that represents the bootstrap signal.
-   */
-  default void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
-    throw new UnsupportedOperationException(String.format("%s is not supported.", startpointCustom.getClass().getSimpleName()));
-  }
 }
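Since the interface is now parameterized over both the input metadata and the
result, an implementation is free to work with system-native types instead of
SystemStreamPartition. A hedged sketch (type choices purely illustrative):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.samza.startpoint.StartpointSpecific;
    import org.apache.samza.startpoint.StartpointVisitor;

    // A visitor keyed on Kafka's TopicPartition that yields numeric offsets.
    class TopicPartitionOffsetResolver implements StartpointVisitor<TopicPartition, Long> {
      @Override
      public Long visit(TopicPartition tp, StartpointSpecific startpointSpecific) {
        return Long.parseLong(startpointSpecific.getSpecificOffset());
      }
    }
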
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8201b3d..1cd345e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
+import org.apache.samza.startpoint.Startpoint;
 
 /**
  * Helper interface attached to an underlying system to fetch information about
@@ -164,4 +164,13 @@ public interface SystemAdmin {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Resolves the startpoint to a system-specific offset.
+   * @param systemStreamPartition represents the system stream partition.
+   * @param startpoint represents the startpoint.
+   * @return the resolved offset.
+   */
+  default String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    throw new UnsupportedOperationException();
+  }
 }
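With the new default method, framework code can resolve a startpoint through
any SystemAdmin without knowing the system-specific visitor. A hedged sketch
of a caller (the admin instance and names are placeholders, not from this
patch):

    import org.apache.samza.Partition;
    import org.apache.samza.startpoint.StartpointOldest;
    import org.apache.samza.system.SystemAdmin;
    import org.apache.samza.system.SystemStreamPartition;

    final class StartpointResolutionExample {
      static String resolveOldest(SystemAdmin systemAdmin) {
        SystemStreamPartition ssp =
            new SystemStreamPartition("kafka", "example-stream", new Partition(0));
        // Admins that do not override the default throw UnsupportedOperationException.
        return systemAdmin.resolveStartpointToOffset(ssp, new StartpointOldest());
      }
    }
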
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java b/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
deleted file mode 100644
index de7b18a..0000000
--- a/samza-api/src/test/java/org/apache/samza/startpoint/MockStartpointCustom.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.startpoint;
-
-import java.time.Instant;
-
-
-public class MockStartpointCustom extends StartpointCustom {
-  private final String testInfo1;
-  private final long testInfo2;
-
-  // Default constructor needed for serde.
-  private MockStartpointCustom() {
-    this(null, 0);
-  }
-
-  public MockStartpointCustom(String testInfo1, long testInfo2) {
-    this(testInfo1, testInfo2, Instant.now().toEpochMilli());
-  }
-
-  public MockStartpointCustom(String testInfo1, long testInfo2, long creationTimestamp) {
-    super(creationTimestamp);
-    this.testInfo1 = testInfo1;
-    this.testInfo2 = testInfo2;
-  }
-
-  public String getTestInfo1() {
-    return testInfo1;
-  }
-
-  public long getTestInfo2() {
-    return testInfo2;
-  }
-}
diff --git a/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
index b54e569..a3fabee 100644
--- a/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
+++ b/samza-api/src/test/java/org/apache/samza/startpoint/TestStartpoint.java
@@ -24,7 +24,6 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class TestStartpoint {
 
   @Test
@@ -69,44 +68,31 @@ public class TestStartpoint {
     Assert.assertEquals(StartpointUpcoming.class, mockStartpointVisitorConsumer.visitedClass);
   }
 
-  @Test
-  public void testStartpointCustom() {
-    MockStartpointCustom startpoint = new MockStartpointCustom("test12345", 12345);
-    Assert.assertEquals("test12345", startpoint.getTestInfo1());
-    Assert.assertEquals(12345, startpoint.getTestInfo2());
-    Assert.assertTrue(startpoint.getCreationTimestamp() <= Instant.now().toEpochMilli());
-
-    MockStartpointVisitor mockStartpointVisitorConsumer = new MockStartpointVisitor();
-    startpoint.apply(new SystemStreamPartition("sys", "stream", new Partition(1)), mockStartpointVisitorConsumer);
-    Assert.assertEquals(MockStartpointCustom.class, mockStartpointVisitorConsumer.visitedClass);
-  }
-
-  static class MockStartpointVisitor implements StartpointVisitor {
+  static class MockStartpointVisitor implements StartpointVisitor<SystemStreamPartition, String> {
     Class<? extends Startpoint> visitedClass;
 
     @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
       visitedClass = startpointSpecific.getClass();
+      return null;
     }
 
     @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
       visitedClass = startpointTimestamp.getClass();
+      return null;
     }
 
     @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
       visitedClass = startpointOldest.getClass();
+      return null;
     }
 
     @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
       visitedClass = startpointUpcoming.getClass();
-    }
-
-    @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointCustom startpointCustom) {
-      visitedClass = startpointCustom.getClass();
+      return null;
     }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
index cc86882..b4c33c3 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointManager.java
@@ -64,6 +64,22 @@ public class TestStartpointManager {
   }
 
   @Test
+  public void testStaleStartpoints() {
+    SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
+    TaskName taskName = new TaskName("MockTask");
+
+    startpointManager.start();
+    long staleTimestamp = Instant.now().toEpochMilli() - StartpointManager.DEFAULT_EXPIRATION_DURATION.toMillis() - 2;
+    StartpointTimestamp startpoint = new StartpointTimestamp(staleTimestamp, staleTimestamp);
+
+    startpointManager.writeStartpoint(ssp, startpoint);
+    Assert.assertNull(startpointManager.readStartpoint(ssp));
+
+    startpointManager.writeStartpoint(ssp, taskName, startpoint);
+    Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
+  }
+
+  @Test
   public void testNoLongerUsableAfterStop() {
     StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
     startpointManager.start();
@@ -171,21 +187,6 @@ public class TestStartpointManager {
   }
 
   @Test
-  public void testStaleStartpoints() {
-    SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", "mockStream", new Partition(2));
-    TaskName taskName = new TaskName("MockTask");
-
-    startpointManager.start();
-    long staleTimestamp = Instant.now().toEpochMilli() - StartpointManager.DEFAULT_EXPIRATION_DURATION.toMillis() - 2;
-    MockStartpointCustom startpoint = new MockStartpointCustom("das boot", 42, staleTimestamp);
-    startpointManager.writeStartpoint(ssp, startpoint);
-    Assert.assertNull(startpointManager.readStartpoint(ssp));
-
-    startpointManager.writeStartpoint(ssp, taskName, startpoint);
-    Assert.assertNull(startpointManager.readStartpoint(ssp, taskName));
-  }
-
-  @Test
   public void testGroupStartpointsPerTask() {
     MapConfig config = new MapConfig();
     startpointManager.start();
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
index c939680..9dfb784 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointSerde.java
@@ -21,7 +21,6 @@ package org.apache.samza.startpoint;
 import org.junit.Assert;
 import org.junit.Test;
 
-
 public class TestStartpointSerde {
   private final StartpointSerde startpointSerde = new StartpointSerde();
 
@@ -62,15 +61,4 @@ public class TestStartpointSerde {
     Assert.assertEquals(startpointUpcoming.getClass(), startpointFromSerde.getClass());
     Assert.assertEquals(startpointUpcoming.getCreationTimestamp(), startpointFromSerde.getCreationTimestamp());
   }
-
-  @Test
-  public void testStartpointCustomSerde() {
-    MockStartpointCustom startpointCustom = new MockStartpointCustom("das boot", 42);
-    Startpoint startpointFromSerde = startpointSerde.fromBytes(startpointSerde.toBytes(startpointCustom));
-
-    Assert.assertEquals(startpointCustom.getClass(), startpointFromSerde.getClass());
-    Assert.assertEquals(startpointCustom.getCreationTimestamp(), startpointFromSerde.getCreationTimestamp());
-    Assert.assertEquals(startpointCustom.getTestInfo1(), ((MockStartpointCustom) startpointFromSerde).getTestInfo1());
-    Assert.assertEquals(startpointCustom.getTestInfo2(), ((MockStartpointCustom) startpointFromSerde).getTestInfo2());
-  }
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a82a6b9..c18c82d 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -20,12 +20,15 @@
 package org.apache.samza.system.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +46,7 @@ import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.TopicConfig;
@@ -55,6 +59,12 @@ import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.SystemConfig;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
@@ -62,6 +72,7 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function0;
@@ -90,7 +101,6 @@ public class KafkaSystemAdmin implements SystemAdmin {
   public static volatile boolean deleteMessageCalled = false;
 
   protected final String systemName;
-  private final Consumer metadataConsumer; // Need to synchronize all accesses, since KafkaConsumer is not thread-safe
   protected final Config config;
 
   // Custom properties to create a new coordinator stream.
@@ -112,6 +122,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
   final AdminClient adminClient;
 
   private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
+  private final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver;
 
   public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
     this.systemName = systemName;
@@ -121,7 +133,8 @@ public class KafkaSystemAdmin implements SystemAdmin {
       throw new SamzaException(
           "Cannot construct KafkaSystemAdmin for system " + systemName + " with null metadataConsumer");
     }
-    this.metadataConsumer = metadataConsumer;
+    this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(metadataConsumer);
+    this.kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(threadSafeKafkaConsumer);
 
     Properties props = createAdminClientProperties();
     LOG.info("New admin client with props:" + props);
@@ -176,9 +189,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
   public void stop() {
     if (stopped.compareAndSet(false, true)) {
       try {
-        metadataConsumer.close();
+        threadSafeKafkaConsumer.close();
       } catch (Exception e) {
-        LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
+        LOG.warn(String.format("Exception occurred when closing consumer of system: %s.", systemName), e);
       }
     }
 
@@ -231,10 +244,7 @@ public class KafkaSystemAdmin implements SystemAdmin {
             streamNames.forEach(streamName -> {
               Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
 
-              List<PartitionInfo> partitionInfos;
-              synchronized (metadataConsumer) {
-                partitionInfos = metadataConsumer.partitionsFor(streamName);
-              }
+              List<PartitionInfo> partitionInfos = threadSafeKafkaConsumer.execute(consumer -> consumer.partitionsFor(streamName));
               LOG.debug("Stream {} has partitions {}", streamName, partitionInfos);
               partitionInfos.forEach(
                   partitionInfo -> partitionMetadata.put(new Partition(partitionInfo.partition()), dummySspm));
@@ -364,39 +374,29 @@ public class KafkaSystemAdmin implements SystemAdmin {
   }
 
   /**
-   * Convert TopicPartition to SystemStreamPartition
-   * @param topicPartition the topic partition to be created
-   * @return an instance of SystemStreamPartition
-   */
-  private SystemStreamPartition toSystemStreamPartition(TopicPartition topicPartition) {
-    String topic = topicPartition.topic();
-    Partition partition = new Partition(topicPartition.partition());
-    return new SystemStreamPartition(systemName, topic, partition);
-  }
-
-  /**
-   * Uses {@code metadataConsumer} to fetch the metadata for the {@code topicPartitions}.
-   * Warning: If multiple threads call this with the same {@code metadataConsumer}, then this will not protect against
-   * concurrent access to the {@code metadataConsumer}.
+   * Uses the Kafka consumer to fetch the metadata for the {@code topicPartitions}.
    */
   private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> topicPartitions) {
     Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
     Map<SystemStreamPartition, String> newestOffsets = new HashMap<>();
     Map<SystemStreamPartition, String> upcomingOffsets = new HashMap<>();
-    Map<TopicPartition, Long> oldestOffsetsWithLong;
-    Map<TopicPartition, Long> upcomingOffsetsWithLong;
-
-    synchronized (metadataConsumer) {
-      oldestOffsetsWithLong = metadataConsumer.beginningOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
-      upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
-    }
+    final Map<TopicPartition, Long> oldestOffsetsWithLong = new HashMap<>();
+    final Map<TopicPartition, Long> upcomingOffsetsWithLong = new HashMap<>();
+
+    threadSafeKafkaConsumer.execute(consumer -> {
+      Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
+      LOG.debug("Beginning offsets for topic-partitions: {} is {}", topicPartitions, beginningOffsets);
+      oldestOffsetsWithLong.putAll(beginningOffsets);
+      Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+      LOG.debug("End offsets for topic-partitions: {} is {}", topicPartitions, endOffsets);
+      upcomingOffsetsWithLong.putAll(endOffsets);
+      return Optional.empty();
+    });
 
-    oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset)));
+    oldestOffsetsWithLong.forEach((topicPartition, offset) -> oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset)));
 
     upcomingOffsetsWithLong.forEach((topicPartition, offset) -> {
-      upcomingOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset));
+      upcomingOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset));
 
       // Kafka's beginning Offset corresponds to the offset for the oldest message.
       // Kafka's end offset corresponds to the offset for the upcoming message, and it is the newest offset + 1.
@@ -409,9 +409,9 @@ public class KafkaSystemAdmin implements SystemAdmin {
         LOG.warn(
             "Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning",
             topicPartition, offset);
-        oldestOffsets.put(toSystemStreamPartition(topicPartition), "0");
+        oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), "0");
       } else {
-        newestOffsets.put(toSystemStreamPartition(topicPartition), String.valueOf(offset - 1));
+        newestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset - 1));
       }
     });
     return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets);
@@ -430,23 +430,20 @@ public class KafkaSystemAdmin implements SystemAdmin {
     LOG.info("Fetching SystemStreamMetadata for topics {} on system {}", topics, systemName);
 
     topics.forEach(topic -> {
-      synchronized (metadataConsumer) {
-        List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-
-        if (partitionInfos == null) {
-          String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
-          throw new SamzaException(msg);
-        }
-
-        List<TopicPartition> topicPartitions = partitionInfos.stream()
-            .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
-            .collect(Collectors.toList());
-
-        OffsetsMaps offsetsForTopic = fetchTopicPartitionsMetadata(topicPartitions);
-        allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
-        allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
-        allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
-      }
+      OffsetsMaps offsetsForTopic = threadSafeKafkaConsumer.execute(consumer -> {
+         List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+         if (partitionInfos == null) {
+           String msg = String.format("Partition info not(yet?) available for system %s topic %s", systemName, topic);
+           throw new SamzaException(msg);
+         }
+         List<TopicPartition> topicPartitions = partitionInfos.stream()
+          .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+          .collect(Collectors.toList());
+         return fetchTopicPartitionsMetadata(topicPartitions);
+       });
+      allOldestOffsets.putAll(offsetsForTopic.getOldestOffsets());
+      allNewestOffsets.putAll(offsetsForTopic.getNewestOffsets());
+      allUpcomingOffsets.putAll(offsetsForTopic.getUpcomingOffsets());
     });
 
     return assembleMetadata(allOldestOffsets, allNewestOffsets, allUpcomingOffsets);
@@ -575,20 +572,6 @@ public class KafkaSystemAdmin implements SystemAdmin {
     }
   }
 
-  // get partition info for topic
-  Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
-    Map<String, List<PartitionInfo>> streamToPartitionsInfo = new HashMap<>();
-    List<PartitionInfo> partitionInfoList;
-    for (String topic : topics) {
-      synchronized (metadataConsumer) {
-        partitionInfoList = metadataConsumer.partitionsFor(topic);
-      }
-      streamToPartitionsInfo.put(topic, partitionInfoList);
-    }
-
-    return streamToPartitionsInfo;
-  }
-
   /**
    * Delete records up to (and including) the provided ssp offsets for
    * all system stream partitions specified in the map.
@@ -639,11 +622,12 @@ public class KafkaSystemAdmin implements SystemAdmin {
 
   @Override
   public Set<SystemStream> getAllSystemStreams() {
-    synchronized (metadataConsumer) {
-      return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
-          .map(x -> new SystemStream(systemName, x))
-          .collect(Collectors.toSet());
-    }
+    Map<String, List<PartitionInfo>> topicToPartitionInfoMap = threadSafeKafkaConsumer.execute(consumer -> consumer.listTopics());
+    Set<SystemStream> systemStreams = topicToPartitionInfoMap.keySet()
+                                                             .stream()
+                                                             .map(topic -> new SystemStream(systemName, topic))
+                                                             .collect(Collectors.toSet());
+    return systemStreams;
   }
 
   /**
@@ -719,6 +703,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
     return coordinatorStreamProperties;
   }
 
+  @Override
+  public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    return startpoint.apply(systemStreamPartition, kafkaStartpointToOffsetResolver);
+  }
+
   /**
    * Container for metadata about offsets.
    */
@@ -771,4 +760,124 @@ public class KafkaSystemAdmin implements SystemAdmin {
       return kafkaProperties;
     }
   }
+
+  /**
+   * Offers a Kafka-specific implementation of {@link StartpointVisitor} that resolves
+   * different types of {@link Startpoint} to a Samza offset.
+   */
+  @VisibleForTesting
+  static class KafkaStartpointToOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
+
+    private final ThreadSafeKafkaConsumer threadSafeKafkaConsumer;
+
+    public KafkaStartpointToOffsetResolver(ThreadSafeKafkaConsumer threadSafeKafkaConsumer) {
+      this.threadSafeKafkaConsumer = threadSafeKafkaConsumer;
+    }
+
+    @VisibleForTesting
+    KafkaStartpointToOffsetResolver(Consumer consumer) {
+      this.threadSafeKafkaConsumer = new ThreadSafeKafkaConsumer(consumer);
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      return startpointSpecific.getSpecificOffset();
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Preconditions.checkNotNull(startpointTimestamp, "Startpoint cannot be null");
+      Preconditions.checkNotNull(startpointTimestamp.getTimestampOffset(), "Timestamp field in startpoint cannot be null");
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+
+      Map<TopicPartition, Long> topicPartitionToTimestamp = ImmutableMap.of(topicPartition, startpointTimestamp.getTimestampOffset());
+      LOG.info("Finding offset for timestamp: {} in topic partition: {}.", startpointTimestamp.getTimestampOffset(), topicPartition);
+      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = threadSafeKafkaConsumer.execute(consumer -> consumer.offsetsForTimes(topicPartitionToTimestamp));
+
+      OffsetAndTimestamp offsetAndTimestamp = topicPartitionToOffsetTimestamps.get(topicPartition);
+      if (offsetAndTimestamp != null) {
+        return String.valueOf(offsetAndTimestamp.offset());
+      } else {
+        LOG.info("Offset for timestamp: {} does not exist for partition: {}. Falling back to end offset.", startpointTimestamp.getTimestampOffset(), topicPartition);
+        return getEndOffset(systemStreamPartition);
+      }
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.beginningOffsets(ImmutableSet.of(topicPartition)));
+      Long beginningOffset = topicPartitionToOffsets.get(topicPartition);
+      LOG.info("Beginning offset for topic partition: {} is {}.", topicPartition, beginningOffset);
+      return String.valueOf(beginningOffset);
+    }
+
+    @Override
+    public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+      return getEndOffset(systemStreamPartition);
+    }
+
+    /**
+     * Converts the {@link SystemStreamPartition} to {@link TopicPartition}.
+     * @param systemStreamPartition the input system stream partition.
+     * @return the converted topic partition.
+     */
+    static TopicPartition toTopicPartition(SystemStreamPartition systemStreamPartition) {
+      Preconditions.checkNotNull(systemStreamPartition);
+      Preconditions.checkNotNull(systemStreamPartition.getPartition());
+      Preconditions.checkNotNull(systemStreamPartition.getStream());
+
+      return new TopicPartition(systemStreamPartition.getStream(), systemStreamPartition.getPartition().getPartitionId());
+    }
+
+    /**
+     * Determines the end offset of the {@link SystemStreamPartition}.
+     * @param systemStreamPartition represents the system stream partition.
+     * @return the end offset of the partition.
+     */
+    private String getEndOffset(SystemStreamPartition systemStreamPartition) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionToOffsets = threadSafeKafkaConsumer.execute(consumer -> consumer.endOffsets(ImmutableSet.of(topicPartition)));
+      Long endOffset = topicPartitionToOffsets.get(topicPartition);
+      LOG.info("End offset for topic partition: {} is {}.", topicPartition, endOffset);
+      return String.valueOf(endOffset);
+    }
+  }
+
+  /**
+   * Offers thread-safe operations over the underlying, non-thread-safe {@link Consumer}.
+   */
+  static class ThreadSafeKafkaConsumer {
+
+    private final Consumer kafkaConsumer;
+
+    ThreadSafeKafkaConsumer(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    /**
+     * Executes the given lambda of Kafka-consumer operations in a thread-safe manner
+     * and returns the result of the execution.
+     *
+     * @param function accepts the Kafka consumer as an argument and returns a result after executing a
+     *                 sequence of operations against a Kafka broker.
+     * @param <T> the return type of the lambda function.
+     * @return the result of executing the lambda function.
+     */
+    public <T> T execute(Function<Consumer, T> function) {
+      // Kafka consumer is not thread-safe
+      synchronized (kafkaConsumer) {
+        return function.apply(kafkaConsumer);
+      }
+    }
+
+    /**
+     * Closes the underlying kafka consumer.
+     */
+    public void close() {
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.close();
+      }
+    }
+  }
 }
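Putting the Kafka pieces together: resolveStartpointToOffset dispatches into
KafkaStartpointToOffsetResolver, which talks to the broker through the
synchronized ThreadSafeKafkaConsumer. A hedged usage sketch (assumes
StartpointTimestamp's public single-argument constructor; the admin instance
and topic are placeholders):

    import org.apache.samza.Partition;
    import org.apache.samza.startpoint.StartpointTimestamp;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.system.kafka.KafkaSystemAdmin;

    final class KafkaStartpointExample {
      static String resolveByTimestamp(KafkaSystemAdmin admin) {
        SystemStreamPartition ssp =
            new SystemStreamPartition("kafka", "example-topic", new Partition(0));
        // Resolved via Consumer.offsetsForTimes under the hood; falls back to the
        // end offset when no record exists at or after the timestamp.
        StartpointTimestamp tenMinutesAgo =
            new StartpointTimestamp(System.currentTimeMillis() - 10 * 60 * 1000L);
        return admin.resolveStartpointToOffset(ssp, tenMinutesAgo);
      }
    }
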
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 0cb2bb6..015b76a 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -22,38 +22,26 @@
 
 package org.apache.samza.system.kafka;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.startpoint.Startpoint;
-import org.apache.samza.startpoint.StartpointOldest;
-import org.apache.samza.startpoint.StartpointSpecific;
-import org.apache.samza.startpoint.StartpointTimestamp;
-import org.apache.samza.startpoint.StartpointUpcoming;
-import org.apache.samza.startpoint.StartpointVisitor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -73,17 +61,16 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
-  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  private final KafkaConsumerProxy proxy;
+  private final KafkaConsumerProxy<K, V> proxy;
 
-  // Holds mapping from {@code TopicPartition} to registered {@code Startpoint}. This will be used in the start().
-  final Map<TopicPartition, Startpoint> topicPartitionToStartpointMap = new HashMap<>();
+  // Holds the mapping between the registered TopicPartition and offset until the consumer is started.
+  Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
 
   long perPartitionFetchThreshold;
   long perPartitionFetchThresholdBytes;
@@ -111,26 +98,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
     // create a sink for passing the messages between the proxy and the consumer
-    this.messageSink = new KafkaConsumerMessageSink();
+    messageSink = new KafkaConsumerMessageSink();
 
     // Create the proxy to do the actual message reading.
     proxy = kafkaConsumerProxyFactory.create(this.messageSink);
     LOG.info("{}: Created proxy {} ", this, proxy);
-    this.kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(kafkaConsumer, proxy);
-  }
-
-  @VisibleForTesting
-  KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
-      KafkaConsumerProxy<K, V> kafkaConsumerProxy, KafkaSystemConsumerMetrics metrics, Clock clock, KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler) {
-    this.kafkaConsumer = kafkaConsumer;
-    this.clientId = clientId;
-    this.systemName = systemName;
-    this.config = config;
-    this.metrics = metrics;
-    this.proxy = kafkaConsumerProxy;
-    this.kafkaStartpointRegistrationHandler = kafkaStartpointRegistrationHandler;
-    this.messageSink = new KafkaConsumerMessageSink();
-    fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
   }
 
   /**
@@ -167,12 +139,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    Set<TopicPartition> registeredTopicPartitions = new HashSet<>(topicPartitionToStartpointMap.keySet());
-    LOG.info("{}: Consumer subscribes to {}", this, registeredTopicPartitions);
+    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToOffset.keySet());
     try {
       synchronized (kafkaConsumer) {
         // we are using assign (and not subscribe), so we need to specify both topic and partition
-        kafkaConsumer.assign(registeredTopicPartitions);
+        kafkaConsumer.assign(topicPartitionsToOffset.keySet());
       }
     } catch (Exception e) {
       throw new SamzaException("Consumer subscription failed for " + this, e);
@@ -186,14 +157,29 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    */
   void startConsumer() {
     // set the offset for each TopicPartition
-    if (topicPartitionToStartpointMap.size() <= 0) {
+    if (topicPartitionsToOffset.size() <= 0) {
       LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
     }
 
-    topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
-      Partition partition = new Partition(topicPartition.partition());
-      SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
-      startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
+    topicPartitionsToOffset.forEach((topicPartition, startingOffsetString) -> {
+      long startingOffset = Long.valueOf(startingOffsetString);
+
+      try {
+        synchronized (kafkaConsumer) {
+          kafkaConsumer.seek(topicPartition, startingOffset);
+        }
+      } catch (Exception e) {
+        // all recoverable exceptions are handled by the client.
+        // if we get here there is nothing left to do but bail out.
+        String msg = String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, topicPartition);
+        LOG.error(msg, e);
+        throw new SamzaException(msg, e);
+      }
+
+      LOG.info("{}: Changing consumer's starting offset for partition {} to {}", this, topicPartition, startingOffsetString);
+
+      // add the partition to the proxy
+      proxy.addTopicPartition(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), startingOffset);
     });
 
     // start the proxy thread
@@ -219,7 +205,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
     }
 
-    int numPartitions = topicPartitionToStartpointMap.size();
+    int numPartitions = topicPartitionsToOffset.size();
 
     if (numPartitions > 0) {
       perPartitionFetchThreshold = fetchThreshold / numPartitions;
@@ -265,13 +251,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
-    register(systemStreamPartition, new StartpointSpecific(offset));
-  }
-
-  @Override
-  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
     if (started.get()) {
-      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, offset: %s failed.", this, systemStreamPartition, offset);
       throw new SamzaException(exceptionMessage);
     }
 
@@ -279,16 +260,30 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
       LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
       return;
     }
+    LOG.info("{}: Registering ssp: {} with offset: {}", this, systemStreamPartition, offset);
 
-    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
-
-    super.register(systemStreamPartition, startpoint);
+    super.register(systemStreamPartition, offset);
 
     TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
-    topicPartitionToStartpointMap.put(topicPartition, startpoint);
+
+    String existingOffset = topicPartitionsToOffset.get(topicPartition);
+    // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
+    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
+      topicPartitionsToOffset.put(topicPartition, offset);
+    }
+
     metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
   }
 
+  /**
+   * Compares two String offsets.
+   * Note: KafkaSystemAdmin has a method that does this, but using it would require instantiating a SystemAdmin for each consumer.
+   * @return see {@link Long#compareTo(Long)}
+   */
+  private static int compareOffsets(String offset1, String offset2) {
+    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+  }
+
   @Override
   public String toString() {
     return String.format("%s:%s", systemName, clientId);
@@ -309,11 +304,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
-    return new TopicAndPartition(tp.topic(), tp.partition());
+  protected static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
+    return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
   }
 
-  static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+  protected static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
     return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
   }
 
@@ -325,100 +320,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return systemName;
   }
 
-  @VisibleForTesting
-  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
-
-    private final KafkaConsumerProxy proxy;
-    private final Consumer kafkaConsumer;
-
-    KafkaStartpointRegistrationHandler(Consumer consumer, KafkaConsumerProxy proxy) {
-      this.proxy = proxy;
-      this.kafkaConsumer = consumer;
-    }
-
-    @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
-      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
-      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
-      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
-
-      // KafkaConsumer is not thread-safe.
-      synchronized (kafkaConsumer) {
-        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
-
-        // add the partition to the proxy
-        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
-      }
-    }
-
-    @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
-      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
-      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
-      Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
-
-      // Look up the offset by timestamp.
-      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
-      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = new HashMap<>();
-      synchronized (kafkaConsumer) {
-        topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
-      }
-
-      // If the timestamp does not exist for the partition, then seek the consumer to end.
-      if (topicPartitionToOffsetTimestamps.get(topicPartition) == null) {
-        LOG.info("Timestamp does not exist for partition: {}. Seeking the kafka consumer to the end offset.", topicPartition);
-
-        // KafkaConsumer is not thread-safe.
-        synchronized (kafkaConsumer) {
-          kafkaConsumer.seekToEnd(ImmutableList.of(topicPartition));
-
-          // add the partition to the proxy
-          proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
-        }
-      } else {
-
-        // KafkaConsumer is not thread-safe.
-        synchronized (kafkaConsumer) {
-          // Update the consumer fetch offsets.
-          OffsetAndTimestamp offsetAndTimeStamp = topicPartitionToOffsetTimestamps.get(topicPartition);
-          LOG.info("Updating the consumer fetch offsets of the topic partition: {} to {}.", topicPartition, offsetAndTimeStamp.offset());
-          kafkaConsumer.seek(topicPartition, offsetAndTimeStamp.offset());
-
-          // add the partition to the proxy
-          proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
-        }
-      }
-    }
-
-    @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
-      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
-      Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
-      LOG.info("Seeking the kafka consumer to the first offset for the topic partition: {}.", topicPartitions);
-
-      // KafkaConsumer is not thread-safe.
-      synchronized (kafkaConsumer) {
-        kafkaConsumer.seekToBeginning(topicPartitions);
-        // add the partition to the proxy
-        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
-      }
-    }
-
-    @Override
-    public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
-      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
-      Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
-      LOG.info("Seeking the kafka consumer to the end offset for the topic partition: {}.", topicPartitions);
-
-      // KafkaConsumer is not thread-safe.
-      synchronized (kafkaConsumer) {
-        kafkaConsumer.seekToEnd(topicPartitions);
-        // add the partition to the proxy
-        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
-      }
-    }
-  }
-
   public class KafkaConsumerMessageSink {
 
     public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
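
For context on the register() change in this file: the consumer now records only the
numerically smallest offset registered per TopicPartition and seeks there once in
startConsumer(). A minimal standalone sketch of that bookkeeping in plain Java
(OffsetRegistry is a hypothetical name, not a Samza class):

    import java.util.HashMap;
    import java.util.Map;

    class OffsetRegistry {
      // Tracks the numerically smallest offset registered per partition key.
      private final Map<String, String> partitionToOffset = new HashMap<>();

      void register(String partition, String offset) {
        String existing = partitionToOffset.get(partition);
        // Compare as longs: lexicographic comparison would wrongly rank "9" above "10".
        if (existing == null || Long.parseLong(existing) > Long.parseLong(offset)) {
          partitionToOffset.put(partition, offset);
        }
      }

      String startingOffset(String partition) {
        return partitionToOffset.get(partition);
      }
    }

In this sketch, registering "5" and then "3" for the same partition leaves "3" as the
starting offset: starting from the older offset may re-deliver a few messages, but it
never skips any.
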
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index cb91a76..ea6476e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -24,12 +24,14 @@ import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
 import org.apache.kafka.common.PartitionInfo
-import org.apache.samza.config.{Config, ConfigException}
-import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.kafka.common.TopicPartition
+import org.apache.samza.config.Config
 import org.apache.samza.execution.StreamManager
 import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.Partition
 
 object KafkaUtil extends Logging {
   /**
@@ -52,6 +54,18 @@ object KafkaUtil extends Logging {
   }
 
   /**
+   * Converts a TopicPartition to a SystemStreamPartition.
+   * @param systemName the name of the system the partition belongs to.
+   * @param topicPartition the Kafka TopicPartition to convert.
+   * @return the corresponding SystemStreamPartition.
+   */
+  def toSystemStreamPartition(systemName: String, topicPartition: TopicPartition): SystemStreamPartition = {
+    val topic = topicPartition.topic()
+    val partition = new Partition(topicPartition.partition)
+    new SystemStreamPartition(systemName, topic, partition)
+  }
+
+  /**
    * Exactly the same as Kafka's ErrorMapping.maybeThrowException
    * implementation, except suppresses ReplicaNotAvailableException exceptions.
    * According to the Kafka
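
The new KafkaUtil.toSystemStreamPartition helper is a pure conversion, so its usage is
short. Assuming the static forwarders Scala generates for object methods (which should
make the helper callable from Java), a usage sketch looks roughly like this; the topic
name and partition id are illustrative values:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.samza.Partition;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.util.KafkaUtil;

    public class KafkaUtilExample {
      public static void main(String[] args) {
        TopicPartition topicPartition = new TopicPartition("page-views", 3);
        SystemStreamPartition ssp = KafkaUtil.toSystemStreamPartition("kafka", topicPartition);

        // The conversion preserves the topic and partition and adds the system name.
        assert ssp.getSystem().equals("kafka");
        assert ssp.getStream().equals("page-views");
        assert ssp.getPartition().equals(new Partition(3));
      }
    }

Passing systemName explicitly keeps the helper stateless, which is why it can live in a
utility object rather than on the consumer.
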
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 9169b44..b76f4e7 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -25,16 +25,24 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.samza.Partition;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kafka.KafkaSystemAdmin.KafkaStartpointToOffsetResolver;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -42,10 +50,16 @@ import org.mockito.Mockito;
 import static org.apache.samza.system.kafka.KafkaSystemAdmin.*;
 import static org.junit.Assert.*;
 
-
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   private static final String SYSTEM = "kafka";
   private static final String TOPIC = "input";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STREAM = "test-stream";
+  private static final Integer TEST_PARTITION_ID = 0;
+  private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
+  private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
+  private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
+  private static final String TEST_OFFSET = "10";
 
   @Test
   public void testGetOffsetsAfter() {
@@ -337,4 +351,90 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)));
     assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)));
   }
+
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldResolveToCorrectOffset() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Invoke the resolver with the startpoint.
+    String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+    Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldResolveToCorrectOffset() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+
+    final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+    Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldResolveToCorrectOffsetWhenTimestampDoesNotExist() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
+    offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
+    Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+    String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+    Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+
+    // Mock verifications.
+    Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldResolveToCorrectOffset() {
+    // Set up the mock consumer and the resolver.
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+    final StartpointOldest testStartpointOldest = new StartpointOldest();
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.beginningOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+    // Invoke the resolver with the startpoint.
+    String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointOldest);
+    Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldResolveToCorrectOffset() {
+    // Set up the mock consumer and the resolver.
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+
+    final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
+
+    final StartpointUpcoming testStartpointUpcoming = new StartpointUpcoming();
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
+
+    // Invoke the resolver with the startpoint.
+    String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointUpcoming);
+    Assert.assertEquals(TEST_OFFSET, resolvedOffset);
+  }
 }
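
The resolver tests above all follow the same shape: mock the minimal consumer
interaction, apply the visitor, and assert on the returned offset String; there are no
seek() side effects to verify. A reduced sketch of that resolver-style visitor pattern,
with hypothetical names (PositionVisitor and ToOffsetString are illustrative, not Samza
APIs):

    // Each position type resolves to a value rather than mutating consumer state.
    interface PositionVisitor<T> {
      T visitSpecific(String offset);
      T visitOldest();
      T visitUpcoming();
    }

    // Resolves each position type to a concrete String offset, mirroring how
    // KafkaStartpointToOffsetResolver maps startpoints to Kafka offsets.
    class ToOffsetString implements PositionVisitor<String> {
      private final long beginningOffset;
      private final long endOffset;

      ToOffsetString(long beginningOffset, long endOffset) {
        this.beginningOffset = beginningOffset;
        this.endOffset = endOffset;
      }

      @Override
      public String visitSpecific(String offset) {
        return offset; // A specific startpoint already carries the exact offset.
      }

      @Override
      public String visitOldest() {
        return String.valueOf(beginningOffset); // Oldest maps to the beginning offset.
      }

      @Override
      public String visitUpcoming() {
        return String.valueOf(endOffset); // Upcoming maps to the end offset.
      }
    }

Because resolution is a pure function of the consumer's metadata, the seek itself can
happen once, later, in startConsumer(), which is what the reworked
TestKafkaSystemConsumer below verifies.
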
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 91444c7..5f74e38 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,14 +21,11 @@
 
 package org.apache.samza.system.kafka;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.samza.Partition;
@@ -37,16 +34,11 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.startpoint.StartpointOldest;
-import org.apache.samza.startpoint.StartpointSpecific;
-import org.apache.samza.startpoint.StartpointTimestamp;
-import org.apache.samza.startpoint.StartpointUpcoming;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.kafka.KafkaSystemConsumer.KafkaStartpointRegistrationHandler;
-import org.apache.samza.testUtils.TestClock;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -59,16 +51,10 @@ public class TestKafkaSystemConsumer {
   private static final String TEST_STREAM = "test-stream";
   private static final String TEST_JOB = "test-job";
   private static final String TEST_CLIENT_ID = "testClientId";
-  private static final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  private static final String BOOTSTRAP_SERVERS = "127.0.0.1:8888";
   private static final String FETCH_THRESHOLD_MSGS = "50000";
   private static final String FETCH_THRESHOLD_BYTES = "100000";
 
-  private static final Integer TEST_PARTITION_ID = 0;
-  private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
-  private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
-  private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
-  private static final String TEST_OFFSET = "10";
-
   private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
     final Map<String, String> map = new HashMap<>();
 
@@ -76,8 +62,7 @@ public class TestKafkaSystemConsumer {
 
     map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
     map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
-    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-        BOOTSTRAP_SERVER);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), BOOTSTRAP_SERVERS);
     map.put(JobConfig.JOB_NAME(), "jobName");
 
     Config config = new MapConfig(map);
@@ -121,7 +106,8 @@ public class TestKafkaSystemConsumer {
   }
 
   @Test
-  public void testConsumerShouldRegisterTheLatestOffsetForSSP() {
+  public void testConsumerRegistersTheOlderOffsetForSSP() {
+
     KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
 
     SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
@@ -134,15 +120,14 @@ public class TestKafkaSystemConsumer {
     consumer.register(ssp1, "3");
     consumer.register(ssp2, "0");
 
-    consumer.start();
-
-    assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
-    assertEquals("3", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp1))).getSpecificOffset());
-    assertEquals("0", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp2))).getSpecificOffset());
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
   }
 
   @Test
   public void testFetchThresholdBytes() {
+
     SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
     SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
     int partitionsNum = 2;
@@ -223,158 +208,32 @@ public class TestKafkaSystemConsumer {
   }
 
   @Test
-  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
-    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
-    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
-    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =  new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
-    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
-
-    // Mock the consumer interactions.
-    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
-    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
-    // Invoke the consumer with startpoint.
-    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
-    // Mock verifications.
-    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
-    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
-  }
-
-  @Test
-  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
-    // Define dummy variables for testing.
-    final Long testTimeStamp = 10L;
-
-    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
-    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
-    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =  new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
-    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
-    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
-        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
-
-    // Mock the consumer interactions.
-    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
-    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
-    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+  public void testStartConsumer() {
+    final Consumer consumer = Mockito.mock(Consumer.class);
+    final KafkaConsumerProxyFactory kafkaConsumerProxyFactory = Mockito.mock(KafkaConsumerProxyFactory.class);
 
-    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
-
-    // Mock verifications.
-    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
-    Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp));
-    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
-  }
-
-  @Test
-  public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
-    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
-    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
-    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =  new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
-    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
-    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
-    offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
-
-    // Mock the consumer interactions.
-    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
-    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
-    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
-
-    // Mock verifications.
-    Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
-    Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
-    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
-  }
-
-  @Test
-  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
-    // Define dummy variables for testing.
-    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
-    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
-    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =  new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
-
-    final StartpointOldest testStartpointSpecific = new StartpointOldest();
-
-    // Mock the consumer interactions.
-    Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
-    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
-
-    // Invoke the consumer with startpoint.
-    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
-    // Mock verifications.
-    Mockito.verify(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
-    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
-  }
-
-  @Test
-  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
-    // Define dummy variables for testing.
-    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+    final SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    final SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    final String testOffset = "1";
     final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
-    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler =  new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
 
-    final StartpointUpcoming testStartpointSpecific = new StartpointUpcoming();
+    Mockito.when(kafkaConsumerProxyFactory.create(Mockito.anyObject())).thenReturn(kafkaConsumerProxy);
+    Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM, 0), 1);
+    Mockito.doNothing().when(consumer).seek(new TopicPartition(TEST_STREAM, 1), 1);
 
-    // Mock the consumer interactions.
-    Mockito.doNothing().when(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
-    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxyFactory,
+                                                                      kafkaSystemConsumerMetrics, new SystemClock());
+    kafkaSystemConsumer.register(testSystemStreamPartition1, testOffset);
+    kafkaSystemConsumer.register(testSystemStreamPartition2, testOffset);
 
-    // Invoke the consumer with startpoint.
-    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
-
-    // Mock verifications.
-    Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
-    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
-  }
-
-  @Test
-  public void testStartInvocationAfterStartPointsRegistrationShouldInvokeTheStartPointApplyMethod() {
-    // Initialize the constants required for the test.
-    final Consumer mockConsumer = Mockito.mock(Consumer.class);
-    final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+    kafkaSystemConsumer.startConsumer();
 
-    // Test system stream partitions.
-    SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
-    SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
-    SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
-    SystemStreamPartition testSystemStreamPartition4 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(3));
-
-    // Different kinds of {@code Startpoint}.
-    StartpointSpecific startPointSpecific = new StartpointSpecific("100");
-    StartpointTimestamp startpointTimestamp = new StartpointTimestamp(100L);
-    StartpointOldest startpointOldest = new StartpointOldest();
-    StartpointUpcoming startpointUpcoming = new StartpointUpcoming();
-
-    // Mock the visit methods of KafkaStartpointRegistrationHandler.
-    KafkaSystemConsumer.KafkaStartpointRegistrationHandler mockStartPointVisitor = Mockito.mock(KafkaStartpointRegistrationHandler.class);
-    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
-    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
-    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
-    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
-
-    // Instantiate KafkaSystemConsumer for testing.
-    KafkaConsumerProxy proxy = Mockito.mock(KafkaConsumerProxy.class);
-    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(mockConsumer, TEST_SYSTEM, new MapConfig(),
-                                                                      TEST_CLIENT_ID, proxy, kafkaSystemConsumerMetrics, new TestClock(), mockStartPointVisitor);
-
-
-    // Invoke the KafkaSystemConsumer register API with different type of startpoints.
-    kafkaSystemConsumer.register(testSystemStreamPartition1, startPointSpecific);
-    kafkaSystemConsumer.register(testSystemStreamPartition2, startpointTimestamp);
-    kafkaSystemConsumer.register(testSystemStreamPartition3, startpointOldest);
-    kafkaSystemConsumer.register(testSystemStreamPartition4, startpointUpcoming);
-    kafkaSystemConsumer.start();
-
-    // Mock verifications.
-    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
-    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
-    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
-    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
+    Mockito.verify(consumer).seek(new TopicPartition(TEST_STREAM, 0), 1);
+    Mockito.verify(consumer).seek(new TopicPartition(TEST_STREAM, 1), 1);
+    Mockito.verify(kafkaConsumerProxy).start();
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(testSystemStreamPartition1, Long.valueOf(testOffset));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(testSystemStreamPartition2, Long.valueOf(testOffset));
   }
 
   // mock kafkaConsumer and SystemConsumer