You are viewing a plain text version of this content. The canonical link for it is on the original archive page (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@samza.apache.org by bo...@apache.org on 2018/10/31 21:22:20 UTC
samza git commit: y
Repository: samza
Updated Branches:
refs/heads/master 8723c3f79 -> a528cc680
y
Author: Boris S <bo...@apache.org>
Author: Boris S <bs...@linkedin.com>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Ray Matharu <rm...@linkedin.com>
Closes #779 from sborya/RemoveGetKafkaSystemConsumerConfig
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a528cc68
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a528cc68
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a528cc68
Branch: refs/heads/master
Commit: a528cc6805756c9ee11904e00346e08002dcd143
Parents: 8723c3f
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 31 14:22:04 2018 -0700
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Oct 31 14:22:04 2018 -0700
----------------------------------------------------------------------
.../samza/config/KafkaConsumerConfig.java | 14 +
.../org/apache/samza/config/KafkaConfig.scala | 18 -
.../samza/config/RegExTopicGenerator.scala | 4 +-
.../samza/config_deprecated/KafkaConfig.scala | 400 +++++++++++++++++++
.../kafka_deprecated/KafkaSystemFactory.scala | 6 +-
.../apache/samza/config/TestKafkaConfig.scala | 42 +-
6 files changed, 425 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
index ad17e82..1e62d94 100644
--- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java
@@ -43,6 +43,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
+ // Fallback returned by fetchMessageMaxBytes() when "fetch.message.max.bytes" is unset or blank (1 MiB).
+ private static final int FETCH_MAX_BYTES = 1024 * 1024;
private final String systemName;
/*
@@ -123,6 +124,19 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
return clientId;
}
+  /**
+   * Returns this consumer's "fetch.message.max.bytes" setting.
+   *
+   * @return the configured value, or FETCH_MAX_BYTES (1 MiB) when the key is absent or blank
+   * @throws NumberFormatException if the configured value is not a valid int
+   */
+  public int fetchMessageMaxBytes() {
+    // This map stores Objects, so the value is not guaranteed to be a String: normalize it via
+    // toString() instead of casting, and trim so surrounding whitespace does not break parsing.
+    Object rawFetchSize = get("fetch.message.max.bytes");
+    String fetchSize = rawFetchSize == null ? null : rawFetchSize.toString().trim();
+    if (StringUtils.isBlank(fetchSize)) {
+      return FETCH_MAX_BYTES;
+    }
+    return Integer.parseInt(fetchSize);
+  }
+
+  /**
+   * Returns the ZOOKEEPER_CONNECT ("zookeeper.connect") entry of this consumer config,
+   * or null when it is not present.
+   */
+  public String getZkConnect() {
+    final Object zkConnect = get(ZOOKEEPER_CONNECT);
+    return (String) zkConnect;
+  }
+
// group id should be unique per job
static String createConsumerGroupId(Config config) {
Pair<String, String> jobNameId = getJobNameAndId(config);
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index f492518..1954ac7 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -303,24 +303,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
properties
}
- /**
- * @deprecated Use KafkaConsumerConfig
- */
- @Deprecated
- def getKafkaSystemConsumerConfig( systemName: String,
- clientId: String,
- groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
- injectedProps: Map[String, String] = Map()) = {
-
- val subConf = config.subset("systems.%s.consumer." format systemName, true)
- val consumerProps = new Properties()
- consumerProps.putAll(subConf)
- consumerProps.put("group.id", groupId)
- consumerProps.put("client.id", clientId)
- consumerProps.putAll(injectedProps.asJava)
- new ConsumerConfig(consumerProps)
- }
-
def getKafkaSystemProducerConfig( systemName: String,
clientId: String,
injectedProps: Map[String, String] = Map()) = {
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index a81ff13..e6068b0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -100,8 +100,8 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
val systemName = config
.getRegexResolvedSystem(rewriterName)
.getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
- val zkConnect = Option(consumerConfig.zkConnect)
+ val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, "")
+ val zkConnect = Option(consumerConfig.getZkConnect)
.getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
val zkClient = new ZkClient(zkConnect, 6000, 6000)
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
new file mode 100644
index 0000000..02a6275
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config_deprecated
+
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern
+import java.util.{Properties, UUID}
+
+import com.google.common.collect.ImmutableMap
+import kafka.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
+import org.apache.samza.config._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.{Logging, StreamUtil}
+
+import scala.collection.JavaConverters._
+
+object KafkaConfig {
+  // Topic-level Kafka property names and defaults.
+  val TOPIC_REPLICATION_FACTOR = "replication.factor"
+  val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
+
+  // Keys read by the regex topic rewriter; %s is the rewriter name.
+  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+  val SEGMENT_BYTES = "segment.bytes"
+
+  // Checkpoint topic settings. Object vals initialize top to bottom, so the TOPIC_REPLICATION_FACTOR
+  // and SEGMENT_BYTES definitions must stay above their uses here.
+  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+  val CHECKPOINT_REPLICATION_FACTOR = s"task.checkpoint.$TOPIC_REPLICATION_FACTOR"
+  val CHECKPOINT_SEGMENT_BYTES = s"task.checkpoint.$SEGMENT_BYTES"
+
+  // Changelog settings; %s is the store name.
+  val CHANGELOG_STREAM_REPLICATION_FACTOR = s"stores.%s.changelog.$TOPIC_REPLICATION_FACTOR"
+  val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR.format("default")
+  val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
+  // The default segment size to use for changelog topics (512 MiB).
+  val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
+
+  // Matches "stores.<store>.changelog" keys; group 1 captures the store name.
+  val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
+
+  val JOB_COORDINATOR_REPLICATION_FACTOR = s"job.coordinator.$TOPIC_REPLICATION_FACTOR"
+  val JOB_COORDINATOR_SEGMENT_BYTES = s"job.coordinator.$SEGMENT_BYTES"
+
+  // Per-system client config keys; the first %s is the system name.
+  val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"
+  val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"
+  val PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s"
+  val CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect"
+
+  /**
+   * Defines how low a queue can get for a single system/stream/partition
+   * combination before trying to fetch more messages for it.
+   */
+  val CONSUMER_FETCH_THRESHOLD = s"${SystemConfig.SYSTEM_PREFIX}samza.fetch.threshold"
+
+  // 25 MiB (26214400 bytes).
+  val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 25 * 1024 * 1024
+
+  /**
+   * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
+   * The bytes for a single system/stream/partition are computed based on this.
+   * This fetches whole messages, hence this bytes limit is a soft one, and the actual usage can be
+   * the bytes limit + size of max message in the partition for a given stream.
+   * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
+   */
+  val CONSUMER_FETCH_THRESHOLD_BYTES = s"${SystemConfig.SYSTEM_PREFIX}samza.fetch.threshold.bytes"
+
+  // Checkpoint-topic retention used when the application mode is not STREAM: one day.
+  val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
+
+  /** Implicitly views a Samza Config as this (deprecated) KafkaConfig. */
+  implicit def Config2Kafka(config: Config): KafkaConfig = new KafkaConfig(config)
+}
+
+class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+  /**
+   * Gets the System to use for reading/writing checkpoints. Uses the following precedence.
+   *
+   * 1. If task.checkpoint.system is defined, that value is used.
+   * 2. If job.default.system is defined, that value is used.
+   * 3. None
+   */
+  def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull))
+
+  /**
+   * Gets the replication factor for the checkpoint topic. Uses the following precedence.
+   *
+   * 1. If task.checkpoint.replication.factor is configured, that value is used.
+   * 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used.
+   * 3. "3"
+   *
+   * The value is returned wrapped in an Option.
+   * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+   */
+  def getCheckpointReplicationFactor() = {
+    val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3")
+    val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor)
+
+    Option(replicationFactor)
+  }
+
+  /**
+   * Returns `replication.factor` from the given system's default stream properties, or
+   * `defaultValue` when that system does not define one.
+   */
+  private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) =
+    new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue)
+
+  /**
+   * Gets the segment bytes for the checkpoint topic. Uses the following precedence.
+   *
+   * 1. If task.checkpoint.segment.bytes is configured, that value is used.
+   * 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used.
+   * 3. KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
+   *
+   * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
+   */
+  def getCheckpointSegmentBytes() = {
+    val defaultSegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
+    getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultSegBytes)
+  }
+
+  /**
+   * Gets the replication factor for the coordinator topic. Uses the following precedence.
+   *
+   * 1. If job.coordinator.replication.factor is configured, that value is used.
+   * 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used.
+   * 3. "3"
+   *
+   * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+   */
+  def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR).getOrElse {
+    val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+    new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3")
+  }
+
+  /**
+   * Gets the segment bytes for the coordinator topic. Uses the following precedence.
+   *
+   * 1. If job.coordinator.segment.bytes is configured, that value is used.
+   * 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used.
+   * 3. "26214400"
+   *
+   * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
+   */
+  def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES).getOrElse {
+    val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
+    new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400")
+  }
+
+  // custom consumer config
+  def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+
+  def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+
+  /** True only when the byte-based fetch threshold for `name` is configured to a value > 0. */
+  def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
+
+  /**
+   * Returns a map of topic -> fetch.message.max.bytes value for all streams that
+   * are defined with this property in the config.
+   *
+   * @throws NumberFormatException if a configured value is not a valid int
+   */
+  def getFetchMessageMaxBytesTopics(systemName: String) = {
+    val suffix = ".consumer.fetch.message.max.bytes"
+    config.subset("systems.%s.streams." format systemName, true)
+      .asScala
+      // stripSuffix removes only the trailing key part, unlike replace(), which would also
+      // delete an occurrence in the middle of a stream name.
+      .collect { case (key, fetchSizeValue) if key.endsWith(suffix) => (key.stripSuffix(suffix), fetchSizeValue.toInt) }
+      .toMap
+  }
+
+  /**
+   * Returns a map of topic -> auto.offset.reset value for all streams that
+   * are defined with this property in the config.
+   */
+  def getAutoOffsetResetTopics(systemName: String) = {
+    val suffix = ".consumer.auto.offset.reset"
+    config.subset("systems.%s.streams." format systemName, true)
+      .asScala
+      .collect { case (key, resetValue) if key.endsWith(suffix) => (key.stripSuffix(suffix), resetValue) }
+      .toMap
+  }
+
+  // regex resolver
+  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
+  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+
+  /**
+   * Gets the replication factor for the changelog topics. Uses the following precedence.
+   *
+   * 1. If stores.myStore.changelog.replication.factor is configured, that value is used.
+   * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used.
+   * 3. 2
+   *
+   * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]]
+   */
+  def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+
+  /**
+   * Replication factor for changelogs without a store-specific setting: stores.default.changelog.replication.factor,
+   * else the changelog system's default stream replication factor, else "2".
+   */
+  def getDefaultChangelogStreamReplicationFactor() = {
+    val changelogSystem = new JavaStorageConfig(config).getChangelogSystem()
+    getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
+  }
+
+  /**
+   * Returns a map of store name -> changelog stream name. A store is included when its
+   * "stores.<store>.changelog" key matches CHANGELOG_STREAM_NAMES_REGEX and StorageConfig resolves
+   * a changelog stream for it.
+   *
+   * NOTE(review): despite the method name, this does not verify that the changelog system is Kafka;
+   * it only verifies that a system factory is configured for it.
+   *
+   * @throws SamzaException if a store name cannot be extracted from a matched key, or if no system
+   *                        factory is configured for a changelog's system
+   */
+  def getKafkaChangelogEnabledStores() = {
+    val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
+    var storeToChangelog = Map[String, String]()
+    val storageConfig = new StorageConfig(config)
+    val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX)
+
+    for ((changelogConfig, cn) <- changelogConfigs) {
+      val matcher = pattern.matcher(changelogConfig)
+      val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn)
+
+      storageConfig.getChangelogStream(storeName).foreach(changelogName => {
+        val systemStream = StreamUtil.getSystemStreamFromNames(changelogName)
+        // Verify a factory exists for the changelog's system. The `throw` is essential: without it
+        // getOrElse merely builds the exception and discards it, so the check never fires.
+        config.getSystemFactory(systemStream.getSystem).getOrElse(throw new SamzaException("Unable to determine factory for system: " + systemStream.getSystem))
+        storeToChangelog += storeName -> systemStream.getStream
+      })
+    }
+    storeToChangelog
+  }
+
+  /**
+   * Builds the Kafka topic properties used to create the changelog topic of store `name`.
+   *
+   * Defaults: cleanup.policy=compact, segment.bytes=CHANGELOG_DEFAULT_SEGMENT_SIZE, and
+   * delete.retention.ms taken from StorageConfig.
+   *
+   * When stores.<name>.rocksdb.ttl.ms is set and the user has not configured cleanup.policy, the
+   * policy becomes "delete" instead. In that case retention.ms defaults to the TTL unless it is
+   * configured explicitly.
+   *
+   * Any stores.<name>.changelog.kafka.* entries are applied last and override these defaults.
+   */
+  def getChangelogKafkaProperties(name: String) = {
+    val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
+    val kafkaChangeLogProperties = new Properties
+
+    // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
+    // 1.0.2, or 1.1.0 (see KAFKA-6568)
+
+    // Adjust changelog topic setting, when TTL is set on a RocksDB store
+    // - Disable log compaction on Kafka changelog topic
+    // - Set topic TTL to be the same as RocksDB TTL
+    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+      case Some(rocksDbTtl) =>
+        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
+          if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
+            kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
+          }
+        }
+      case _ =>
+        kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    }
+
+    kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
+    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+    // User-supplied stores.<name>.changelog.kafka.* settings win over everything above.
+    filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
+    kafkaChangeLogProperties
+  }
+
+  // Set the checkpoint topic configs to have a very small segment size and
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
+  // topic. Outside STREAM app mode, "compact,delete" with DEFAULT_RETENTION_MS_FOR_BATCH
+  // retention is used instead.
+  def getCheckpointTopicProperties() = {
+    val segmentBytes: Int = getCheckpointSegmentBytes()
+    val appConfig = new ApplicationConfig(config)
+    val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
+    val properties = new Properties()
+
+    if (isStreamMode) {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact",
+        "segment.bytes", String.valueOf(segmentBytes)))
+    } else {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact,delete",
+        "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
+        "segment.bytes", String.valueOf(segmentBytes)))
+    }
+    properties
+  }
+
+  /**
+   * Builds an old-API kafka.consumer.ConsumerConfig for `systemName`.
+   *
+   * The config is assembled in order from systems.<systemName>.consumer.*, then group.id and client.id,
+   * then `injectedProps`, so later sources override earlier ones.
+   *
+   * The default `groupId` embeds a random UUID and is evaluated afresh on every call.
+   *
+   * @deprecated Use KafkaConsumerConfig
+   */
+  @Deprecated
+  def getKafkaSystemConsumerConfig( systemName: String,
+                                    clientId: String,
+                                    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
+                                    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.consumer." format systemName, true)
+    val consumerProps = new Properties()
+    consumerProps.putAll(subConf)
+    consumerProps.put("group.id", groupId)
+    consumerProps.put("client.id", clientId)
+    consumerProps.putAll(injectedProps.asJava)
+    new ConsumerConfig(consumerProps)
+  }
+
+  /**
+   * Builds a KafkaProducerConfig for `systemName` from systems.<systemName>.producer.*, then
+   * client.id, then `injectedProps` (later sources override earlier ones).
+   */
+  def getKafkaSystemProducerConfig( systemName: String,
+                                    clientId: String,
+                                    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.producer." format systemName, true)
+    val producerProps = new util.HashMap[String, String]()
+    producerProps.putAll(subConf)
+    producerProps.put("client.id", clientId)
+    producerProps.putAll(injectedProps.asJava)
+    new KafkaProducerConfig(systemName, clientId, producerProps)
+  }
+}
+
+/**
+ * Producer-side settings for one Samza system (KafkaConfig.getKafkaSystemProducerConfig builds one
+ * from systems.<system>.producer.*).
+ *
+ * @param systemName name of the Samza system; used in error messages
+ * @param clientId   Kafka client id
+ * @param properties raw producer properties (String values)
+ */
+class KafkaProducerConfig(val systemName: String,
+                          val clientId: String = "",
+                          properties: java.util.Map[String, String] = new util.HashMap[String, String]()) extends Logging {
+
+  // Copied from new Kafka API - Workaround until KAFKA-1794 is resolved
+  val RECONNECT_BACKOFF_MS_DEFAULT = 10L
+
+  // Overrides specific to samza-kafka (these are considered as defaults in Samza & can be overridden by user)
+  val MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT: java.lang.Integer = 1.asInstanceOf[Integer]
+  val RETRIES_DEFAULT: java.lang.Integer = Integer.MAX_VALUE
+  val LINGER_MS_DEFAULT: java.lang.Integer = 10
+
+  /**
+   * Returns a new map holding `properties`, with Samza's defaults filled in for the key/value
+   * serializers (ByteArraySerializer), max in-flight requests per connection, retries and linger.ms.
+   *
+   * @throws NumberFormatException if the max-in-flight, retries or linger.ms setting is not an integer
+   */
+  def getProducerProperties = {
+
+    val byteArraySerializerClassName = classOf[ByteArraySerializer].getCanonicalName
+    val producerProperties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
+    producerProperties.putAll(properties)
+
+    if (!producerProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+      producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
+    }
+
+    if (!producerProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName))
+      producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, byteArraySerializerClassName)
+    }
+
+    // Ordering is only guaranteed with one in-flight request per connection.
+    // A larger user value is kept but warned about.
+    // Otherwise (unset or <= 1) the value is set to the Samza default.
+    // toString (not a String cast) matches the retries/linger checks below.
+    if (producerProperties.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)
+      && producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).toString.toInt > MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT) {
+      warn("Setting '%s' to a value other than %d does not guarantee message ordering because new messages will be sent without waiting for previous ones to be acknowledged."
+        format(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT))
+    } else {
+      producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT)
+    }
+
+    if (!producerProperties.containsKey(ProducerConfig.RETRIES_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT))
+      producerProperties.put(ProducerConfig.RETRIES_CONFIG, RETRIES_DEFAULT)
+    }
+    producerProperties.get(ProducerConfig.RETRIES_CONFIG).toString.toInt // Verify int
+
+    if (!producerProperties.containsKey(ProducerConfig.LINGER_MS_CONFIG)) {
+      debug("%s undefined. Defaulting to %s." format(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT))
+      producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS_DEFAULT)
+    }
+    producerProperties.get(ProducerConfig.LINGER_MS_CONFIG).toString.toInt // Verify int
+
+    producerProperties
+  }
+
+  /**
+   * The configured reconnect backoff in ms, or RECONNECT_BACKOFF_MS_DEFAULT when unset.
+   *
+   * Property values are Strings and must be parsed. Casting them with asInstanceOf[Long] throws
+   * ClassCastException as soon as the setting is present.
+   *
+   * @throws NumberFormatException at construction time if the configured value is not a valid long
+   */
+  val reconnectIntervalMs: Long = Option(properties.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG))
+    .map(_.toLong)
+    .getOrElse(RECONNECT_BACKOFF_MS_DEFAULT)
+
+  /**
+   * The required "bootstrap.servers" value. The misspelled member name is kept for source compatibility.
+   * A warning is logged if the legacy "metadata.broker.list" key is present.
+   *
+   * @throws SamzaException at construction time if "bootstrap.servers" is not configured
+   */
+  val bootsrapServers = {
+    if (properties.containsKey("metadata.broker.list"))
+      warn("Kafka producer configuration contains 'metadata.broker.list'. This configuration is deprecated . Samza has been upgraded " +
+        "to use Kafka's new producer API. Please update your configurations based on the documentation at http://kafka.apache.org/documentation.html#newproducerconfigs")
+    Option(properties.get("bootstrap.servers"))
+      .getOrElse(throw new SamzaException("No bootstrap servers defined in config for %s." format systemName))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index eecdbe4..d588831 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -47,13 +47,14 @@ object KafkaSystemFactory extends Logging {
class KafkaSystemFactory extends SystemFactory with Logging {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
+ val kafkaConfig:org.apache.samza.config_deprecated.KafkaConfig = config
val clientId = getClientId("samza-consumer", config)
val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
// Kind of goofy to need a producer config for consumers, but we need metadata.
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
val bootstrapServers = producerConfig.bootsrapServers
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+ val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
@@ -104,10 +105,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
}
def getAdmin(systemName: String, config: Config): SystemAdmin = {
+ val kafkaConfig:org.apache.samza.config_deprecated.KafkaConfig = config
val clientId = getClientId("samza-admin", config)
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
val bootstrapServers = producerConfig.bootsrapServers
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+ val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
val zkConnect = Option(consumerConfig.zkConnect)
http://git-wip-us.apache.org/repos/asf/samza/blob/a528cc68/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index b8467b8..1f402c8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -20,9 +20,11 @@
package org.apache.samza.config
import java.util.Properties
+
import org.apache.samza.config.factories.PropertiesConfigFactory
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.clients.producer.ProducerConfig
@@ -41,56 +43,22 @@ class TestKafkaConfig {
props = new Properties
props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty(JobConfig.JOB_NAME, "jobName")
}
- @Test
- def testIdGeneration = {
- val factory = new PropertiesConfigFactory()
- props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
-
- val mapConfig = new MapConfig(props.asScala.asJava)
- val kafkaConfig = new KafkaConfig(mapConfig)
-
- val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
- val consumerClientId1 = consumerConfig1.clientId
- val groupId1 = consumerConfig1.groupId
- val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId2")
- val consumerClientId2 = consumerConfig2.clientId
- val groupId2 = consumerConfig2.groupId
- assert(consumerClientId1.equals("TestClientId1"))
- assert(consumerClientId2.equals("TestClientId2"))
- assert(groupId1.startsWith("undefined-samza-consumer-group-"))
- assert(groupId2.startsWith("undefined-samza-consumer-group-"))
- assert(consumerClientId1 != consumerClientId2)
- assert(groupId1 != groupId2)
-
- val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID, TEST_GROUP_ID)
- val consumerClientId3 = consumerConfig3.clientId
- val groupId3 = consumerConfig3.groupId
- assert(consumerClientId3.equals(TEST_CLIENT_ID))
- assert(groupId3.equals(TEST_GROUP_ID))
-
- val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId1")
- val producerClientId1 = producerConfig1.clientId
- val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId2")
- val producerClientId2 = producerConfig2.clientId
-
- assert(producerClientId1.equals("TestClientId1"))
- assert(producerClientId2.equals("TestClientId2"))
- }
@Test
def testStreamLevelFetchSizeOverride() {
val mapConfig = new MapConfig(props.asScala.asJava)
val kafkaConfig = new KafkaConfig(mapConfig)
- val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+ val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig, SYSTEM_NAME, TEST_CLIENT_ID)
// default fetch size
assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
val mapConfig1 = new MapConfig(props.asScala.asJava)
val kafkaConfig1 = new KafkaConfig(mapConfig1)
- val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
+ val consumerConfig1 = KafkaConsumerConfig.getKafkaSystemConsumerConfig(mapConfig1, SYSTEM_NAME, TEST_CLIENT_ID)
// shared fetch size
assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)