You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:22 UTC
[23/29] samza git commit: review comments
review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5397a34e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5397a34e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5397a34e
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 5397a34e2a6a5df0d7ae088ec2b309e65b53b4e7
Parents: 1d1fb89
Author: Boris S <bo...@apache.org>
Authored: Mon Sep 24 10:54:27 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Mon Sep 24 10:54:27 2018 -0700
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../clients/consumer/KafkaConsumerConfig.java | 194 ----------------
.../samza/config/KafkaConsumerConfig.java | 198 +++++++++++++++++
.../samza/system/kafka/KafkaConsumerProxy.java | 220 +++++++++----------
.../samza/system/kafka/KafkaSystemConsumer.java | 187 ++++++++--------
.../kafka/KafkaSystemConsumerMetrics.scala | 2 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 4 +-
.../consumer/TestKafkaConsumerConfig.java | 137 ------------
.../samza/config/TestKafkaConsumerConfig.java | 152 +++++++++++++
.../system/kafka/TestKafkaSystemConsumer.java | 12 +-
10 files changed, 552 insertions(+), 556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e71fcb3..fba7329 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -822,7 +822,7 @@ class SamzaContainer(
}
try {
- info("Shutting down SamzaContaier.")
+ info("Shutting down SamzaContainer.")
removeShutdownHook
jmxServer.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
deleted file mode 100644
index 8ada1b4..0000000
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JobConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-
-/**
- * The configuration class for KafkaConsumer
- */
-public class KafkaConsumerConfig extends ConsumerConfig {
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
-
- private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
- private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
- private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
- private static final String SAMZA_OFFSET_LARGEST = "largest";
- private static final String SAMZA_OFFSET_SMALLEST = "smallest";
- private static final String KAFKA_OFFSET_LATEST = "latest";
- private static final String KAFKA_OFFSET_EARLIEST = "earliest";
- private static final String KAFKA_OFFSET_NONE = "none";
-
- /*
- * By default, KafkaConsumer will fetch ALL available messages for all the partitions.
- * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
- */
- static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
-
- private KafkaConsumerConfig(Properties props) {
- super(props);
- }
-
- /**
- * Create kafka consumer configs, based on the subset of global configs.
- * @param config
- * @param systemName
- * @param clientId
- * @param injectProps
- * @return KafkaConsumerConfig
- */
- public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId,
- Map<String, String> injectProps) {
-
- final Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
-
- final String groupId = getConsumerGroupId(config);
-
- final Properties consumerProps = new Properties();
- consumerProps.putAll(subConf);
-
- consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-
- //Kafka client configuration
-
- // put overrides
- consumerProps.putAll(injectProps);
-
- // These are values we enforce in sazma, and they cannot be overwritten.
-
- // Disable consumer auto-commit because Samza controls commits
- consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- // Translate samza config value to kafka config value
- consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- getAutoOffsetResetValue(consumerProps));
-
- // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT?
- if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
- // get it from the producer config
- String bootstrapServers =
- config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- if (StringUtils.isEmpty(bootstrapServers)) {
- throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
- }
- consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- }
-
- // Always use default partition assignment strategy. Do not allow override.
- consumerProps.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
-
- // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
- // default to byte[]
- if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
- LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- }
- if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
- LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- }
-
- // Override default max poll config if there is no value
- consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
- (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
-
- return new KafkaConsumerConfig(consumerProps);
- }
-
- // group id should be unique per job
- static String getConsumerGroupId(Config config) {
- JobConfig jobConfig = new JobConfig(config);
- Option<String> jobIdOption = jobConfig.getJobId();
- Option<String> jobNameOption = jobConfig.getName();
- return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined()
- ? jobIdOption.get() : "undefined_job_id");
- }
-
- // client id should be unique per job
- public static String getConsumerClientId(Config config) {
- return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
- }
- public static String getProducerClientId(Config config) {
- return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
- }
- public static String getAdminClientId(Config config) {
- return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
- }
-
- private static String getConsumerClientId(String id, Config config) {
- if (config.get(JobConfig.JOB_NAME()) == null) {
- throw new ConfigException("Missing job name");
- }
- String jobName = config.get(JobConfig.JOB_NAME());
- String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
-
- return String.format("%s-%s-%s", id.replaceAll("[^A-Za-z0-9]", "_"), jobName.replaceAll("[^A-Za-z0-9]", "_"),
- jobId.replaceAll("[^A-Za-z0-9]", "_"));
- }
-
- /**
- * If settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset),
- * then need to convert them (see kafka.apache.org/documentation):
- * "largest" -> "latest"
- * "smallest" -> "earliest"
- *
- * If no setting specified we return "latest" (same as Kafka).
- * @param properties All consumer related {@link Properties} parsed from samza config
- * @return String representing the config value for "auto.offset.reset" property
- */
- static String getAutoOffsetResetValue(Properties properties) {
- String autoOffsetReset = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST);
-
- // accept kafka values directly
- if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
- || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
- return autoOffsetReset;
- }
-
- String newAutoOffsetReset;
- switch (autoOffsetReset) {
- case SAMZA_OFFSET_LARGEST:
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
- break;
- case SAMZA_OFFSET_SMALLEST:
- newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
- break;
- default:
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
- }
- LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
- return newAutoOffsetReset;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..4bbe00f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JobConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends HashMap<String, Object> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+  private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+  private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+  private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+
+  // Legacy values accepted for auto.offset.reset, and the Kafka values they are translated to.
+  private static final String SAMZA_OFFSET_LARGEST = "largest";
+  private static final String SAMZA_OFFSET_SMALLEST = "smallest";
+  private static final String KAFKA_OFFSET_LATEST = "latest";
+  private static final String KAFKA_OFFSET_EARLIEST = "earliest";
+  private static final String KAFKA_OFFSET_NONE = "none";
+
+  /*
+   * Default for max.poll.records: the maximum TOTAL number of records (across all assigned partitions) that a
+   * single KafkaConsumer.poll() call may return. Kafka's own default is larger, which may cause memory issues,
+   * so we cap it here. It is applied only when the app does not configure max.poll.records itself.
+   */
+  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+
+  private KafkaConsumerConfig(Map<String, Object> map) {
+    super(map);
+  }
+
+  /**
+   * Helper method that builds the configs for a Kafka consumer.
+   * The values come from the {@code systems.<systemName>.consumer.} subset of the app-provided config,
+   * with the Samza-enforced overrides described by the inline comments applied on top.
+   *
+   * @param config - config provided by the app.
+   * @param systemName - system name for which the consumer is configured.
+   * @param clientId - client id to be used in the Kafka consumer.
+   * @return KafkaConsumerConfig
+   * @throws SamzaException if bootstrap.servers is configured neither for the consumer nor for the producer.
+   */
+  public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
+    Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
+
+    Map<String, Object> consumerProps = new HashMap<>();
+    consumerProps.putAll(subConf);
+
+    // Kafka client identity; the group id is derived from the job name and job id.
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, getConsumerGroupId(config));
+    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+    // Samza-enforced value that cannot be overwritten: auto-commit is disabled because Samza controls commits.
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+    // Translate samza config value to kafka config value
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+
+    // make sure bootstrap configs are in, if not - get them from the producer
+    if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+      String bootstrapServers =
+          config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      if (StringUtils.isEmpty(bootstrapServers)) {
+        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
+      }
+      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    }
+
+    // Always use default partition assignment strategy. Do not allow override.
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+    // The consumer is fully typed; if the app provides no deserializers, default both key and value to byte[].
+    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+
+    // Cap max.poll.records unless the app configured it explicitly.
+    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+
+    return new KafkaConsumerConfig(consumerProps);
+  }
+
+  // group id should be unique per job
+  static String getConsumerGroupId(Config config) {
+    JobConfig jobConfig = new JobConfig(config);
+    Option<String> jobNameOption = jobConfig.getName();
+    Option<String> jobIdOption = jobConfig.getJobId();
+    String jobName = jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name";
+    String jobId = jobIdOption.isDefined() ? jobIdOption.get() : "undefined_job_id";
+    return jobName + "-" + jobId;
+  }
+
+  // client id should be unique per job
+  public static String getConsumerClientId(Config config) {
+    return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getProducerClientId(Config config) {
+    return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+  }
+
+  public static String getAdminClientId(Config config) {
+    return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
+  }
+
+  // Builds "<id>-<jobName>-<jobId>" (job id defaults to "1") with every non-word character replaced by '_'.
+  static String getConsumerClientId(String id, Config config) {
+    String jobName = config.get(JobConfig.JOB_NAME());
+    if (jobName == null) {
+      throw new ConfigException("Missing job name");
+    }
+    String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
+    return String.format("%s-%s-%s", toClientIdToken(id), toClientIdToken(jobName), toClientIdToken(jobId));
+  }
+
+  // Replaces every non-word character (anything other than [A-Za-z0-9_]) with '_'.
+  private static String toClientIdToken(String token) {
+    return token.replaceAll("\\W", "_");
+  }
+
+  /**
+   * Translates the app-provided auto.offset.reset value into one the Kafka consumer accepts: "largest" -> "latest",
+   * "smallest" -> "earliest". Kafka values ("earliest", "latest", "none") are returned unchanged and a missing
+   * (null) value yields "latest" (same as Kafka). Any other value is logged as a warning and mapped to "latest".
+   *
+   * @param autoOffsetReset value from the app provided config; may be null
+   * @return String representing the config value for "auto.offset.reset" property
+   */
+  static String getAutoOffsetResetValue(final String autoOffsetReset) {
+    if (autoOffsetReset == null) {
+      return KAFKA_OFFSET_LATEST; // return default
+    }
+
+    // accept kafka values directly
+    if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+        || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+      return autoOffsetReset;
+    }
+
+    String newAutoOffsetReset;
+    switch (autoOffsetReset) {
+      case SAMZA_OFFSET_LARGEST:
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+        break;
+      case SAMZA_OFFSET_SMALLEST:
+        newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+        break;
+      default:
+        // Unrecognized value (e.g. a typo): keep Kafka's default instead of failing, but make it visible.
+        LOG.warn("Unknown auto.offset.reset value {}; using {}", autoOffsetReset, KAFKA_OFFSET_LATEST);
+        newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+    }
+    LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+    return newAutoOffsetReset;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index b67df0a..d2f7096 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.print.DocFlavor;
import kafka.common.KafkaException;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
@@ -47,7 +48,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap.
+ * This class contains a separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap
+ * through KafkaSystemConsumer.KafkaConsumerMessageSink object.
* This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
* We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
*/
@@ -74,7 +76,8 @@ import org.slf4j.LoggerFactory;
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
- /* package private */KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+ // package private constructor
+ KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
@@ -93,6 +96,11 @@ import org.slf4j.LoggerFactory;
"Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
}
+  @Override
+  public String toString() {
+    return "consumerProxy-" + systemName + "-" + clientId;
+  }
+
public void start() {
if (!consumerPollThread.isAlive()) {
LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
@@ -108,12 +116,43 @@ import org.slf4j.LoggerFactory;
}
}
} else {
- LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
+ LOG.warn("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
+ }
+
+ if (topicPartitions2SSP.size() == 0) {
+ String msg = String.format("Cannot start empty set of TopicPartitions for system %s, clientid %s",
+ systemName, clientId);
+ LOG.error(msg);
+ throw new SamzaException(msg);
}
}
- // add new partition to the list of polled partitions
- // this method is called only at the beginning, before the thread is started
+  /**
+   * Stop the poll thread: clear isRunning, join for up to timeoutMs, then interrupt and join again if still alive.
+   * @param timeoutMs how long to wait in each join (the total wait may therefore reach 2 * timeoutMs)
+   */
+  public void stop(long timeoutMs) {
+    LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
+
+    isRunning = false;
+    try {
+      consumerPollThread.join(timeoutMs);
+      // join() returns even if the thread didn't finish; in that case interrupt it and wait again
+      if (consumerPollThread.isAlive()) {
+        consumerPollThread.interrupt();
+        consumerPollThread.join(timeoutMs);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Join in KafkaConsumerProxy has failed", e);
+      consumerPollThread.interrupt();
+      Thread.currentThread().interrupt(); // restore the caller's interrupt status instead of swallowing it
+    }
+  }
+
+ /**
+ * Add new partition to the list of polled partitions.
+ * This method should be called only at the beginning, before the thread is started.
+ */
public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset,
this));
@@ -124,67 +163,13 @@ import org.slf4j.LoggerFactory;
nextOffsets.put(ssp, nextOffset);
- // we reuse existing metrics. They assume host and port for the broker
- // for now fake the port with the consumer name
kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size());
}
- /**
- * creates a separate thread for pulling messages
- */
- private Runnable createProxyThreadRunnable() {
- Runnable runnable= () -> {
- isRunning = true;
-
- try {
- consumerPollThreadStartLatch.countDown();
- LOG.info("Starting runnable " + consumerPollThread.getName());
- initializeLags();
- while (isRunning) {
- fetchMessages();
- }
- } catch (Throwable throwable) {
- LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
- // SamzaKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
- failureCause = throwable;
- isRunning = false;
- }
-
- if (!isRunning) {
- LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName);
- }
- };
-
- return runnable;
- }
-
- private void initializeLags() {
- // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
- Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
- endOffsets.forEach((tp, offset) -> {
- SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
- long startingOffset = nextOffsets.get(ssp);
- // End offsets are the offset of the newest message + 1
- // If the message we are about to consume is < end offset, we are starting with a lag.
- long initialLag = endOffsets.get(tp) - startingOffset;
-
- LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
- latestLags.put(ssp, initialLag);
- sink.setIsAtHighWatermark(ssp, initialLag == 0);
- });
-
- // initialize lag metrics
- refreshLatencyMetrics();
- }
-
// the actual polling of the messages from kafka
- public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
+ private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
- if (topicPartitions2SSP.size() == 0) {
- throw new SamzaException("cannot poll empty set of TopicPartitions");
- }
-
// Since we need to poll only from some subset of TopicPartitions (passed as the argument),
// we need to pause the rest.
List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
@@ -201,10 +186,9 @@ import org.slf4j.LoggerFactory;
}
ConsumerRecords<K, V> records;
- // make a call on the client
+
try {
- // Currently, when doing checkpoint we are making a safeOffset request through this client, thus we need to synchronize
- // them. In the future we may use this client for the actually checkpointing.
+ // Synchronize, in case the consumer is used in some other thread (metadata or something else)
synchronized (kafkaConsumer) {
// Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily
kafkaConsumer.pause(topicPartitionsToPause);
@@ -213,12 +197,7 @@ import org.slf4j.LoggerFactory;
// resume original set of subscription - may be required for checkpointing
kafkaConsumer.resume(topicPartitionsToPause);
}
- } catch (InvalidOffsetException e) {
- // If the consumer has thrown this exception it means that auto reset is not set for this consumer.
- // So we just rethrow.
- LOG.error("Caught InvalidOffsetException in pollConsumer", e);
- throw e;
- } catch (KafkaException e) {
+ } catch (Exception e) {
// we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions,
// but we still just rethrow, and log it up the stack.
LOG.error("Caught a Kafka exception in pollConsumer", e);
@@ -230,11 +209,10 @@ import org.slf4j.LoggerFactory;
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
if (records == null) {
- throw new SamzaException("processResults is called with null object for records");
+ throw new SamzaException("ERROR:records is null, after pollConsumer call (in processResults)");
}
- int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, allocate more then 75% of expected capacity.
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
// Parse the returned records and convert them into the IncomingMessageEnvelope.
// Note. They have been already de-serialized by the consumer.
for (ConsumerRecord<K, V> record : records) {
@@ -268,6 +246,52 @@ import org.slf4j.LoggerFactory;
return results;
}
+  // Creates the poll-thread body: release the start latch, compute initial lags, then fetch until stopped/failed.
+  private Runnable createProxyThreadRunnable() {
+    return () -> {
+      isRunning = true;
+      try {
+        consumerPollThreadStartLatch.countDown();
+        LOG.info("Starting runnable " + consumerPollThread.getName());
+        initializeLags();
+        while (isRunning) {
+          fetchMessages();
+        }
+      } catch (Throwable throwable) {
+        onPollThreadFailure(throwable);
+      }
+      if (!isRunning) {
+        LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+      }
+    };
+  }
+
+  // Records a poll-thread failure; KafkaSystemConsumer uses failureCause to propagate it to the container.
+  private void onPollThreadFailure(Throwable throwable) {
+    LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+    failureCause = throwable;
+    isRunning = false;
+  }
+
+  private void initializeLags() {
+    // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+    Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
+    for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+      SystemStreamPartition ssp = topicPartitions2SSP.get(entry.getKey());
+      long endOffset = entry.getValue();
+      long startingOffset = nextOffsets.get(ssp);
+      // End offset = newest message offset + 1, so a starting offset below it means we begin with a lag.
+      long initialLag = endOffset - startingOffset;
+
+      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffset, startingOffset);
+      latestLags.put(ssp, initialLag);
+      sink.setIsAtHighWatermark(ssp, initialLag == 0);
+    }
+
+    // initialize lag metrics
+    refreshLatencyMetrics();
+  }
+
private int getRecordSize(ConsumerRecord<K, V> r) {
int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
return keySize + r.serializedValueSize();
@@ -291,9 +315,7 @@ import org.slf4j.LoggerFactory;
kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
}
- /*
- This method put messages into blockingEnvelopeMap.
- */
+
private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
long nextOffset = nextOffsets.get(ssp);
@@ -317,11 +339,9 @@ import org.slf4j.LoggerFactory;
}
}
- /*
- The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
- One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
- This method populates the lag information for each SSP into latestLags member variable.
- */
+ // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
+ // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
+ // This method populates the lag information for each SSP into latestLags member variable.
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
@@ -339,12 +359,6 @@ import org.slf4j.LoggerFactory;
// so the lag is now at least 0, which is the same as Samza's definition.
// If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
- /*
- Metric averageLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags));
- double averageLag = (averageLagM != null) ? averageLagM.value() : -1.0;
- Metric maxLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags));
- double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0;
- */
latestLags.put(ssp, currentLag);
// calls the setIsAtHead for the BlockingEnvelopeMap
@@ -352,10 +366,8 @@ import org.slf4j.LoggerFactory;
}
}
- /*
- Get the latest lag for a specific SSP.
- */
- public long getLatestLag(SystemStreamPartition ssp) {
+ // Get the latest lag for a specific SSP.
+ private long getLatestLag(SystemStreamPartition ssp) {
Long lag = latestLags.get(ssp);
if (lag == null) {
throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp);
@@ -363,9 +375,7 @@ import org.slf4j.LoggerFactory;
return lag;
}
- /*
- Using the consumer to poll the messages from the stream.
- */
+ // Using the consumer to poll the messages from the stream.
private void fetchMessages() {
Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
for (SystemStreamPartition ssp : nextOffsets.keySet()) {
@@ -380,7 +390,7 @@ import org.slf4j.LoggerFactory;
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
- response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig
+ response = pollConsumer(sspsToFetch, 500L);
// move the responses into the queue
for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
@@ -430,27 +440,5 @@ import org.slf4j.LoggerFactory;
Throwable getFailureCause() {
return failureCause;
}
-
- /**
- * stop the thread and wait for it to stop
- * @param timeoutMs how long to wait in join
- */
- public void stop(long timeoutMs) {
- LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
-
- isRunning = false;
- try {
- consumerPollThread.join(timeoutMs);
- // join returns event if the thread didn't finish
- // in this case we should interrupt it and wait again
- if (consumerPollThread.isAlive()) {
- consumerPollThread.interrupt();
- consumerPollThread.join(timeoutMs);
- }
- } catch (InterruptedException e) {
- LOG.warn("Join in KafkaConsumerProxy has failed", e);
- consumerPollThread.interrupt();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index 9101a89..e5ded8d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -31,9 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
@@ -56,32 +55,33 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
private final Consumer<K, V> kafkaConsumer;
private final String systemName;
- private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
private final String clientId;
- private final String metricName;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean started = new AtomicBoolean(false);
private final Config config;
private final boolean fetchThresholdBytesEnabled;
+ private final KafkaSystemConsumerMetrics metrics;
// This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
- /* package private */final KafkaConsumerMessageSink messageSink;
+ final KafkaConsumerMessageSink messageSink;
- // proxy is doing the actual reading
+ // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
+ // BlockingEnvelopeMap's buffers.
final private KafkaConsumerProxy proxy;
- /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>();
- /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
- /* package private */ long perPartitionFetchThreshold;
- /* package private */ long perPartitionFetchThresholdBytes;
+ // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
+ final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
+ final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+
+ long perPartitionFetchThreshold;
+ long perPartitionFetchThresholdBytes;
/**
- * Constructor
* @param systemName system name for which we create the consumer
- * @param config config
- * @param metrics metrics
- * @param clock - system clock
+ * @param config config passed into the app
+ * @param metrics metrics collecting object
+ * @param clock - system clock; allows overriding the internal clock (System.currentTimeMillis())
*/
public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
KafkaSystemConsumerMetrics metrics, Clock clock) {
@@ -89,54 +89,50 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
super(metrics.registry(), clock, metrics.getClass().getName());
this.kafkaConsumer = kafkaConsumer;
- this.samzaConsumerMetrics = metrics;
this.clientId = clientId;
this.systemName = systemName;
this.config = config;
- this.metricName = String.format("%s %s", systemName, clientId);
+ this.metrics = metrics;
- this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+ fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
// create a sink for passing the messages between the proxy and the consumer
messageSink = new KafkaConsumerMessageSink();
- // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream
- // and puts them into the sink.
- proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName);
- LOG.info("Created consumer proxy: " + proxy);
+ // Create the proxy to do the actual message reading.
+ String metricName = String.format("%s %s", systemName, clientId);
+ proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
+ LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy );
- LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName,
- clientId, metricName, this.kafkaConsumer.toString());
+ LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
}
/**
- * create kafka consumer
+ * Create internal kafka consumer object, which will be used in the Proxy.
* @param systemName system name for which we create the consumer
* @param clientId client id to use int the kafka client
* @param config config
- * @return kafka consumer
+ * @return kafka consumer object
*/
public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
- Map<String, String> injectProps = new HashMap<>();
-
// extract kafka client configs
KafkaConsumerConfig consumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
- LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals());
+ LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
- return new KafkaConsumer<>(consumerConfig.originals());
+ return new KafkaConsumer<>(consumerConfig);
}
@Override
public void start() {
if (!started.compareAndSet(false, true)) {
- LOG.warn("attempting to start the consumer for the second (or more) time.");
+ LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
return;
}
if (stopped.get()) {
- LOG.warn("attempting to start a stopped consumer");
+ LOG.warn("{}: Attempting to start a stopped consumer", this);
return;
}
// initialize the subscriptions for all the registered TopicPartitions
@@ -145,58 +141,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
setFetchThresholds();
startConsumer();
- LOG.info("consumer {} started", this);
+ LOG.info("{}: Consumer started", this);
}
private void startSubscription() {
//subscribe to all the registered TopicPartitions
- LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet());
+ LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
try {
synchronized (kafkaConsumer) {
// we are using assign (and not subscribe), so we need to specify both topic and partition
- kafkaConsumer.assign(topicPartitions2SSP.keySet());
+ kafkaConsumer.assign(topicPartitionsToSSP.keySet());
}
} catch (Exception e) {
- LOG.warn("startSubscription failed.", e);
+ LOG.warn("{}: Start subscription failed", this, e);
throw new SamzaException(e);
}
}
- /*
- Set the offsets to start from.
- Add the TopicPartitions to the proxy.
- Start the proxy thread.
+ /**
+ * Set the offsets to start from.
+ * Register the TopicPartitions with the proxy.
+ * Start the proxy.
*/
void startConsumer() {
- //set the offset for each TopicPartition
- if (topicPartitions2Offset.size() <= 0) {
- LOG.warn("Consumer {} is not subscribed to any SSPs", this);
+ // set the offset for each TopicPartition
+ if (topicPartitionsToOffset.size() <= 0) {
+ LOG.warn("{}: Consumer is not subscribed to any SSPs", this);
}
- topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
+ topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
long startingOffset = Long.valueOf(startingOffsetString);
try {
synchronized (kafkaConsumer) {
- // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET
- // this will call KafkaConsumer.seekToBegin/End()
kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
}
} catch (Exception e) {
- // all other exceptions - non recoverable
- LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e);
- throw new SamzaException(e);
+ // all recoverable exceptions are handled by the client.
+ // if we get here there is nothing left to do but bail out.
+ String msg = String.format("%s: Got Exception while seeking to %s for partition %s",
+ this, startingOffsetString, tp);
+ LOG.error(msg, e);
+ throw new SamzaException(msg, e);
}
- LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + startingOffsetString);
+ LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
// add the partition to the proxy
- proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
+ proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
});
// start the proxy thread
if (proxy != null && !proxy.isRunning()) {
- LOG.info("Starting proxy: " + proxy);
+ LOG.info("{}: Starting proxy {}", this, proxy);
proxy.start();
}
}
@@ -209,57 +206,59 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
long fetchThreshold = FETCH_THRESHOLD;
if (fetchThresholdOption.isDefined()) {
fetchThreshold = Long.valueOf(fetchThresholdOption.get());
- LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold);
+ LOG.info("{}: fetchThresholdOption is configured. fetchThreshold={}", this, fetchThreshold);
}
Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
if (fetchThresholdBytesOption.isDefined()) {
fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
- LOG.info("fetchThresholdBytesOption is configured. fetchThresholdBytes=" + fetchThresholdBytes);
+ LOG.info("{}: fetchThresholdBytesOption is configured. fetchThresholdBytes={}", this, fetchThresholdBytes);
}
- int numTPs = topicPartitions2SSP.size();
- assert (numTPs == topicPartitions2Offset.size());
+ int numTPs = topicPartitionsToSSP.size();
+ if (numTPs != topicPartitionsToOffset.size()) {
+ throw new SamzaException(String.format("%s: topicPartitionsToSSP.size() (%d) doesn't match topicPartitionsToOffset.size() (%d)", this, numTPs, topicPartitionsToOffset.size()));
+ }
- LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold);
- LOG.info("number of topicPartitions " + numTPs);
+ LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}",
+ this, fetchThresholdBytes, fetchThreshold, numTPs);
if (numTPs > 0) {
perPartitionFetchThreshold = fetchThreshold / numTPs;
- LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
+ LOG.info("{}: perPartitionFetchThreshold={}", this, perPartitionFetchThreshold);
if (fetchThresholdBytesEnabled) {
// currently this feature cannot be enabled, because we do not have the size of the messages available.
// messages get double buffered, hence divide by 2
perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
- LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes="
- + perPartitionFetchThresholdBytes);
+ LOG.info("{}: perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}",
+ this, perPartitionFetchThresholdBytes);
}
}
}
@Override
public void stop() {
- LOG.info("Stopping Samza kafkaConsumer " + this);
-
if (!stopped.compareAndSet(false, true)) {
- LOG.warn("attempting to stop stopped consumer.");
+ LOG.warn("{}: Attempting to stop stopped consumer.", this);
return;
}
- // stop the proxy (with 5 minutes timeout)
+ LOG.info("{}: Stopping Samza kafkaConsumer", this);
+
+ // stop the proxy (with 1 minute timeout)
if (proxy != null) {
- LOG.info("Stopping proxy " + proxy);
+ LOG.info("{}: Stopping proxy {}", this, proxy);
proxy.stop(TimeUnit.SECONDS.toMillis(60));
}
try {
synchronized (kafkaConsumer) {
- LOG.info("Closing kafka consumer " + kafkaConsumer);
+ LOG.info("{}: Closing kafkaConsumer {}", this, kafkaConsumer);
kafkaConsumer.close();
}
} catch (Exception e) {
- LOG.warn("failed to stop SamzaRawKafkaConsumer + " + this, e);
+ LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
}
}
@@ -270,45 +269,45 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
public void register(SystemStreamPartition systemStreamPartition, String offset) {
if (started.get()) {
String msg =
- String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName,
- systemStreamPartition);
- LOG.error(msg);
+ String.format("%s: Trying to register partition after consumer has been started. ssp=%s",
+ this, systemStreamPartition);
throw new SamzaException(msg);
}
if (!systemStreamPartition.getSystem().equals(systemName)) {
- LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName);
+ LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
return;
}
+ LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
+
super.register(systemStreamPartition, offset);
TopicPartition tp = toTopicPartition(systemStreamPartition);
- topicPartitions2SSP.put(tp, systemStreamPartition);
+ topicPartitionsToSSP.put(tp, systemStreamPartition);
- LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset);
- String existingOffset = topicPartitions2Offset.get(tp);
+ String existingOffset = topicPartitionsToOffset.get(tp);
// register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
- topicPartitions2Offset.put(tp, offset);
+ topicPartitionsToOffset.put(tp, offset);
}
- samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp));
+ metrics.registerTopicAndPartition(toTopicAndPartition(tp));
}
/**
* Compare two String offsets.
- * Note. There is a method in KafkaAdmin that does that, but that would require instantiation of systemadmin for each consumer.
+ * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
* @return see {@link Long#compareTo(Long)}
*/
- public static int compareOffsets(String offset1, String offset2) {
+ private static int compareOffsets(String offset1, String offset2) {
return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
}
@Override
public String toString() {
- return systemName + "/" + clientId + "/" + super.toString();
+ return String.format("%s:%s", systemName, clientId);
}
@Override
@@ -318,17 +317,11 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
// check if the proxy is running
if (!proxy.isRunning()) {
stop();
- if (proxy.getFailureCause() != null) {
- String message = "KafkaConsumerProxy has stopped";
- throw new SamzaException(message, proxy.getFailureCause());
- } else {
- LOG.warn("Failure cause is not populated for KafkaConsumerProxy");
- throw new SamzaException("KafkaConsumerProxy has stopped");
- }
+ String message = String.format("%s: KafkaConsumerProxy has stopped.", this);
+ throw new SamzaException(message, proxy.getFailureCause());
}
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
- return res;
+ return super.poll(systemStreamPartitions, timeout);
}
/**
@@ -353,9 +346,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
return systemName;
}
- ////////////////////////////////////
- // inner class for the message sink
- ////////////////////////////////////
public class KafkaConsumerMessageSink {
public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
@@ -363,8 +353,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
}
boolean needsMoreMessages(SystemStreamPartition ssp) {
- LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
- + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled,
+ LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+ + "(limit={}); messagesNumInQueue={}(limit={});", KafkaSystemConsumer.this, ssp, fetchThresholdBytesEnabled,
getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
perPartitionFetchThreshold);
@@ -376,16 +366,15 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
}
void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
- LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope);
+ LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", KafkaSystemConsumer.this, ssp, envelope);
try {
put(ssp, envelope);
} catch (InterruptedException e) {
throw new SamzaException(
- String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(),
- ssp));
+ String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
+ KafkaSystemConsumer.this, envelope.getOffset(), ssp), e);
}
}
- } // end of KafkaMessageSink class
- ///////////////////////////////////////////////////////////////////////////
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 7dce261..c4552e6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -50,7 +50,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName))
clientReads.put((clientName), newCounter("%s-messages-read" format clientName))
clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName))
- topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0))
+ topicPartitions.put(clientName, newGauge("%s-registered-topic-partitions" format clientName, 0))
}
// java friendlier interfaces
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 5342b08..deaee56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,7 +22,7 @@ package org.apache.samza.system.kafka
import java.util.Properties
import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.{KafkaConsumer, KafkaConsumerConfig}
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
@@ -30,7 +30,7 @@ import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.config.StorageConfig._
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig}
+import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}
import org.apache.samza.util._
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
deleted file mode 100644
index 264098b..0000000
--- a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestKafkaConsumerConfig {
- private final Map<String, String> props = new HashMap<>();
- public final static String SYSTEM_NAME = "testSystem";
- public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
- public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
- private final static String CLIENT_ID = "clientId";
-
- @Before
- public void setProps() {
-
- }
-
- @Test
- public void testDefaultsAndOverrides() {
-
- Map<String, String> overrides = new HashMap<>();
- overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
- overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
- overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
-
- // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
- props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
-
- // should be overridden
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore
-
-
- // should be overridden
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200");
-
- Config config = new MapConfig(props);
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- config, SYSTEM_NAME, CLIENT_ID, overrides);
-
- Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
- Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS));
-
- Assert.assertEquals(
- kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0),
- RangeAssignor.class.getName());
-
- Assert.assertEquals(
- kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
- "useThis:9092");
- Assert.assertEquals(
- kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(),
- 100);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
- ByteArrayDeserializer.class);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
- ByteArrayDeserializer.class);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG),
- CLIENT_ID);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG),
- KafkaConsumerConfig.getConsumerGroupId(config));
- }
-
- @Test
- // test stuff that should not be overridden
- public void testNotOverride() {
-
- // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
- props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-
-
- Config config = new MapConfig(props);
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap());
-
- Assert.assertEquals(
- kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
- "useThis:9092");
-
- Assert.assertEquals(
- kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
- TestKafkaConsumerConfig.class);
-
- Assert.assertEquals(
- kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
- TestKafkaConsumerConfig.class);
- }
-
-
-
- @Test(expected = SamzaException.class)
- public void testNoBootstrapServers() {
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap());
-
- Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..719ea22
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for {@link KafkaConsumerConfig}: defaults, overrides and client-id generation. */
+public class TestKafkaConsumerConfig {
+ public static final String SYSTEM_NAME = "testSystem";
+ public static final String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+ public static final String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+ private static final String CLIENT_ID = "clientId";
+ private final Map<String, String> props = new HashMap<>();
+
+ @Before
+ public void setProps() {
+ props.clear(); // start every test from an empty config map
+ }
+
+ @Test
+ public void testDefaults() {
+ // Samza-controlled settings must win over user-supplied values; others are passed through.
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+
+ // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
+ props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ignoreThis:9092");
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+
+ Config config = new MapConfig(props);
+ KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ config, SYSTEM_NAME, CLIENT_ID);
+
+ Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+
+ Assert.assertEquals(
+ KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+ kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+
+ Assert.assertEquals(
+ RangeAssignor.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+
+ Assert.assertEquals(
+ "useThis:9092",
+ kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ Assert.assertEquals(
+ "100",
+ kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+
+ Assert.assertEquals(
+ ByteArrayDeserializer.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+ Assert.assertEquals(
+ ByteArrayDeserializer.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ Assert.assertEquals(
+ CLIENT_ID,
+ kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+
+ Assert.assertEquals(
+ KafkaConsumerConfig.getConsumerGroupId(config),
+ kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+
+ /** Consumer-side settings supplied by the user must not be overridden by defaults. */
+ @Test
+ public void testNotOverride() {
+ // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
+ props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+
+ Config config = new MapConfig(props);
+ KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ config, SYSTEM_NAME, CLIENT_ID);
+
+ Assert.assertEquals(
+ "useThis:9092",
+ kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+ Assert.assertEquals(
+ TestKafkaConsumerConfig.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
+ Assert.assertEquals(
+ TestKafkaConsumerConfig.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+ }
+
+ /** Client ids are "prefix-jobName-jobId"; characters such as '-', ' ', '!' and '#' inside
+ * each part are expected to be replaced by '_' (see the assertions below). */
+ @Test
+ public void testGetConsumerClientId() {
+ Map<String, String> map = new HashMap<>();
+
+ map.put(JobConfig.JOB_NAME(), "jobName");
+ map.put(JobConfig.JOB_ID(), "jobId");
+ String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ Assert.assertEquals("consumer-jobName-jobId", result);
+
+ result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+ Assert.assertEquals("consumer_-jobName-jobId", result);
+
+ result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+ Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
+
+ map.put(JobConfig.JOB_NAME(), " very important!job");
+ result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ Assert.assertEquals("consumer-_very_important_job-jobId", result);
+
+ map.put(JobConfig.JOB_ID(), "number-#3");
+ result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ Assert.assertEquals("consumer-_very_important_job-number__3", result);
+ }
+
+ /** An empty config has no bootstrap servers under either the consumer or producer prefix,
+ * so building the consumer config is expected to throw a SamzaException. */
+ @Test(expected = SamzaException.class)
+ public void testNoBootstrapServers() {
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ new MapConfig(Collections.emptyMap()), SYSTEM_NAME, CLIENT_ID);
+
+ Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/5397a34e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index d90bc35..9e8ff44 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -27,7 +27,7 @@ import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
@@ -67,8 +67,8 @@ public class TestKafkaSystemConsumer {
Config config = new MapConfig(map);
KafkaConsumerConfig consumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap());
- final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals());
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
+ final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
MockKafkaSystmeCosumer newKafkaSystemConsumer =
new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
@@ -116,9 +116,9 @@ public class TestKafkaSystemConsumer {
consumer.register(ssp1, "3");
consumer.register(ssp2, "0");
- assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
- assertEquals("2", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
- assertEquals("0", consumer.topicPartitions2Offset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+ assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+ assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+ assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
}
@Test