You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:26 UTC
[27/29] samza git commit: more review comments
more review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/190a3999
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/190a3999
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/190a3999
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 190a39990a9281511e07876636b6c8784337d35a
Parents: 93ca950
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 25 15:55:15 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 25 15:55:15 2018 -0700
----------------------------------------------------------------------
.../samza/config/KafkaConsumerConfig.java | 47 ++-
.../samza/system/kafka/KafkaConsumerProxy.java | 345 +++++++++----------
.../samza/system/kafka/KafkaSystemConsumer.java | 49 ++-
.../kafka/KafkaSystemConsumerMetrics.scala | 2 +-
.../samza/config/TestKafkaConsumerConfig.java | 19 +-
5 files changed, 235 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
index 7d2408b..3fa66e5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -31,6 +31,7 @@ import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+import scala.runtime.AbstractFunction0;
/**
@@ -40,9 +41,9 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
- private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
- private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
- private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+ static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+ static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+ static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
/*
* By default, KafkaConsumer will fetch some big number of available messages for all the partitions.
@@ -55,12 +56,12 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
/**
- * This is a help method to create the configs for use in Kafka consumer.
+ * Helper method to create configs for use in Kafka consumer.
* The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
*
- * @param config - config provided by the app.
- * @param systemName - system name for which the consumer is configured.
- * @param clientId - client id to be used in the Kafka consumer.
+ * @param config config provided by the app.
+ * @param systemName system name to get the consumer configuration for.
+ * @param clientId client id to be used in the Kafka consumer.
* @return KafkaConsumerConfig
*/
public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
@@ -85,7 +86,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
- // make sure bootstrap configs are in, if not - get them from the producer
+ // if consumer bootstrap servers are not configured, get them from the producer configs
if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
String bootstrapServers =
config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
@@ -119,10 +120,19 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
// group id should be unique per job
static String getConsumerGroupId(Config config) {
JobConfig jobConfig = new JobConfig(config);
- Option<String> jobIdOption = jobConfig.getJobId();
- Option<String> jobNameOption = jobConfig.getName();
- return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined()
- ? jobIdOption.get() : "undefined_job_id");
+ Option jobNameOption = jobConfig.getName();
+ if (jobNameOption.isEmpty()) {
+ throw new ConfigException("Missing job name");
+ }
+ String jobName = (String) jobNameOption.get();
+
+ Option jobIdOption = jobConfig.getJobId();
+ String jobId = "1";
+ if (! jobIdOption.isEmpty()) {
+ jobId = (String) jobIdOption.get();
+ }
+
+ return String.format("%s-%s", jobName, jobId);
}
// client id should be unique per job
@@ -139,11 +149,18 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
}
static String getConsumerClientId(String id, Config config) {
- if (config.get(JobConfig.JOB_NAME()) == null) {
+ JobConfig jobConfig = new JobConfig(config);
+ Option jobNameOption = jobConfig.getName();
+ if (jobNameOption.isEmpty()) {
throw new ConfigException("Missing job name");
}
- String jobName = config.get(JobConfig.JOB_NAME());
- String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
+ String jobName = (String) jobNameOption.get();
+
+ Option jobIdOption = jobConfig.getJobId();
+ String jobId = "1";
+ if (! jobIdOption.isEmpty()) {
+ jobId = (String) jobIdOption.get();
+ }
return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
jobId.replaceAll("\\W", "_"));
http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index d2f7096..04071c1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -30,13 +30,10 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import javax.print.DocFlavor;
-import kafka.common.KafkaException;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
@@ -53,21 +50,21 @@ import org.slf4j.LoggerFactory;
* This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
* We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
*/
-/*package private */class KafkaConsumerProxy<K, V> {
+class KafkaConsumerProxy<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class);
private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
- /* package private */ final Thread consumerPollThread;
+ final Thread consumerPollThread;
private final Consumer<K, V> kafkaConsumer;
private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
private final String metricName;
private final String systemName;
private final String clientId;
- private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
+ private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>();
private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>();
- // list of all the SSPs we poll from, with their next offsets correspondingly.
+ // list of all the SSPs we poll from, with their corresponding next offsets (most recently read offset + 1).
private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
// lags behind the high water mark, as reported by the Kafka consumer.
private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
@@ -76,7 +73,6 @@ import org.slf4j.LoggerFactory;
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
- // package private constructor
KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
@@ -96,14 +92,46 @@ import org.slf4j.LoggerFactory;
"Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
}
- @Override
- public String toString() {
- return String.format("consumerProxy-%s-%s", systemName, clientId);
+ /**
+ * Add new partition to the list of polled partitions.
+ * Must only be called before {@link KafkaConsumerProxy#start} is called.
+ */
+ public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
+ LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset,
+ this));
+ topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+
+ // this offset has already been vetted, so there is no need to validate it
+ nextOffsets.put(ssp, nextOffset);
+
+ kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size());
+ }
+
+ /**
+ * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}.
+ * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy
+ */
+ public void stop(long timeoutMs) {
+ LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this);
+
+ isRunning = false;
+ try {
+ consumerPollThread.join(timeoutMs/2);
+ // join() may timeout
+ // in this case we should interrupt it and wait again
+ if (consumerPollThread.isAlive()) {
+ consumerPollThread.interrupt();
+ consumerPollThread.join(timeoutMs/2);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Join in KafkaConsumerProxy has failed", e);
+ consumerPollThread.interrupt();
+ }
}
public void start() {
if (!consumerPollThread.isAlive()) {
- LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
+ LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString());
consumerPollThread.start();
@@ -112,70 +140,124 @@ import org.slf4j.LoggerFactory;
try {
consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- LOG.info("Got InterruptedException", e);
+ LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e);
}
}
} else {
LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
}
- if (topicPartitions2SSP.size() == 0) {
- String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientid %s",
- systemName, clientId);
+ if (topicPartitionToSSP.size() == 0) {
+ String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
LOG.error(msg);
throw new SamzaException(msg);
}
}
- /**
- * Stop the thread and wait for it to stop.
- * @param timeoutMs how long to wait in join
- */
- public void stop(long timeoutMs) {
- LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+ boolean isRunning() {
+ return isRunning;
+ }
- isRunning = false;
- try {
- consumerPollThread.join(timeoutMs);
- // join returns event if the thread didn't finish
- // in this case we should interrupt it and wait again
- if (consumerPollThread.isAlive()) {
- consumerPollThread.interrupt();
- consumerPollThread.join(timeoutMs);
+ Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ private void initializeLags() {
+ // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+ Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+ endOffsets.forEach((tp, offset) -> {
+ SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+ long startingOffset = nextOffsets.get(ssp);
+ // End offsets are the offset of the newest message + 1
+ // If the message we are about to consume is < end offset, we are starting with a lag.
+ long initialLag = endOffsets.get(tp) - startingOffset;
+
+ LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
+ latestLags.put(ssp, initialLag);
+ sink.setIsAtHighWatermark(ssp, initialLag == 0);
+ });
+
+ // initialize lag metrics
+ refreshLagMetrics();
+ }
+
+ // creates the Runnable that the consumer poll thread executes to fetch the messages.
+ private Runnable createProxyThreadRunnable() {
+ Runnable runnable = () -> {
+ isRunning = true;
+
+ try {
+ consumerPollThreadStartLatch.countDown();
+ LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName);
+ initializeLags();
+ while (isRunning) {
+ fetchMessages();
+ }
+ } catch (Throwable throwable) {
+ LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+ // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+ failureCause = throwable;
+ isRunning = false;
}
- } catch (InterruptedException e) {
- LOG.warn("Join in KafkaConsumerProxy has failed", e);
- consumerPollThread.interrupt();
- }
+
+ if (!isRunning) {
+ LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+ }
+ };
+
+ return runnable;
}
- /**
- * Add new partition to the list of polled partitions.
- * This method should be called only at the beginning, before the thread is started.
- */
- public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
- LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset,
- this));
- topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+ private void fetchMessages() {
+ Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
+ for (SystemStreamPartition ssp : nextOffsets.keySet()) {
+ if (sink.needsMoreMessages(ssp)) {
+ sspsToFetch.add(ssp);
+ }
+ }
+ LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch);
+ if (!sspsToFetch.isEmpty()) {
+ kafkaConsumerMetrics.incClientReads(metricName);
- // this is already vetted offset so there is no need to validate it
- LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp));
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
- nextOffsets.put(ssp, nextOffset);
+ response = pollConsumer(sspsToFetch, 500L);
+
+ // move the responses into the queue
+ for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
+ List<IncomingMessageEnvelope> envelopes = e.getValue();
+ if (envelopes != null) {
+ moveMessagesToTheirQueue(e.getKey(), envelopes);
+ }
+ }
+
+ populateCurrentLags(sspsToFetch); // find current lags for each SSP
+ } else { // nothing to read
+
+ LOG.debug("No topic/partitions need to be fetched for system {} right now. Sleeping {}ms.", systemName,
+ SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+
+ kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
- kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size());
+ try {
+ Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep in fetchMessages was interrupted");
+ }
+ }
+ refreshLagMetrics();
}
// the actual polling of the messages from kafka
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
- Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
+ Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) {
// Since we need to poll only from some subset of TopicPartitions (passed as the argument),
// we need to pause the rest.
List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
- for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitions2SSP.entrySet()) {
+ for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) {
TopicPartition tp = e.getKey();
SystemStreamPartition ssp = e.getValue();
if (systemStreamPartitions.contains(ssp)) {
@@ -186,21 +268,18 @@ import org.slf4j.LoggerFactory;
}
ConsumerRecords<K, V> records;
-
try {
// Synchronize, in case the consumer is used in some other thread (metadata or something else)
synchronized (kafkaConsumer) {
// Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily
kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
- records = kafkaConsumer.poll(timeout);
- // resume original set of subscription - may be required for checkpointing
- kafkaConsumer.resume(topicPartitionsToPause);
+ records = kafkaConsumer.poll(timeoutMs);
}
} catch (Exception e) {
// we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions,
// but we still just rethrow, and log it up the stack.
- LOG.error("Caught a Kafka exception in pollConsumer", e);
+ LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e);
throw e;
}
@@ -209,12 +288,11 @@ import org.slf4j.LoggerFactory;
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
if (records == null) {
- throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)");
+ throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this);
}
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
// Parse the returned records and convert them into the IncomingMessageEnvelope.
- // Note. They have been already de-serialized by the consumer.
for (ConsumerRecord<K, V> record : records) {
int partition = record.partition();
String topic = record.topic();
@@ -222,18 +300,18 @@ import org.slf4j.LoggerFactory;
updateMetrics(record, tp);
- SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
- List<IncomingMessageEnvelope> listMsgs = results.get(ssp);
- if (listMsgs == null) {
- listMsgs = new ArrayList<>();
- results.put(ssp, listMsgs);
+ SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+ List<IncomingMessageEnvelope> messages = results.get(ssp);
+ if (messages == null) {
+ messages = new ArrayList<>();
+ results.put(ssp, messages);
}
- final K key = record.key();
- final Object value = record.value();
- final IncomingMessageEnvelope imEnvelope =
+ K key = record.key();
+ Object value = record.value();
+ IncomingMessageEnvelope imEnvelope =
new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
- listMsgs.add(imEnvelope);
+ messages.add(imEnvelope);
}
if (LOG.isDebugEnabled()) {
LOG.debug("# records per SSP:");
@@ -246,52 +324,6 @@ import org.slf4j.LoggerFactory;
return results;
}
- // creates a separate thread for getting the messages.
- private Runnable createProxyThreadRunnable() {
- Runnable runnable= () -> {
- isRunning = true;
-
- try {
- consumerPollThreadStartLatch.countDown();
- LOG.info("Starting runnable " + consumerPollThread.getName());
- initializeLags();
- while (isRunning) {
- fetchMessages();
- }
- } catch (Throwable throwable) {
- LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
- // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
- failureCause = throwable;
- isRunning = false;
- }
-
- if (!isRunning) {
- LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
- }
- };
-
- return runnable;
- }
-
- private void initializeLags() {
- // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
- Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
- endOffsets.forEach((tp, offset) -> {
- SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
- long startingOffset = nextOffsets.get(ssp);
- // End offsets are the offset of the newest message + 1
- // If the message we are about to consume is < end offset, we are starting with a lag.
- long initialLag = endOffsets.get(tp) - startingOffset;
-
- LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
- latestLags.put(ssp, initialLag);
- sink.setIsAtHighWatermark(ssp, initialLag == 0);
- });
-
- // initialize lag metrics
- refreshLatencyMetrics();
- }
-
private int getRecordSize(ConsumerRecord<K, V> r) {
int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
return keySize + r.serializedValueSize();
@@ -300,10 +332,16 @@ import org.slf4j.LoggerFactory;
private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
- long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark
+
+ Long lag = latestLags.get(ssp);
+ if (lag == null) {
+ throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
+ }
+ long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
if (currentSSPLag < 0) {
return;
}
+
long recordOffset = r.offset();
long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
@@ -315,7 +353,6 @@ import org.slf4j.LoggerFactory;
kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
}
-
private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
long nextOffset = nextOffsets.get(ssp);
@@ -329,16 +366,6 @@ import org.slf4j.LoggerFactory;
nextOffsets.put(ssp, nextOffset);
}
- private void populateMetricNames(Set<SystemStreamPartition> ssps) {
- HashMap<String, String> tags = new HashMap<>();
- tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics
-
- for (SystemStreamPartition ssp : ssps) {
- TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
- perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
- }
- }
-
// The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
// One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
// This method populates the lag information for each SSP into latestLags member variable.
@@ -348,17 +375,23 @@ import org.slf4j.LoggerFactory;
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
- populateMetricNames(ssps);
+ HashMap<String, String> tags = new HashMap<>();
+ tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
+
+ for (SystemStreamPartition ssp : ssps) {
+ TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
+ perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
+ }
}
for (SystemStreamPartition ssp : ssps) {
MetricName mn = perPartitionMetrics.get(ssp);
- Metric currentLagM = consumerMetrics.get(mn);
+ Metric currentLagMetric = consumerMetrics.get(mn);
// High watermark is fixed to be the offset of last available message,
// so the lag is now at least 0, which is the same as Samza's definition.
// If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
- long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
+ long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
latestLags.put(ssp, currentLag);
// calls the setIsAtHead for the BlockingEnvelopeMap
@@ -366,58 +399,7 @@ import org.slf4j.LoggerFactory;
}
}
- // Get the latest lag for a specific SSP.
- private long getLatestLag(SystemStreamPartition ssp) {
- Long lag = latestLags.get(ssp);
- if (lag == null) {
- throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp);
- }
- return lag;
- }
-
- // Using the consumer to poll the messages from the stream.
- private void fetchMessages() {
- Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
- for (SystemStreamPartition ssp : nextOffsets.keySet()) {
- if (sink.needsMoreMessages(ssp)) {
- sspsToFetch.add(ssp);
- }
- }
- LOG.debug("pollConsumer {}", sspsToFetch.size());
- if (!sspsToFetch.isEmpty()) {
- kafkaConsumerMetrics.incClientReads(metricName);
-
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
- LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
-
- response = pollConsumer(sspsToFetch, 500L);
-
- // move the responses into the queue
- for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
- List<IncomingMessageEnvelope> envelopes = e.getValue();
- if (envelopes != null) {
- moveMessagesToTheirQueue(e.getKey(), envelopes);
- }
- }
-
- populateCurrentLags(sspsToFetch); // find current lags for for each SSP
- } else { // nothing to read
-
- LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,
- SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
-
- kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
-
- try {
- Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
- } catch (InterruptedException e) {
- LOG.warn("Sleep in fetchMessages was interrupted");
- }
- }
- refreshLatencyMetrics();
- }
-
- private void refreshLatencyMetrics() {
+ private void refreshLagMetrics() {
for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
SystemStreamPartition ssp = e.getKey();
Long offset = e.getValue();
@@ -433,12 +415,9 @@ import org.slf4j.LoggerFactory;
}
}
- boolean isRunning() {
- return isRunning;
- }
-
- Throwable getFailureCause() {
- return failureCause;
+ @Override
+ public String toString() {
+ return String.format("consumerProxy-%s-%s", systemName, clientId);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 17f29f1..10ce274 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -66,10 +66,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
final KafkaConsumerMessageSink messageSink;
// This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
- // BlockeingEnvelopMap's buffers.
+ // BlockingEnvelopeMap's buffers.
final private KafkaConsumerProxy proxy;
- // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
+ // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
@@ -77,10 +77,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
long perPartitionFetchThresholdBytes;
/**
+ * Create a KafkaSystemConsumer for the provided {@code systemName}
* @param systemName system name for which we create the consumer
- * @param config config passed into the the app
- * @param metrics metrics collecting object
- * @param clock - system clock, allows to override internal clock (System.currentTimeMillis())
+ * @param config application config
+ * @param metrics metrics for this KafkaSystemConsumer
+ * @param clock system clock
*/
public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
KafkaSystemConsumerMetrics metrics, Clock clock) {
@@ -99,11 +100,9 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
messageSink = new KafkaConsumerMessageSink();
// Create the proxy to do the actual message reading.
- String metricName = String.format("%s %s", systemName, clientId);
+ String metricName = String.format("%s", systemName);
proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
-
- LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
}
/**
@@ -118,7 +117,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
// extract kafka client configs
KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
- LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
+ LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig);
return new KafkaConsumer(consumerConfig);
}
@@ -130,7 +129,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return;
}
if (stopped.get()) {
- LOG.warn("{}: Attempting to start a stopped consumer", this);
+ LOG.error("{}: Attempting to start a stopped consumer", this);
return;
}
// initialize the subscriptions for all the registered TopicPartitions
@@ -151,8 +150,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
kafkaConsumer.assign(topicPartitionsToSSP.keySet());
}
} catch (Exception e) {
- LOG.warn("{}: Start subscription failed", this);
- throw new SamzaException(e);
+ throw new SamzaException("Consumer subscription failed for " + this, e);
}
}
@@ -164,7 +162,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
void startConsumer() {
// set the offset for each TopicPartition
if (topicPartitionsToOffset.size() <= 0) {
- LOG.warn("{}: Consumer is not subscribed to any SSPs", this);
+ LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
}
topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
@@ -204,35 +202,30 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
long fetchThreshold = FETCH_THRESHOLD;
if (fetchThresholdOption.isDefined()) {
fetchThreshold = Long.valueOf(fetchThresholdOption.get());
- LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold);
}
Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
if (fetchThresholdBytesOption.isDefined()) {
fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
- LOG.info("{}: fetchThresholdBytesOption is configured. fetchThresholdBytes={}", this, fetchThresholdBytes);
}
- int numTPs = topicPartitionsToSSP.size();
- if (numTPs != topicPartitionsToOffset.size()) {
+ int numPartitions = topicPartitionsToSSP.size();
+ if (numPartitions != topicPartitionsToOffset.size()) {
throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
}
- LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes,
- fetchThreshold, numTPs);
- if (numTPs > 0) {
- perPartitionFetchThreshold = fetchThreshold / numTPs;
- LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold);
+ if (numPartitions > 0) {
+ perPartitionFetchThreshold = fetchThreshold / numPartitions;
if (fetchThresholdBytesEnabled) {
// currently this feature cannot be enabled, because we do not have the size of the messages available.
// messages get double buffered, hence divide by 2
- perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
- LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", this,
- perPartitionFetchThresholdBytes);
+ perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
}
}
+ LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
+ this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
}
@Override
@@ -260,8 +253,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
}
}
- /*
- record the ssp and the offset. Do not submit it to the consumer yet.
+ /**
+ * record the ssp and the offset. Do not submit it to the consumer yet.
+ * @param systemStreamPartition ssp to register
+ * @param offset offset to register with
*/
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index c4552e6..59a8854 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -55,7 +55,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
// java friendlier interfaces
// Gauges
- def setTopicPartitionValue(clientName: String, value: Int) {
+ def setNumTopicPartitions(clientName: String, value: Int) {
topicPartitions.get(clientName).set(value)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/190a3999/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
index 35a717a..de5d093 100644
--- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -28,7 +28,7 @@ import org.junit.Test;
public class TestKafkaConsumerConfig {
- private final Map<String, String> props = new HashMap<>();
+
public final static String SYSTEM_NAME = "testSystem";
public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
@@ -36,6 +36,7 @@ public class TestKafkaConsumerConfig {
@Test
public void testDefaults() {
+ Map<String, String> props = new HashMap<>();
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
@@ -43,6 +44,8 @@ public class TestKafkaConsumerConfig {
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
"100"); // should NOT be ignored
+ props.put(JobConfig.JOB_NAME(), "jobName");
+
// if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
@@ -72,11 +75,23 @@ public class TestKafkaConsumerConfig {
Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+
+ Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-1",
+ KafkaConsumerConfig.getConsumerClientId(config));
+ Assert.assertEquals("jobName-1", KafkaConsumerConfig.getConsumerGroupId(config));
+
+ props.put(JobConfig.JOB_ID(), "jobId");
+ config = new MapConfig(props);
+
+ Assert.assertEquals(KafkaConsumerConfig.CONSUMER_CLIENT_ID_PREFIX.replace("-", "_") + "-jobName-jobId",
+ KafkaConsumerConfig.getConsumerClientId(config));
+ Assert.assertEquals("jobName-jobId", KafkaConsumerConfig.getConsumerGroupId(config));
}
// test stuff that should not be overridden
@Test
public void testNotOverride() {
+ Map<String, String> props = new HashMap<>();
// if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
@@ -85,6 +100,8 @@ public class TestKafkaConsumerConfig {
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
TestKafkaConsumerConfig.class.getName());
+ props.put(JobConfig.JOB_NAME(), "jobName");
+
Config config = new MapConfig(props);
KafkaConsumerConfig kafkaConsumerConfig =
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);