Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/19 22:06:57 UTC

[samza] branch master updated: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. (#918)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0967398  SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. (#918)
0967398 is described below

commit 0967398dae1825d8d72786910a7471f7b3f40346
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Mar 19 15:06:53 2019 -0700

    SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. (#918)
---
 .../org/apache/samza/util/BlockingEnvelopeMap.java |  21 +-
 .../samza/system/kafka/KafkaSystemConsumer.java    | 215 +++++++++++++++------
 .../system/kafka/TestKafkaSystemConsumer.java      | 215 ++++++++++++++++++---
 3 files changed, 365 insertions(+), 86 deletions(-)
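
For context, this patch moves KafkaSystemConsumer from raw offset-string
registration to Startpoint-based registration: each Startpoint type is
translated into the matching KafkaConsumer seek call by a visitor at
start() time. A minimal usage sketch (the consumer construction and the
"kafka"/"my-topic" names are assumptions for illustration, not part of
this patch):

    SystemStreamPartition ssp0 = new SystemStreamPartition("kafka", "my-topic", new Partition(0));
    SystemStreamPartition ssp1 = new SystemStreamPartition("kafka", "my-topic", new Partition(1));

    consumer.register(ssp0, new StartpointSpecific("42"));            // seek to offset 42
    consumer.register(ssp1, new StartpointTimestamp(1552000000000L)); // seek by timestamp
    consumer.start(); // start() applies each registered Startpoint via the visitor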

diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 0205a44..e284cef 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.Startpoint;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * rather those extending Samza to consume from new types of stream providers
  * and other systems.
  * </p>
- * 
+ *
  * <p>
  * SystemConsumers that implement BlockingEnvelopeMap need to add messages using
  * {@link #put(org.apache.samza.system.SystemStreamPartition, org.apache.samza.system.IncomingMessageEnvelope) put}
@@ -97,7 +98,24 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   /**
    * {@inheritDoc}
    */
+  @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    initializeInternalStateForSSP(systemStreamPartition);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    initializeInternalStateForSSP(systemStreamPartition);
+  }
+
+  /**
+   * Initializes the metrics and the in-memory buffer for the {@code systemStreamPartition}.
+   * @param systemStreamPartition represents the input system stream partition.
+   */
+  private void initializeInternalStateForSSP(SystemStreamPartition systemStreamPartition) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
     bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
@@ -110,6 +128,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   /**
    * {@inheritDoc}
    */
+  @Override
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
     long stopTime = clock.currentTimeMillis() + timeout;
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
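
To illustrate the BlockingEnvelopeMap contract touched above (both
register overloads now funnel into the same per-SSP initialization, and
implementations feed messages in with put()), here is a hedged sketch of
a trivial subclass; the class name and the onMessage() hook are
hypothetical:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.SystemStreamPartition;
    import org.apache.samza.util.BlockingEnvelopeMap;

    public class InMemorySystemConsumer extends BlockingEnvelopeMap {
      @Override
      public void start() { /* begin fetching from the underlying system */ }

      @Override
      public void stop() { /* release resources */ }

      // Producer side: feed fetched messages into the per-SSP blocking queue;
      // poll() on the consumer side drains it.
      void onMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope)
          throws InterruptedException {
        put(ssp, envelope);
      }
    }
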
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index ee51222..0cb2bb6 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -22,19 +22,33 @@
 
 package org.apache.samza.system.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -44,7 +58,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
-
 public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
@@ -60,17 +73,17 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy<K, V> proxy;
+  private final KafkaConsumerProxy proxy;
 
-  // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
-  final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
-  final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+  // Holds the mapping from {@code TopicPartition} to the registered {@code Startpoint}. This is used in start().
+  final Map<TopicPartition, Startpoint> topicPartitionToStartpointMap = new HashMap<>();
 
   long perPartitionFetchThreshold;
   long perPartitionFetchThresholdBytes;
@@ -98,12 +111,26 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
     // create a sink for passing the messages between the proxy and the consumer
-    messageSink = new KafkaConsumerMessageSink();
+    this.messageSink = new KafkaConsumerMessageSink();
 
     // Create the proxy to do the actual message reading.
-    String metricName = String.format("%s-%s", systemName, clientId);
     proxy = kafkaConsumerProxyFactory.create(this.messageSink);
     LOG.info("{}: Created proxy {} ", this, proxy);
+    this.kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(kafkaConsumer, proxy);
+  }
+
+  @VisibleForTesting
+  KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+      KafkaConsumerProxy<K, V> kafkaConsumerProxy, KafkaSystemConsumerMetrics metrics, Clock clock, KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler) {
+    this.kafkaConsumer = kafkaConsumer;
+    this.clientId = clientId;
+    this.systemName = systemName;
+    this.config = config;
+    this.metrics = metrics;
+    this.proxy = kafkaConsumerProxy;
+    this.kafkaStartpointRegistrationHandler = kafkaStartpointRegistrationHandler;
+    this.messageSink = new KafkaConsumerMessageSink();
+    fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
   }
 
   /**
@@ -115,7 +142,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    * @return KafkaConsumer newly created kafka consumer object
    */
   public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) {
-
     LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig);
     return new KafkaConsumer<>(kafkaConsumerConfig);
   }
@@ -141,11 +167,12 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
 
   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+    Set<TopicPartition> registeredTopicPartitions = new HashSet<>(topicPartitionToStartpointMap.keySet());
+    LOG.info("{}: Consumer subscribes to {}", this, registeredTopicPartitions);
     try {
       synchronized (kafkaConsumer) {
         // we are using assign (and not subscribe), so we need to specify both topic and partition
-        kafkaConsumer.assign(topicPartitionsToSSP.keySet());
+        kafkaConsumer.assign(registeredTopicPartitions);
       }
     } catch (Exception e) {
       throw new SamzaException("Consumer subscription failed for " + this, e);
@@ -159,30 +186,14 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    */
   void startConsumer() {
     // set the offset for each TopicPartition
-    if (topicPartitionsToOffset.size() <= 0) {
+    if (topicPartitionToStartpointMap.size() <= 0) {
       LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
     }
 
-    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-      long startingOffset = Long.valueOf(startingOffsetString);
-
-      try {
-        synchronized (kafkaConsumer) {
-          kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
-        }
-      } catch (Exception e) {
-        // all recoverable execptions are handled by the client.
-        // if we get here there is nothing left to do but bail out.
-        String msg =
-            String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
-        LOG.error(msg, e);
-        throw new SamzaException(msg, e);
-      }
-
-      LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
-
-      // add the partition to the proxy
-      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+    topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+      Partition partition = new Partition(topicPartition.partition());
+      SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+      startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
     });
 
     // start the proxy thread
@@ -208,11 +219,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
     }
 
-    int numPartitions = topicPartitionsToSSP.size();
-    if (numPartitions != topicPartitionsToOffset.size()) {
-      throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
-    }
-
+    int numPartitions = topicPartitionToStartpointMap.size();
 
     if (numPartitions > 0) {
       perPartitionFetchThreshold = fetchThreshold / numPartitions;
@@ -258,40 +265,28 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    register(systemStreamPartition, new StartpointSpecific(offset));
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
     if (started.get()) {
-      String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
-          systemStreamPartition);
-      throw new SamzaException(msg);
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s has already been started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
     }
 
-    if (!systemStreamPartition.getSystem().equals(systemName)) {
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
       LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
       return;
     }
-    LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
-
-    super.register(systemStreamPartition, offset);
-
-    TopicPartition tp = toTopicPartition(systemStreamPartition);
-
-    topicPartitionsToSSP.put(tp, systemStreamPartition);
 
-    String existingOffset = topicPartitionsToOffset.get(tp);
-    // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
-    if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
-      topicPartitionsToOffset.put(tp, offset);
-    }
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
 
-    metrics.registerTopicAndPartition(toTopicAndPartition(tp));
-  }
+    super.register(systemStreamPartition, startpoint);
 
-  /**
-   * Compare two String offsets.
-   * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
-   * @return see {@link Long#compareTo(Long)}
-   */
-  private static int compareOffsets(String offset1, String offset2) {
-    return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+    topicPartitionToStartpointMap.put(topicPartition, startpoint);
+    metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
   }
 
   @Override
@@ -314,11 +309,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return super.poll(systemStreamPartitions, timeout);
   }
 
-  public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+  static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
     return new TopicAndPartition(tp.topic(), tp.partition());
   }
 
-  public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+  static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
     return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
   }
 
@@ -330,6 +325,100 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
     return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final KafkaConsumerProxy proxy;
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer consumer, KafkaConsumerProxy proxy) {
+      this.proxy = proxy;
+      this.kafkaConsumer = consumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+
+        // add the partition to the proxy
+        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+      // Look up the offset by timestamp.
+      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = new HashMap<>();
+      synchronized (kafkaConsumer) {
+        topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
+      }
+
+      // If the timestamp does not exist for the partition, seek the consumer to the end.
+      if (topicPartitionToOffsetTimestamps.get(topicPartition) == null) {
+        LOG.info("Timestamp does not exist for partition: {}. Seeking the kafka consumer to the end offset.", topicPartition);
+
+        // KafkaConsumer is not thread-safe.
+        synchronized (kafkaConsumer) {
+          kafkaConsumer.seekToEnd(ImmutableList.of(topicPartition));
+
+          // add the partition to the proxy
+          proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+        }
+      } else {
+
+        // KafkaConsumer is not thread-safe.
+        synchronized (kafkaConsumer) {
+          // Update the consumer fetch offsets.
+          OffsetAndTimestamp offsetAndTimeStamp = topicPartitionToOffsetTimestamps.get(topicPartition);
+          LOG.info("Updating the consumer fetch offsets of the topic partition: {} to {}.", topicPartition, offsetAndTimeStamp.offset());
+          kafkaConsumer.seek(topicPartition, offsetAndTimeStamp.offset());
+
+          // add the partition to the proxy
+          proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+        }
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
+      LOG.info("Seeking the kafka consumer to the first offset for the topic partition: {}.", topicPartitions);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seekToBeginning(topicPartitions);
+        // add the partition to the proxy
+        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
+      LOG.info("Seeking the kafka consumer to the end offset for the topic partition: {}.", topicPartitions);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seekToEnd(topicPartitions);
+        // add the partition to the proxy
+        proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+      }
+    }
+  }
+
   public class KafkaConsumerMessageSink {
 
     public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
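
The double dispatch that drives the new handler is easiest to see in
isolation. A sketch of what startConsumer() above effectively does for
each registered partition (the ssp, kafkaConsumer and proxy variables are
assumed to be in scope; apply() calls back into the matching visit()
overload of the StartpointVisitor):

    StartpointVisitor handler = new KafkaStartpointRegistrationHandler(kafkaConsumer, proxy);

    // Resolves to visit(ssp, StartpointSpecific): seek to the exact offset.
    new StartpointSpecific("42").apply(ssp, handler);

    // Resolves to visit(ssp, StartpointOldest): seekToBeginning.
    new StartpointOldest().apply(ssp, handler);
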
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index e66c374..91444c7 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,11 +21,15 @@
 
 package org.apache.samza.system.kafka;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
@@ -33,25 +37,37 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.KafkaConsumerConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kafka.KafkaSystemConsumer.KafkaStartpointRegistrationHandler;
+import org.apache.samza.testUtils.TestClock;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
-
 public class TestKafkaSystemConsumer {
-  public final String TEST_SYSTEM = "test-system";
-  public final String TEST_STREAM = "test-stream";
-  public final String TEST_JOB = "test-job";
-  public final String TEST_PREFIX_ID = "testClientId";
-  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
-  public final String FETCH_THRESHOLD_MSGS = "50000";
-  public final String FETCH_THRESHOLD_BYTES = "100000";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STREAM = "test-stream";
+  private static final String TEST_JOB = "test-job";
+  private static final String TEST_CLIENT_ID = "testClientId";
+  private static final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  private static final String FETCH_THRESHOLD_MSGS = "50000";
+  private static final String FETCH_THRESHOLD_BYTES = "100000";
+
+  private static final Integer TEST_PARTITION_ID = 0;
+  private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
+  private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
+  private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
+  private static final String TEST_OFFSET = "10";
 
   private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
     final Map<String, String> map = new HashMap<>();
@@ -65,14 +81,14 @@ public class TestKafkaSystemConsumer {
     map.put(JobConfig.JOB_NAME(), "jobName");
 
     Config config = new MapConfig(map);
-    String clientId = KafkaConsumerConfig.createClientId(TEST_PREFIX_ID, config);
+    String clientId = KafkaConsumerConfig.createClientId(TEST_CLIENT_ID, config);
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, clientId);
 
     final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
 
     MockKafkaSystemConsumer newKafkaSystemConsumer =
-        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_PREFIX_ID,
+        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
             new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
 
     return newKafkaSystemConsumer;
@@ -105,8 +121,7 @@ public class TestKafkaSystemConsumer {
   }
 
   @Test
-  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
-
+  public void testConsumerShouldRegisterTheLatestOffsetForSSP() {
     KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
 
     SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
@@ -119,14 +134,15 @@ public class TestKafkaSystemConsumer {
     consumer.register(ssp1, "3");
     consumer.register(ssp2, "0");
 
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+    consumer.start();
+
+    assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
+    assertEquals("3", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp1))).getSpecificOffset());
+    assertEquals("0", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp2))).getSpecificOffset());
   }
 
   @Test
   public void testFetchThresholdBytes() {
-
     SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
     SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
     int partitionsNum = 2;
@@ -147,13 +163,13 @@ public class TestKafkaSystemConsumer {
     consumer.start();
     consumer.messageSink.addMessage(ssp0, ime0);
     // queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp0));
     consumer.messageSink.addMessage(ssp1, ime1);
     // queue for ssp1 should be less than full now, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
     consumer.messageSink.addMessage(ssp1, ime11);
     // queue for ssp1 should be full now, because we added a message of size 20 on top
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
 
     Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
     Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
@@ -190,13 +206,13 @@ public class TestKafkaSystemConsumer {
     consumer.start();
     consumer.messageSink.addMessage(ssp0, ime0);
     // should be full by size, but not full by number of messages (1 of 2)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp0));
     consumer.messageSink.addMessage(ssp1, ime1);
     // not full by either size or number of messages
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
     consumer.messageSink.addMessage(ssp1, ime11);
     // not full by size, but should be full by messages
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
 
     Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
     Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
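
The hunks below add tests for the new visitor. For the timestamp path it
helps to recall the offsetsForTimes contract the handler relies on: for
each queried partition, Kafka returns the earliest offset whose record
timestamp is >= the requested timestamp, or a null entry when no such
record exists. A sketch of the fallback logic under test (variable names
are illustrative):

    Map<TopicPartition, OffsetAndTimestamp> result =
        kafkaConsumer.offsetsForTimes(ImmutableMap.of(topicPartition, timestamp));
    OffsetAndTimestamp hit = result.get(topicPartition);
    if (hit == null) {
      // No record at or after the timestamp: read from the tail.
      kafkaConsumer.seekToEnd(ImmutableList.of(topicPartition));
    } else {
      kafkaConsumer.seek(topicPartition, hit.offset());
    }
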
@@ -206,6 +222,161 @@ public class TestKafkaSystemConsumer {
     consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+    final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
+    offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Initialize the mocks required for the test.
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+    final StartpointOldest testStartpointOldest = new StartpointOldest();
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointOldest);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Initialize the mocks required for the test.
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+    final StartpointUpcoming testStartpointUpcoming = new StartpointUpcoming();
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointUpcoming);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+  }
+
+  @Test
+  public void testStartInvocationAfterStartPointsRegistrationShouldInvokeTheStartPointApplyMethod() {
+    // Initialize the constants required for the test.
+    final Consumer mockConsumer = Mockito.mock(Consumer.class);
+    final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+
+    // Test system stream partitions.
+    SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+    SystemStreamPartition testSystemStreamPartition4 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(3));
+
+    // Different kinds of {@code Startpoint}.
+    StartpointSpecific startPointSpecific = new StartpointSpecific("100");
+    StartpointTimestamp startpointTimestamp = new StartpointTimestamp(100L);
+    StartpointOldest startpointOldest = new StartpointOldest();
+    StartpointUpcoming startpointUpcoming = new StartpointUpcoming();
+
+    // Mock the visit methods of KafkaStartpointRegistrationHandler.
+    KafkaSystemConsumer.KafkaStartpointRegistrationHandler mockStartPointVisitor = Mockito.mock(KafkaStartpointRegistrationHandler.class);
+    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
+    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
+    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
+    Mockito.doNothing().when(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
+
+    // Instantiate KafkaSystemConsumer for testing.
+    KafkaConsumerProxy proxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(mockConsumer, TEST_SYSTEM, new MapConfig(),
+        TEST_CLIENT_ID, proxy, kafkaSystemConsumerMetrics, new TestClock(), mockStartPointVisitor);
+
+    // Invoke the KafkaSystemConsumer register API with different types of startpoints.
+    kafkaSystemConsumer.register(testSystemStreamPartition1, startPointSpecific);
+    kafkaSystemConsumer.register(testSystemStreamPartition2, startpointTimestamp);
+    kafkaSystemConsumer.register(testSystemStreamPartition3, startpointOldest);
+    kafkaSystemConsumer.register(testSystemStreamPartition4, startpointUpcoming);
+    kafkaSystemConsumer.start();
+
+    // Mock verifications.
+    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
+    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
+    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
+    Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
+  }
+
   // mock kafkaConsumer and SystemConsumer
   static class MockKafkaConsumer extends KafkaConsumer {
     public MockKafkaConsumer(Map<String, Object> configs) {