You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:40 UTC

[43/47] samza git commit: Added Test for KafkaConsumerConfig

Added Test for KafkaConsumerConfig


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/32c92828
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/32c92828
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/32c92828

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 32c92828eaff98f4c2e6691533ece9f502ef1f98
Parents: 2480aa3
Author: Boris S <bo...@apache.org>
Authored: Wed Sep 12 14:06:41 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Wed Sep 12 14:06:41 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java   |  23 ++--
 .../org/apache/samza/config/KafkaConfig.scala   |   5 +-
 .../samza/system/kafka/KafkaConsumerProxy.java  |  14 ++-
 .../consumer/TestKafkaConsumerConfig.java       | 121 +++++++++++++++++++
 4 files changed, 149 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index 1a97ec7..8ada1b4 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -54,7 +54,7 @@ public class KafkaConsumerConfig extends ConsumerConfig {
    * By default, KafkaConsumer will fetch ALL available messages for all the partitions.
    * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
    */
-  private static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+  static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
 
   private KafkaConsumerConfig(Properties props) {
     super(props);
@@ -83,6 +83,11 @@ public class KafkaConsumerConfig extends ConsumerConfig {
 
     //Kafka client configuration
 
+    // put overrides
+    consumerProps.putAll(injectProps);
+
+    // These are values we enforce in samza, and they cannot be overwritten.
+
     // Disable consumer auto-commit because Samza controls commits
     consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -106,28 +111,24 @@ public class KafkaConsumerConfig extends ConsumerConfig {
 
     // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
     // default to byte[]
-    if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+    if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
       LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
       consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     }
-    if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+    if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
       LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
       consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     }
 
-    // NOT SURE THIS IS NEEDED TODO
-    final String maxPollRecords =
-        subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
-    consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
-
-    // put overrides
-    consumerProps.putAll(injectProps);
+    // Apply the default max poll records config only if no value was provided
+    consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+        (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
 
     return new KafkaConsumerConfig(consumerProps);
   }
 
   // group id should be unique per job
-  private static String getConsumerGroupId(Config config) {
+  static String getConsumerGroupId(Config config) {
     JobConfig jobConfig = new JobConfig(config);
     Option<String> jobIdOption = jobConfig.getJobId();
     Option<String> jobNameOption = jobConfig.getName();

http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 26664ea..ef43e72 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     properties
   }
 
-  // kafka config
+  /**
+    * @deprecated Use KafkaConsumerConfig
+    */
+  @Deprecated
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,
                                     groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,

http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 4b99fcc..83e7a58 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -431,12 +431,22 @@ public class KafkaConsumerProxy<K, V> {
     return failureCause;
   }
 
-  public void stop(long timeout) {
+  /**
+   * stop the thread and wait for it to stop
+   * @param timeoutMs how long to wait in join
+   */
+  public void stop(long timeoutMs) {
     LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
 
     isRunning = false;
     try {
-      consumerPollThread.join(timeout);
+      consumerPollThread.join(timeoutMs);
+      // join returns even if the thread didn't finish
+      // in this case we should interrupt it and wait again
+      if (consumerPollThread.isAlive()) {
+        consumerPollThread.interrupt();
+        consumerPollThread.join(timeoutMs);
+      }
     } catch (InterruptedException e) {
       LOG.warn("Join in KafkaConsumerProxy has failed", e);
       consumerPollThread.interrupt();

http://git-wip-us.apache.org/repos/asf/samza/blob/32c92828/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
new file mode 100644
index 0000000..ee300d0
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/kafka/clients/consumer/TestKafkaConsumerConfig.java
@@ -0,0 +1,121 @@
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestKafkaConsumerConfig {
+  private final Map<String, String> props = new HashMap<>();
+  public final static String SYSTEM_NAME = "testSystem";
+  public final static String KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer.";
+  public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
+  private final static String CLIENT_ID = "clientId";
+
+  @Before
+  public void setProps() {
+
+  }
+
+  @Test
+  public void testDefaultsAndOverrides() {
+
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
+    overrides.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
+    overrides.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
+
+    // should be overridden
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "true"); //ignore
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // ignore
+
+
+    // should be overridden
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "200");
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        config, SYSTEM_NAME, CLIENT_ID, overrides);
+
+    Assert.assertEquals(kafkaConsumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), false);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
+        Integer.valueOf(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS));
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).get(0),
+        RangeAssignor.class.getName());
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
+        "useThis:9092");
+    Assert.assertEquals(
+        kafkaConsumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).longValue(),
+        100);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+        ByteArrayDeserializer.class);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+        ByteArrayDeserializer.class);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG),
+        CLIENT_ID);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG),
+        KafkaConsumerConfig.getConsumerGroupId(config));
+  }
+
+  @Test
+  // test stuff that should not be overridden
+  public void testNotOverride() {
+
+    // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
+    props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+    props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
+
+
+    Config config = new MapConfig(props);
+    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        config, SYSTEM_NAME, CLIENT_ID, Collections.emptyMap());
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0),
+        "useThis:9092");
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getClass(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
+        TestKafkaConsumerConfig.class);
+
+    Assert.assertEquals(
+        kafkaConsumerConfig.getClass(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
+        TestKafkaConsumerConfig.class);
+  }
+
+
+
+  @Test(expected = SamzaException.class)
+  public void testNoBootstrapServers() {
+    KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
+        new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId", Collections.emptyMap());
+
+    Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+  }
+}