You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:23 UTC
[24/29] samza git commit: more review comments
more review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ae563c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ae563c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ae563c6
Branch: refs/heads/NewKafkaSystemConsumer
Commit: 4ae563c609aaaf0d2ceaeb90ab9ae33dfdc8d601
Parents: 5397a34
Author: Boris S <bo...@apache.org>
Authored: Mon Sep 24 12:31:25 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Mon Sep 24 12:31:25 2018 -0700
----------------------------------------------------------------------
.../samza/config/KafkaConsumerConfig.java | 19 ++---
.../samza/system/kafka/KafkaSystemConsumer.java | 40 +++++----
.../samza/system/kafka/KafkaSystemFactory.scala | 1 -
.../samza/config/TestKafkaConsumerConfig.java | 85 ++++++++------------
.../system/kafka/TestKafkaSystemAdminJava.java | 18 ++---
.../system/kafka/TestKafkaSystemConsumer.java | 40 ++++-----
6 files changed, 84 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
index 4bbe00f..7d2408b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -23,13 +23,11 @@ package org.apache.samza.config;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.samza.SamzaException;
-import org.apache.samza.config.JobConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -78,7 +76,6 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-
// These are values we enforce in Samza, and they cannot be overwritten.
// Disable consumer auto-commit because Samza controls commits
@@ -86,7 +83,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
// Translate samza config value to kafka config value
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
- getAutoOffsetResetValue((String)consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+ getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
// make sure bootstrap configs are in, if not - get them from the producer
if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
@@ -148,9 +145,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1";
- return String.format("%s-%s-%s", id.replaceAll(
- "\\W", "_"),
- jobName.replaceAll("\\W", "_"),
+ return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
jobId.replaceAll("\\W", "_"));
}
@@ -172,7 +167,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
final String KAFKA_OFFSET_NONE = "none";
if (autoOffsetReset == null) {
- return KAFKA_OFFSET_LATEST; // return default
+ return KAFKA_OFFSET_LATEST; // return default
}
// accept kafka values directly
@@ -184,15 +179,15 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
String newAutoOffsetReset;
switch (autoOffsetReset) {
case SAMZA_OFFSET_LARGEST:
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ newAutoOffsetReset = KAFKA_OFFSET_LATEST;
break;
case SAMZA_OFFSET_SMALLEST:
- newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+ newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
break;
default:
- newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ newAutoOffsetReset = KAFKA_OFFSET_LATEST;
}
- LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+ LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
return newAutoOffsetReset;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index e5ded8d..17f29f1 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -31,11 +31,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
@@ -69,7 +69,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
// BlockingEnvelopeMap's buffers.
final private KafkaConsumerProxy proxy;
-
// keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets
final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
@@ -102,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
// Create the proxy to do the actual message reading.
String metricName = String.format("%s %s", systemName, clientId);
proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
- LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy );
+ LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer);
}
@@ -117,8 +116,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
// extract kafka client configs
- KafkaConsumerConfig consumerConfig =
- KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
+ KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig);
@@ -179,8 +177,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
} catch (Exception e) {
// all recoverable exceptions are handled by the client.
// if we get here there is nothing left to do but bail out.
- String msg = String.format("%s: Got Exception while seeking to %s for partition %s",
- this, startingOffsetString, tp);
+ String msg =
+ String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
@@ -217,12 +215,12 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
}
int numTPs = topicPartitionsToSSP.size();
- if (numTPs == topicPartitionsToOffset.size()) {
+ if (numTPs != topicPartitionsToOffset.size()) {
throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
}
- LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}",
- this, fetchThresholdBytes, fetchThreshold, numTPs);
+ LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes,
+ fetchThreshold, numTPs);
if (numTPs > 0) {
perPartitionFetchThreshold = fetchThreshold / numTPs;
@@ -231,8 +229,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
// currently this feature cannot be enabled, because we do not have the size of the messages available.
// messages get double buffered, hence divide by 2
perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs;
- LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}",
- this, perPartitionFetchThresholdBytes);
+ LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", this,
+ perPartitionFetchThresholdBytes);
}
}
}
@@ -268,9 +266,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
if (started.get()) {
- String msg =
- String.format("%s: Trying to register partition after consumer has been started. ssp=%s",
- this, systemStreamPartition);
+ String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
+ systemStreamPartition);
throw new SamzaException(msg);
}
@@ -286,7 +283,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
topicPartitionsToSSP.put(tp, systemStreamPartition);
-
String existingOffset = topicPartitionsToOffset.get(tp);
// register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) {
@@ -353,10 +349,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
}
boolean needsMoreMessages(SystemStreamPartition ssp) {
- LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
- + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
- getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
- perPartitionFetchThreshold);
+ LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+ + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled,
+ getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+ perPartitionFetchThreshold);
if (fetchThresholdBytesEnabled) {
return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
@@ -372,8 +368,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
put(ssp, envelope);
} catch (InterruptedException e) {
throw new SamzaException(
- String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
- this, envelope.getOffset(), ssp));
+ String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this,
+ envelope.getOffset(), ssp));
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index deaee56..ba5390b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,7 +22,6 @@ package org.apache.samza.system.kafka
import java.util.Properties
import kafka.utils.ZkUtils
-import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.SamzaException
import org.apache.samza.config.ApplicationConfig.ApplicationMode
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
index 719ea22..35a717a 100644
--- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
+++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.samza.SamzaException;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
@@ -35,84 +34,67 @@ public class TestKafkaConsumerConfig {
public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer.";
private final static String CLIENT_ID = "clientId";
- @Before
- public void setProps() {
-
- }
-
@Test
public void testDefaults() {
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ "Ignore"); // should be ignored
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
+ "100"); // should NOT be ignored
// if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored
props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092");
props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092");
Config config = new MapConfig(props);
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- config, SYSTEM_NAME, CLIENT_ID);
+ KafkaConsumerConfig kafkaConsumerConfig =
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
- Assert.assertEquals(
- KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
+ Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS,
kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
- Assert.assertEquals(
- RangeAssignor.class.getName(),
+ Assert.assertEquals(RangeAssignor.class.getName(),
kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
- Assert.assertEquals(
- "useThis:9092",
- kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- Assert.assertEquals(
- "100",
- kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+ Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ Assert.assertEquals("100", kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
- Assert.assertEquals(
- ByteArrayDeserializer.class.getName(),
+ Assert.assertEquals(ByteArrayDeserializer.class.getName(),
kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
- Assert.assertEquals(
- ByteArrayDeserializer.class.getName(),
- kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) );
+ Assert.assertEquals(ByteArrayDeserializer.class.getName(),
+ kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
- Assert.assertEquals(
- CLIENT_ID,
- kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
+ Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG));
- Assert.assertEquals(
- KafkaConsumerConfig.getConsumerGroupId(config),
+ Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config),
kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG));
}
- @Test
// test stuff that should not be overridden
+ @Test
public void testNotOverride() {
// if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used
props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092");
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
- props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName());
-
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ TestKafkaConsumerConfig.class.getName());
+ props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ TestKafkaConsumerConfig.class.getName());
Config config = new MapConfig(props);
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- config, SYSTEM_NAME, CLIENT_ID);
+ KafkaConsumerConfig kafkaConsumerConfig =
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID);
- Assert.assertEquals(
- "useThis:9092",
- kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
- Assert.assertEquals(
- TestKafkaConsumerConfig.class.getName(),
+ Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
- Assert.assertEquals(
- TestKafkaConsumerConfig.class.getName(),
+ Assert.assertEquals(TestKafkaConsumerConfig.class.getName(),
kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
}
@@ -122,30 +104,29 @@ public class TestKafkaConsumerConfig {
map.put(JobConfig.JOB_NAME(), "jobName");
map.put(JobConfig.JOB_ID(), "jobId");
- String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-jobName-jobId", result);
- result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
+ result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map));
Assert.assertEquals("consumer_-jobName-jobId", result);
- result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map));
Assert.assertEquals("super_duper_consumer-jobName-jobId", result);
map.put(JobConfig.JOB_NAME(), " very important!job");
- result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-_very_important_job-jobId", result);
map.put(JobConfig.JOB_ID(), "number-#3");
- result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
+ result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map));
Assert.assertEquals("consumer-_very_important_job-number__3", result);
}
-
-
@Test(expected = SamzaException.class)
public void testNoBootstrapServers() {
- KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(
- new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId");
+ KafkaConsumerConfig kafkaConsumerConfig =
+ KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME,
+ "clientId");
Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 77f47f9..7e968bf 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,17 +19,14 @@
package org.apache.samza.system.kafka;
-import java.util.*;
import java.util.HashMap;
import java.util.Map;
-
+import java.util.Properties;
import kafka.api.TopicMetadata;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
admin.createStream(spec);
admin.validateStream(spec);
-
}
@Test
@@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
public void testCreateStream() {
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+ assertTrue("createStream should return true if the stream does not exist and then is created.",
+ systemAdmin().createStream(spec));
systemAdmin().validateStream(spec);
assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
@@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
- assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+ assertTrue("createStream should return true if the stream does not exist and then is created.",
+ systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
@@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
+ assertTrue("createStream should return true if the stream does not exist and then is created.",
+ systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
@@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
public void testClearStream() {
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
- assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
+ assertTrue("createStream should return true if the stream does not exist and then is created.",
+ systemAdmin().createStream(spec));
assertTrue(systemAdmin().clearStream(spec));
scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
index 9e8ff44..933558c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -21,24 +21,22 @@
package org.apache.samza.system.kafka;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -52,12 +50,7 @@ public class TestKafkaSystemConsumer {
public final String FETCH_THRESHOLD_MSGS = "50000";
public final String FETCH_THRESHOLD_BYTES = "100000";
- @Before
- public void setUp() {
-
- }
-
- private KafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) {
+ private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
final Map<String, String> map = new HashMap<>();
map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
@@ -70,8 +63,8 @@ public class TestKafkaSystemConsumer {
KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
- MockKafkaSystmeCosumer newKafkaSystemConsumer =
- new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
+ MockKafkaSystemConsumer newKafkaSystemConsumer =
+ new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
return newKafkaSystemConsumer;
@@ -80,7 +73,7 @@ public class TestKafkaSystemConsumer {
@Test
public void testConfigValidations() {
- final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+ final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
consumer.start();
// should be no failures
@@ -88,7 +81,7 @@ public class TestKafkaSystemConsumer {
@Test
public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
- final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+ final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
final int partitionsNum = 50;
for (int i = 0; i < partitionsNum; i++) {
consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
@@ -99,12 +92,14 @@ public class TestKafkaSystemConsumer {
Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
consumer.perPartitionFetchThresholdBytes);
+
+ consumer.stop();
}
@Test
public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
- KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+ KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
@@ -137,7 +132,7 @@ public class TestKafkaSystemConsumer {
bytesSerde.serialize("", "value1".getBytes()), ime1Size);
IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
bytesSerde.serialize("", "value11".getBytes()), ime11Size);
- KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+ KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
consumer.register(ssp0, "0");
consumer.register(ssp1, "0");
@@ -156,6 +151,8 @@ public class TestKafkaSystemConsumer {
Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+ consumer.stop();
}
@Test
@@ -178,7 +175,7 @@ public class TestKafkaSystemConsumer {
// limit by number of messages 4/2 = 2 per partition
// limit by number of bytes - disabled
- KafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable
+ KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable
consumer.register(ssp0, "0");
consumer.register(ssp1, "0");
@@ -197,6 +194,8 @@ public class TestKafkaSystemConsumer {
Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+ consumer.stop();
}
// mock kafkaConsumer and SystemConsumer
@@ -206,17 +205,12 @@ public class TestKafkaSystemConsumer {
}
}
- static class MockKafkaSystmeCosumer extends KafkaSystemConsumer {
- public MockKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+ static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
+ public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
KafkaSystemConsumerMetrics metrics, Clock clock) {
super(kafkaConsumer, systemName, config, clientId, metrics, clock);
}
- //@Override
- //void createConsumerProxy() {
- // this.messageSink = new KafkaConsumerMessageSink();
- //}
-
@Override
void startConsumer() {
}