You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/19 22:06:57 UTC
[samza] branch master updated: SAMZA-2094: Implement the
StartpointVisitor for the KafkaSystemConsumer. (#918)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0967398 SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. (#918)
0967398 is described below
commit 0967398dae1825d8d72786910a7471f7b3f40346
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Mar 19 15:06:53 2019 -0700
SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. (#918)
SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
---
.../org/apache/samza/util/BlockingEnvelopeMap.java | 21 +-
.../samza/system/kafka/KafkaSystemConsumer.java | 215 +++++++++++++++------
.../system/kafka/TestKafkaSystemConsumer.java | 215 ++++++++++++++++++---
3 files changed, 365 insertions(+), 86 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 0205a44..e284cef 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicLong;
* rather those extending Samza to consume from new types of stream providers
* and other systems.
* </p>
- *
+ *
* <p>
* SystemConsumers that implement BlockingEnvelopeMap need to add messages using
* {@link #put(org.apache.samza.system.SystemStreamPartition, org.apache.samza.system.IncomingMessageEnvelope) put}
@@ -97,7 +98,24 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
/**
* {@inheritDoc}
*/
+ @Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ initializeInternalStateForSSP(systemStreamPartition);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+ initializeInternalStateForSSP(systemStreamPartition);
+ }
+
+ /**
+ * Initializes the metrics and in-memory buffer for the {@code systemStreamPartition}.
+ * @param systemStreamPartition represents the input system stream partition.
+ */
+ private void initializeInternalStateForSSP(SystemStreamPartition systemStreamPartition) {
metrics.initMetrics(systemStreamPartition);
bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
@@ -110,6 +128,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
/**
* {@inheritDoc}
*/
+ @Override
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
long stopTime = clock.currentTimeMillis() + timeout;
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index ee51222..0cb2bb6 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -22,19 +22,33 @@
package org.apache.samza.system.kafka;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.startpoint.Startpoint;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
+import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
@@ -44,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-
public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
@@ -60,17 +73,17 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
private final Config config;
private final boolean fetchThresholdBytesEnabled;
private final KafkaSystemConsumerMetrics metrics;
+ private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
// This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
final KafkaConsumerMessageSink messageSink;
// This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
// BlockingEnvelopMap's buffers.
- final private KafkaConsumerProxy<K, V> proxy;
+ private final KafkaConsumerProxy proxy;
- // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
- final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
- final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+ // Holds the mapping from {@code TopicPartition} to its registered {@code Startpoint}. It is used in start().
+ final Map<TopicPartition, Startpoint> topicPartitionToStartpointMap = new HashMap<>();
long perPartitionFetchThreshold;
long perPartitionFetchThresholdBytes;
@@ -98,12 +111,26 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
// create a sink for passing the messages between the proxy and the consumer
- messageSink = new KafkaConsumerMessageSink();
+ this.messageSink = new KafkaConsumerMessageSink();
// Create the proxy to do the actual message reading.
- String metricName = String.format("%s-%s", systemName, clientId);
proxy = kafkaConsumerProxyFactory.create(this.messageSink);
LOG.info("{}: Created proxy {} ", this, proxy);
+ this.kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(kafkaConsumer, proxy);
+ }
+
+ @VisibleForTesting
+ KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+ KafkaConsumerProxy<K, V> kafkaConsumerProxy, KafkaSystemConsumerMetrics metrics, Clock clock, KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler) {
+ this.kafkaConsumer = kafkaConsumer;
+ this.clientId = clientId;
+ this.systemName = systemName;
+ this.config = config;
+ this.metrics = metrics;
+ this.proxy = kafkaConsumerProxy;
+ this.kafkaStartpointRegistrationHandler = kafkaStartpointRegistrationHandler;
+ this.messageSink = new KafkaConsumerMessageSink();
+ fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
}
/**
@@ -115,7 +142,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
* @return KafkaConsumer newly created kafka consumer object
*/
public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) {
-
LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig);
return new KafkaConsumer<>(kafkaConsumerConfig);
}
@@ -141,11 +167,12 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
private void startSubscription() {
//subscribe to all the registered TopicPartitions
- LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+ Set<TopicPartition> registeredTopicPartitions = new HashSet<>(topicPartitionToStartpointMap.keySet());
+ LOG.info("{}: Consumer subscribes to {}", this, registeredTopicPartitions);
try {
synchronized (kafkaConsumer) {
// we are using assign (and not subscribe), so we need to specify both topic and partition
- kafkaConsumer.assign(topicPartitionsToSSP.keySet());
+ kafkaConsumer.assign(registeredTopicPartitions);
}
} catch (Exception e) {
throw new SamzaException("Consumer subscription failed for " + this, e);
@@ -159,30 +186,14 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
*/
void startConsumer() {
// set the offset for each TopicPartition
- if (topicPartitionsToOffset.size() <= 0) {
+ if (topicPartitionToStartpointMap.size() <= 0) {
LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
}
- topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
- long startingOffset = Long.valueOf(startingOffsetString);
-
- try {
- synchronized (kafkaConsumer) {
- kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
- }
- } catch (Exception e) {
- // all recoverable execptions are handled by the client.
- // if we get here there is nothing left to do but bail out.
- String msg =
- String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
- LOG.error(msg, e);
- throw new SamzaException(msg, e);
- }
-
- LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
-
- // add the partition to the proxy
- proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+ topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+ Partition partition = new Partition(topicPartition.partition());
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+ startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
});
// start the proxy thread
@@ -208,11 +219,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
}
- int numPartitions = topicPartitionsToSSP.size();
- if (numPartitions != topicPartitionsToOffset.size()) {
- throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
- }
-
+ int numPartitions = topicPartitionToStartpointMap.size();
if (numPartitions > 0) {
perPartitionFetchThreshold = fetchThreshold / numPartitions;
@@ -258,40 +265,28 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
*/
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ register(systemStreamPartition, new StartpointSpecific(offset));
+ }
+
+ @Override
+ public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
if (started.get()) {
- String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
- systemStreamPartition);
- throw new SamzaException(msg);
+ String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+ throw new SamzaException(exceptionMessage);
}
- if (!systemStreamPartition.getSystem().equals(systemName)) {
+ if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
return;
}
- LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
-
- super.register(systemStreamPartition, offset);
-
- TopicPartition tp = toTopicPartition(systemStreamPartition);
-
- topicPartitionsToSSP.put(tp, systemStreamPartition);
- String existingOffset = topicPartitionsToOffset.get(tp);
- // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
- if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
- topicPartitionsToOffset.put(tp, offset);
- }
+ LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
- metrics.registerTopicAndPartition(toTopicAndPartition(tp));
- }
+ super.register(systemStreamPartition, startpoint);
- /**
- * Compare two String offsets.
- * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
- * @return see {@link Long#compareTo(Long)}
- */
- private static int compareOffsets(String offset1, String offset2) {
- return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ topicPartitionToStartpointMap.put(topicPartition, startpoint);
+ metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
}
@Override
@@ -314,11 +309,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return super.poll(systemStreamPartitions, timeout);
}
- public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+ static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
return new TopicAndPartition(tp.topic(), tp.partition());
}
- public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+ static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
}
@@ -330,6 +325,100 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return systemName;
}
+ @VisibleForTesting
+ static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+ private final KafkaConsumerProxy proxy;
+ private final Consumer kafkaConsumer;
+
+ KafkaStartpointRegistrationHandler(Consumer consumer, KafkaConsumerProxy proxy) {
+ this.proxy = proxy;
+ this.kafkaConsumer = consumer;
+ }
+
+ @Override
+ public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+ LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+ // KafkaConsumer is not thread-safe.
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+
+ // add the partition to the proxy
+ proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+ }
+ }
+
+ @Override
+ public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+ Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+ // Look up the offset by timestamp.
+ LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+ Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = new HashMap<>();
+ synchronized (kafkaConsumer) {
+ topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
+ }
+
+ // If the timestamp does not exist for the partition, then seek the consumer to end.
+ if (topicPartitionToOffsetTimestamps.get(topicPartition) == null) {
+ LOG.info("Timestamp does not exist for partition: {}. Seeking the kafka consumer to the end offset.", topicPartition);
+
+ // KafkaConsumer is not thread-safe.
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.seekToEnd(ImmutableList.of(topicPartition));
+
+ // add the partition to the proxy
+ proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+ }
+ } else {
+
+ // KafkaConsumer is not thread-safe.
+ synchronized (kafkaConsumer) {
+ // Update the consumer fetch offsets.
+ OffsetAndTimestamp offsetAndTimeStamp = topicPartitionToOffsetTimestamps.get(topicPartition);
+ LOG.info("Updating the consumer fetch offsets of the topic partition: {} to {}.", topicPartition, offsetAndTimeStamp.offset());
+ kafkaConsumer.seek(topicPartition, offsetAndTimeStamp.offset());
+
+ // add the partition to the proxy
+ proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+ }
+ }
+ }
+
+ @Override
+ public void visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
+ LOG.info("Seeking the kafka consumer to the first offset for the topic partition: {}.", topicPartitions);
+
+ // KafkaConsumer is not thread-safe.
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.seekToBeginning(topicPartitions);
+ // add the partition to the proxy
+ proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+ }
+ }
+
+ @Override
+ public void visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
+ TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+ Collection<TopicPartition> topicPartitions = ImmutableList.of(topicPartition);
+ LOG.info("Seeking the kafka consumer to the end offset for the topic partition: {}.", topicPartitions);
+
+ // KafkaConsumer is not thread-safe.
+ synchronized (kafkaConsumer) {
+ kafkaConsumer.seekToEnd(topicPartitions);
+ // add the partition to the proxy
+ proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
+ }
+ }
+ }
+
public class KafkaConsumerMessageSink {
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index e66c374..91444c7 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,11 +21,15 @@
package org.apache.samza.system.kafka;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
@@ -33,25 +37,37 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.startpoint.StartpointOldest;
+import org.apache.samza.startpoint.StartpointSpecific;
+import org.apache.samza.startpoint.StartpointTimestamp;
+import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kafka.KafkaSystemConsumer.KafkaStartpointRegistrationHandler;
+import org.apache.samza.testUtils.TestClock;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
-
public class TestKafkaSystemConsumer {
- public final String TEST_SYSTEM = "test-system";
- public final String TEST_STREAM = "test-stream";
- public final String TEST_JOB = "test-job";
- public final String TEST_PREFIX_ID = "testClientId";
- public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
- public final String FETCH_THRESHOLD_MSGS = "50000";
- public final String FETCH_THRESHOLD_BYTES = "100000";
+ private static final String TEST_SYSTEM = "test-system";
+ private static final String TEST_STREAM = "test-stream";
+ private static final String TEST_JOB = "test-job";
+ private static final String TEST_CLIENT_ID = "testClientId";
+ private static final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+ private static final String FETCH_THRESHOLD_MSGS = "50000";
+ private static final String FETCH_THRESHOLD_BYTES = "100000";
+
+ private static final Integer TEST_PARTITION_ID = 0;
+ private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
+ private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
+ private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
+ private static final String TEST_OFFSET = "10";
private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
final Map<String, String> map = new HashMap<>();
@@ -65,14 +81,14 @@ public class TestKafkaSystemConsumer {
map.put(JobConfig.JOB_NAME(), "jobName");
Config config = new MapConfig(map);
- String clientId = KafkaConsumerConfig.createClientId(TEST_PREFIX_ID, config);
+ String clientId = KafkaConsumerConfig.createClientId(TEST_CLIENT_ID, config);
KafkaConsumerConfig consumerConfig =
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, clientId);
final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
MockKafkaSystemConsumer newKafkaSystemConsumer =
- new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_PREFIX_ID,
+ new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
return newKafkaSystemConsumer;
@@ -105,8 +121,7 @@ public class TestKafkaSystemConsumer {
}
@Test
- public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
-
+ public void testConsumerShouldRegisterTheLatestOffsetForSSP() {
KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
@@ -119,14 +134,15 @@ public class TestKafkaSystemConsumer {
consumer.register(ssp1, "3");
consumer.register(ssp2, "0");
- assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
- assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
- assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+ consumer.start();
+
+ assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
+ assertEquals("3", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp1))).getSpecificOffset());
+ assertEquals("0", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp2))).getSpecificOffset());
}
@Test
public void testFetchThresholdBytes() {
-
SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
int partitionsNum = 2;
@@ -147,13 +163,13 @@ public class TestKafkaSystemConsumer {
consumer.start();
consumer.messageSink.addMessage(ssp0, ime0);
// queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum
- Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+ Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp0));
consumer.messageSink.addMessage(ssp1, ime1);
// queue for ssp1 should be less then full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
- Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+ Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
consumer.messageSink.addMessage(ssp1, ime11);
// queue for ssp1 should full now, because we added message of size 20 on top
- Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+ Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
@@ -190,13 +206,13 @@ public class TestKafkaSystemConsumer {
consumer.start();
consumer.messageSink.addMessage(ssp0, ime0);
// should be full by size, but not full by number of messages (1 of 2)
- Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+ Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp0));
consumer.messageSink.addMessage(ssp1, ime1);
// not full neither by size nor by messages
- Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+ Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
consumer.messageSink.addMessage(ssp1, ime11);
// not full by size, but should be full by messages
- Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+ Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
@@ -206,6 +222,161 @@ public class TestKafkaSystemConsumer {
consumer.stop();
}
+ @Test
+ public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+ final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+ // Mock the consumer interactions.
+ Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ // Invoke the consumer with startpoint.
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+ // Mock verifications.
+ Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+ Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+ }
+
+ @Test
+ public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ // Define dummy variables for testing.
+ final Long testTimeStamp = 10L;
+
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+ final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+ final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
+ TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp))).thenReturn(offsetForTimesResult);
+ Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+ // Mock verifications.
+ Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+ Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimeStamp));
+ Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+ }
+
+ @Test
+ public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+ final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+ final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
+ offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+ // Mock the consumer interactions.
+ Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+ // Mock verifications.
+ Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+ Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
+ Mockito.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+ }
+
+ @Test
+ public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ // Mocked collaborators of the handler under test.
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+ final StartpointOldest startpointOldest = new StartpointOldest();
+
+ // seekToBeginning is void, so the mock already does nothing for it; only the position lookup needs a stub.
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ // Invoke the handler with the oldest startpoint.
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointOldest);
+
+ // The fetch offset handed to the proxy must be read after the seek; otherwise it would be a stale position.
+ final org.mockito.InOrder inOrder = Mockito.inOrder(consumer, kafkaConsumerProxy);
+ inOrder.verify(consumer).seekToBeginning(ImmutableList.of(TEST_TOPIC_PARTITION));
+ inOrder.verify(consumer, Mockito.atLeastOnce()).position(TEST_TOPIC_PARTITION);
+ inOrder.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+ }
+
+ @Test
+ public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+ // Mocked collaborators of the handler under test.
+ final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+ final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = new KafkaStartpointRegistrationHandler(consumer, kafkaConsumerProxy);
+
+ final StartpointUpcoming startpointUpcoming = new StartpointUpcoming();
+
+ // seekToEnd is void, so the mock already does nothing for it; only the position lookup needs a stub.
+ Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
+
+ // Invoke the handler with the upcoming startpoint.
+ kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointUpcoming);
+
+ // The fetch offset handed to the proxy must be read after the seek; otherwise it would be a stale position.
+ final org.mockito.InOrder inOrder = Mockito.inOrder(consumer, kafkaConsumerProxy);
+ inOrder.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+ inOrder.verify(consumer, Mockito.atLeastOnce()).position(TEST_TOPIC_PARTITION);
+ inOrder.verify(kafkaConsumerProxy).addTopicPartition(TEST_SYSTEM_STREAM_PARTITION, Long.valueOf(TEST_OFFSET));
+ }
+
+ @Test
+ public void testStartInvocationAfterStartPointsRegistrationShouldInvokeTheStartPointApplyMethod() {
+ // Initialize the constants required for the test.
+ final Consumer mockConsumer = Mockito.mock(Consumer.class);
+ final KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics = new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry());
+
+ // Test system stream partitions, one per startpoint kind.
+ final SystemStreamPartition testSystemStreamPartition1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+ final SystemStreamPartition testSystemStreamPartition2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+ final SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+ final SystemStreamPartition testSystemStreamPartition4 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(3));
+
+ // One instance of each kind of Startpoint.
+ final StartpointSpecific startPointSpecific = new StartpointSpecific("100");
+ final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(100L);
+ final StartpointOldest startpointOldest = new StartpointOldest();
+ final StartpointUpcoming startpointUpcoming = new StartpointUpcoming();
+
+ // Mocked visitor: its void visit methods are no-ops by default, so no stubbing is required.
+ final KafkaStartpointRegistrationHandler mockStartPointVisitor = Mockito.mock(KafkaStartpointRegistrationHandler.class);
+
+ // Instantiate KafkaSystemConsumer for testing.
+ final KafkaConsumerProxy proxy = Mockito.mock(KafkaConsumerProxy.class);
+ final KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(mockConsumer, TEST_SYSTEM, new MapConfig(),
+ TEST_CLIENT_ID, proxy, kafkaSystemConsumerMetrics, new TestClock(), mockStartPointVisitor);
+
+ // Invoke the KafkaSystemConsumer register API with different types of startpoints.
+ kafkaSystemConsumer.register(testSystemStreamPartition1, startPointSpecific);
+ kafkaSystemConsumer.register(testSystemStreamPartition2, startpointTimestamp);
+ kafkaSystemConsumer.register(testSystemStreamPartition3, startpointOldest);
+ kafkaSystemConsumer.register(testSystemStreamPartition4, startpointUpcoming);
+
+ // Registration alone must not apply any startpoint; as the test name states, they are applied on start().
+ Mockito.verifyZeroInteractions(mockStartPointVisitor);
+
+ kafkaSystemConsumer.start();
+
+ // Each registered startpoint is dispatched to the visitor for its own partition.
+ Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition1, startPointSpecific);
+ Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition2, startpointTimestamp);
+ Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition3, startpointOldest);
+ Mockito.verify(mockStartPointVisitor).visit(testSystemStreamPartition4, startpointUpcoming);
+ }
+
// mock kafkaConsumer and SystemConsumer
static class MockKafkaConsumer extends KafkaConsumer {
public MockKafkaConsumer(Map<String, Object> configs) {