You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:40 UTC
[43/47] samza git commit: Added Test for KafkaConsumerConfig
Added Test for KafkaConsumerConfig
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/32c92828
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/32c92828
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/32c92828
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 32c92828eaff98f4c2e6691533ece9f502ef1f98
Parents: 2480aa3
Author: Boris S <bo...@apache.org>
Authored: Wed Sep 12 14:06:41 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Sep 12 14:06:41 2018 -0700
----------------------------------------------------------------------
.../clients/consumer/KafkaConsumerConfig.java | 23 ++--
.../org/apache/samza/config/KafkaConfig.scala | 5 +-
.../samza/system/kafka/KafkaConsumerProxy.java | 14 ++-
.../consumer/TestKafkaConsumerConfig.java | 121 +++++++++++++++++++
4 files changed, 149 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 1a97ec7..8ada1b4 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -54,7 +54,7 @@ public class KafkaConsumerConfig extends ConsumerConfig {
* By default, KafkaConsumer will fetch ALL available messages for all the partitions.
* This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
*/
- private static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+ static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
private KafkaConsumerConfig(Properties props) {
super(props);
@@ -83,6 +83,11 @@ public class KafkaConsumerConfig extends ConsumerConfig {
//Kafka client configuration
+ // put overrides
+ consumerProps.putAll(injectProps);
+
+ // These are values we enforce in Samza, and they cannot be overwritten.
+
// Disable consumer auto-commit because Samza controls commits
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -106,28 +111,24 @@ public class KafkaConsumerConfig extends ConsumerConfig {
// the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
// default to byte[]
- if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+ if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
- if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+ if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
- // NOT SURE THIS IS NEEDED TODO
- final String maxPollRecords =
- subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
- consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
-
- // put overrides
- consumerProps.putAll(injectProps);
+ // Apply the default max.poll.records value only if none was provided
+ consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+ (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
return new KafkaConsumerConfig(consumerProps);
}
// group id should be unique per job
- private static String getConsumerGroupId(Config config) {
+ static String getConsumerGroupId(Config config) {
JobConfig jobConfig = new JobConfig(config);
Option<String> jobIdOption = jobConfig.getJobId();
Option<String> jobNameOption = jobConfig.getName();
http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 26664ea..ef43e72 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
properties
}
- // kafka config
+ /**
+ * @deprecated Use KafkaConsumerConfig
+ */
+ @Deprecated
def getKafkaSystemConsumerConfig( systemName: String,
clientId: String,
groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 4b99fcc..83e7a58 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -431,12 +431,22 @@ public class KafkaConsumerProxy<K, V> {
return failureCause;
}
- public void stop(long timeout) {
+ /**
+ * stop the thread and wait for it to stop
+ * @param timeoutMs how long to wait in join
+ */
+ public void stop(long timeoutMs) {
LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
isRunning = false;
try {
- consumerPollThread.join(timeout);
+ consumerPollThread.join(timeoutMs);
+ // join returns even if the thread didn't finish
+ // in this case we should interrupt it and wait again
+ if (consumerPollThread.isAlive()) {
+ consumerPollThread.interrupt();
+ consumerPollThread.join(timeoutMs);
+ }
} catch (InterruptedException e) {
LOG.warn("Join in KafkaConsumerProxy has failed", e);
consumerPollThread.interrupt();
http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..ee300d0
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
@@ -0,0 +1,121 @@
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestKafkaConsumerConfig {
+ private final Map<String, String> props = new HashMap<>();
+ public final static String SYSTEM_NAME = "testSystem";
+ public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+ public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+ private final static String CLIENT_ID = "clientId";
+
+ @Before
+ public void setProps() {
+
+ }
+
+ @Test
+ public void testDefaultsAndOverrides() {
+
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+ overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
+ overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+
+ // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
+ props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
+
+ // should be overridden
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore
+
+
+ // should be overridden
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200");
+
+ Config config = new MapConfig(props);
+ KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ config, SYSTEM_NAME, CLIENT_ID, overrides);
+
+ Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
+ Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS));
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0),
+ RangeAssignor.class.getName());
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
+ "useThis:9092");
+ Assert.assertEquals(
+ kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(),
+ 100);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+ ByteArrayDeserializer.class);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+ ByteArrayDeserializer.class);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG),
+ CLIENT_ID);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG),
+ KafkaConsumerConfig.getConsumerGroupId(config));
+ }
+
+ @Test
+ // test stuff that should not be overridden
+ public void testNotOverride() {
+
+ // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
+ props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+
+
+ Config config = new MapConfig(props);
+ KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap());
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
+ "useThis:9092");
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+ TestKafkaConsumerConfig.class);
+
+ Assert.assertEquals(
+ kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+ TestKafkaConsumerConfig.class);
+ }
+
+
+
+ @Test(expected = SamzaException.class)
+ public void testNoBootstrapServers() {
+ KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+ new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap());
+
+ Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ }
+}