You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 01:08:10 UTC
[2/2] samza git commit: NewSystemConsumer for kafka system
NewSystemConsumer for kafka system
Remove SimpleConsumer and BrokerProxy from Samza's KafkaSystemConsumer implementation.
Instead use KafkaConsumerProxy with high-level kafka consumer.
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Shanthoosh Venktataraman <sp...@usc.edu>, Prateek Maheshwari <pm...@linkedin.com>
Closes #624 from sborya/NewConsumer2
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/003ad106
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/003ad106
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/003ad106
Branch: refs/heads/master
Commit: 003ad1068ac7d36f462b49ded0dbb2e4dedc1089
Parents: 078afb5
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 25 18:07:44 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 25 18:07:44 2018 -0700
----------------------------------------------------------------------
.../samza/system/IncomingMessageEnvelope.java | 3 +-
.../ClusterBasedJobCoordinator.java | 2 +-
.../apache/samza/storage/StorageRecovery.java | 2 +-
.../samza/checkpoint/CheckpointTool.scala | 2 +-
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../samza/coordinator/JobModelManager.scala | 6 +-
.../samza/job/local/ProcessJobFactory.scala | 3 +-
.../samza/job/local/ThreadJobFactory.scala | 20 +-
.../samza/coordinator/TestJobCoordinator.scala | 4 +-
.../org/apache/samza/config/KafkaConfig.scala | 5 +-
.../samza/config/KafkaConsumerConfig.java | 210 +++++++++
.../apache/samza/system/kafka/BrokerProxy.scala | 332 --------------
.../samza/system/kafka/KafkaConsumerProxy.java | 423 ++++++++++++++++++
.../samza/system/kafka/KafkaSystemConsumer.java | 371 ++++++++++++++++
.../system/kafka/KafkaSystemConsumer.scala | 309 -------------
.../kafka/KafkaSystemConsumerMetrics.scala | 68 ++-
.../samza/system/kafka/KafkaSystemFactory.scala | 80 ++--
.../samza/config/TestKafkaConsumerConfig.java | 150 +++++++
.../system/kafka/TestKafkaSystemAdminJava.java | 18 +-
.../samza/system/kafka/TestBrokerProxy.scala | 434 -------------------
.../system/kafka/TestKafkaSystemConsumer.java | 220 ++++++++++
.../system/kafka/TestKafkaSystemConsumer.scala | 191 --------
.../test/integration/StreamTaskTestUtil.scala | 17 +-
.../integration/TestShutdownStatefulTask.scala | 4 +-
.../samza/validation/YarnJobValidationTool.java | 2 +-
.../yarn/TestSamzaYarnAppMasterService.scala | 4 +-
26 files changed, 1491 insertions(+), 1391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 4d0ce2f..c5aed31 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -59,7 +59,8 @@ public class IncomingMessageEnvelope {
* @param message A deserialized message received from the partition offset.
* @param size size of the message and key in bytes.
*/
- public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) {
+ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset,
+ Object key, Object message, int size) {
this.systemStreamPartition = systemStreamPartition;
this.offset = offset;
this.key = key;
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 016d171..12e26f7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -174,7 +174,7 @@ public class ClusterBasedJobCoordinator {
// build a JobModelManager and ChangelogStreamManager and perform partition assignments.
changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
- jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping());
+ jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
config = jobModelManager.jobModel().getConfig();
hasDurableStores = new StorageConfig(config).hasDurableStores();
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index bf46018..9a76d75 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -131,7 +131,7 @@ public class StorageRecovery extends CommandLine {
coordinatorStreamManager.start();
coordinatorStreamManager.bootstrap();
ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
- JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()).jobModel();
+ JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel();
containers = jobModel.getContainers();
coordinatorStreamManager.stop();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 0ca8a3d..65fb419 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -170,7 +170,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
coordinatorStreamManager.start
coordinatorStreamManager.bootstrap
val changelogManager = new ChangelogStreamManager(coordinatorStreamManager)
- val jobModelManager = JobModelManager(coordinatorStreamManager, changelogManager.readPartitionMapping())
+ val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogManager.readPartitionMapping())
val taskNames = jobModelManager
.jobModel
.getContainers
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7b64f5e..fba7329 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -822,7 +822,7 @@ class SamzaContainer(
}
try {
- info("Shutting down.")
+ info("Shutting down SamzaContainer.")
removeShutdownHook
jmxServer.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index f7698c0..600b7a1 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,7 +35,6 @@ import org.apache.samza.container.LocalityManager
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.MetricsRegistryMap
@@ -64,12 +63,11 @@ object JobModelManager extends Logging {
* a) Reads the jobModel from coordinator stream using the job's configuration.
* b) Recomputes changelog partition mapping based on jobModel and job's configuration.
* c) Builds JobModelManager using the jobModel read from coordinator stream.
- * @param coordinatorStreamManager Coordinator stream manager.
+ * @param config Config from the coordinator stream.
* @param changelogPartitionMapping The changelog partition-to-task mapping.
* @return JobModelManager
*/
- def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
- val config = coordinatorStreamManager.getConfig
+ def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
val localityManager = new LocalityManager(config, new MetricsRegistryMap())
// Map the name of each system to the corresponding SystemAdmin
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 642a484..64f516b 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -50,7 +50,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
coordinatorStreamManager.bootstrap
val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
- val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
+ val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping())
val jobModel = coordinator.jobModel
val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
@@ -61,6 +61,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
}
changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+ coordinatorStreamManager.stop()
//create necessary checkpoint and changelog streams
val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index abd7f65..bec4ec0 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -20,9 +20,9 @@
package org.apache.samza.job.local
import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil}
-import org.apache.samza.config.{Config, TaskConfigJava}
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
+import org.apache.samza.config.{Config, TaskConfigJava}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.CoordinatorStreamManager
@@ -30,16 +30,15 @@ import org.apache.samza.job.{StreamJob, StreamJobFactory}
import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
import org.apache.samza.runtime.ProcessorContext
import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.task.TaskFactory
-import org.apache.samza.task.TaskFactoryUtil
+import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
- * Creates a new Thread job with the given config
- */
+ * Creates a new Thread job with the given config
+ */
class ThreadJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
@@ -51,7 +50,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
coordinatorStreamManager.bootstrap
val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager)
- val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping())
+ val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping())
+
val jobModel = coordinator.jobModel
val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
@@ -67,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
if (checkpointManager != null) {
checkpointManager.createResources()
+ checkpointManager.stop()
}
ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
@@ -74,17 +75,17 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val jmxServer = new JmxServer
val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
- val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
+ val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
config.getTaskOpts match {
case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " +
- "You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+ "You probably want to run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
case _ => None
}
val containerListener = {
- val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { }, config)
+ val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config)
new SamzaContainerListener {
override def afterFailure(t: Throwable): Unit = {
processorLifecycleListener.afterFailure(t)
@@ -120,6 +121,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
threadJob
} finally {
coordinator.stop
+ coordinatorStreamManager.stop()
jmxServer.stop
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 2488355..b7a9bec 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -275,7 +275,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
coordinatorStreamManager.start
coordinatorStreamManager.bootstrap
val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager)
- JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping())
+ val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogPartitionManager.readPartitionMapping())
+ coordinatorStreamManager.stop()
+ jobModelManager
}
@Before
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 26664ea..ef43e72 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -289,7 +289,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
properties
}
- // kafka config
+ /**
+ * @deprecated Use KafkaConsumerConfig
+ */
+ @Deprecated
def getKafkaSystemConsumerConfig( systemName: String,
clientId: String,
groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..3fa66e5
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.runtime.AbstractFunction0;
+
+
+/**
+ * The configuration class for KafkaConsumer
+ */
+public class KafkaConsumerConfig extends HashMap<String, Object> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
+ static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
+ static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
+ static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
+
+ /*
+ * By default, KafkaConsumer may return a large number of available messages across all the partitions.
+ * This may cause memory issues. That's why we limit the number of messages returned by EACH poll().
+ */
+ static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
+
+ private KafkaConsumerConfig(Map<String, Object> map) {
+ super(map);
+ }
+
+ /**
+ * Helper method to create configs for use in Kafka consumer.
+ * The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
+ *
+ * @param config config provided by the app.
+ * @param systemName system name to get the consumer configuration for.
+ * @param clientId client id to be used in the Kafka consumer.
+ * @return KafkaConsumerConfig
+ */
+ public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
+
+ Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
+
+ //Kafka client configuration
+ String groupId = getConsumerGroupId(config);
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.putAll(subConf);
+
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+
+ // These are values we enforce in samza, and they cannot be overwritten.
+
+ // Disable consumer auto-commit because Samza controls commits
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ // Translate samza config value to kafka config value
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
+
+ // if consumer bootstrap servers are not configured, get them from the producer configs
+ if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
+ String bootstrapServers =
+ config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ if (StringUtils.isEmpty(bootstrapServers)) {
+ throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
+ }
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+
+ // Always use default partition assignment strategy. Do not allow override.
+ consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
+
+ // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
+ // default to byte[]
+ if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+ LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ }
+ if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+ LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ }
+
+ // Override default max poll config if there is no value
+ consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
+ (k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
+
+ return new KafkaConsumerConfig(consumerProps);
+ }
+
+ // group id should be unique per job
+ static String getConsumerGroupId(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ Option jobNameOption = jobConfig.getName();
+ if (jobNameOption.isEmpty()) {
+ throw new ConfigException("Missing job name");
+ }
+ String jobName = (String) jobNameOption.get();
+
+ Option jobIdOption = jobConfig.getJobId();
+ String jobId = "1";
+ if (! jobIdOption.isEmpty()) {
+ jobId = (String) jobIdOption.get();
+ }
+
+ return String.format("%s-%s", jobName, jobId);
+ }
+
+ // client id should be unique per job
+ public static String getConsumerClientId(Config config) {
+ return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
+ }
+
+ public static String getProducerClientId(Config config) {
+ return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
+ }
+
+ public static String getAdminClientId(Config config) {
+ return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
+ }
+
+ static String getConsumerClientId(String id, Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ Option jobNameOption = jobConfig.getName();
+ if (jobNameOption.isEmpty()) {
+ throw new ConfigException("Missing job name");
+ }
+ String jobName = (String) jobNameOption.get();
+
+ Option jobIdOption = jobConfig.getJobId();
+ String jobId = "1";
+ if (! jobIdOption.isEmpty()) {
+ jobId = (String) jobIdOption.get();
+ }
+
+ return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
+ jobId.replaceAll("\\W", "_"));
+ }
+
+ /**
+ * If the settings for auto.reset in samza differ from the settings in Kafka (auto.offset.reset),
+ * they need to be converted (see kafka.apache.org/documentation):
+ * "largest" -> "latest"
+ * "smallest" -> "earliest"
+ *
+ * If no setting specified we return "latest" (same as Kafka).
+ * @param autoOffsetReset value from the app provided config
+ * @return String representing the config value for "auto.offset.reset" property
+ */
+ static String getAutoOffsetResetValue(final String autoOffsetReset) {
+ final String SAMZA_OFFSET_LARGEST = "largest";
+ final String SAMZA_OFFSET_SMALLEST = "smallest";
+ final String KAFKA_OFFSET_LATEST = "latest";
+ final String KAFKA_OFFSET_EARLIEST = "earliest";
+ final String KAFKA_OFFSET_NONE = "none";
+
+ if (autoOffsetReset == null) {
+ return KAFKA_OFFSET_LATEST; // return default
+ }
+
+ // accept kafka values directly
+ if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST)
+ || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) {
+ return autoOffsetReset;
+ }
+
+ String newAutoOffsetReset;
+ switch (autoOffsetReset) {
+ case SAMZA_OFFSET_LARGEST:
+ newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ break;
+ case SAMZA_OFFSET_SMALLEST:
+ newAutoOffsetReset = KAFKA_OFFSET_EARLIEST;
+ break;
+ default:
+ newAutoOffsetReset = KAFKA_OFFSET_LATEST;
+ }
+ LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset);
+ return newAutoOffsetReset;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
deleted file mode 100644
index 423b68a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import java.lang.Thread.UncaughtExceptionHandler
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-
-import kafka.api._
-import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.consumer.ConsumerConfig
-import kafka.message.MessageSet
-import org.apache.samza.SamzaException
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.KafkaUtil
-import org.apache.samza.util.Logging
-
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-
-/**
- * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
- * a way for consumers to retrieve those messages by topic and partition.
- */
-class BrokerProxy(
- val host: String,
- val port: Int,
- val system: String,
- val clientID: String,
- val metrics: KafkaSystemConsumerMetrics,
- val messageSink: MessageSink,
- val timeout: Int = ConsumerConfig.SocketTimeout,
- val bufferSize: Int = ConsumerConfig.SocketBufferSize,
- val fetchSize: StreamFetchSizes = new StreamFetchSizes,
- val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
- val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
- offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
-
- /**
- * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
- */
- val sleepMSWhileNoTopicPartitions = 100
-
- /** What's the next offset for a particular partition? **/
- val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
-
- /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
- // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
- // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
- // immediately, even though the process was proceeding normally. Hence the extra boolean. Should be investigated.
- val firstCallBarrier = new CountDownLatch(1)
- var firstCall = true
-
- var simpleConsumer = createSimpleConsumer()
-
- metrics.registerBrokerProxy(host, port)
-
- def createSimpleConsumer() = {
- val hostString = "%s:%d" format (host, port)
- info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
-
- val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
- sc
- }
-
- def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
- debug("Adding new topic and partition %s to queue for %s" format (tp, host))
-
- if (nextOffsets.asJava.containsKey(tp)) {
- toss("Already consuming TopicPartition %s" format tp)
- }
-
- val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
- nextOffset
- .get
- .toLong
- } else {
- warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
-
- offsetGetter.getResetOffset(simpleConsumer, tp)
- }
-
- debug("Got offset %s for new topic and partition %s." format (offset, tp))
-
- nextOffsets += tp -> offset
-
- metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
- }
-
- def removeTopicPartition(tp: TopicAndPartition) = {
- if (nextOffsets.asJava.containsKey(tp)) {
- val offset = nextOffsets.remove(tp)
- metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
- debug("Removed %s" format tp)
- offset
- } else {
- warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
- None
- }
- }
-
- val thread = new Thread(new Runnable {
- def run {
- var reconnect = false
-
- try {
- (new ExponentialSleepStrategy).run(
- loop => {
- if (reconnect) {
- metrics.reconnects.get((host, port)).inc
- simpleConsumer.close()
- simpleConsumer = createSimpleConsumer()
- }
-
- while (!Thread.currentThread.isInterrupted) {
- messageSink.refreshDropped
- if (nextOffsets.size == 0) {
- debug("No TopicPartitions to fetch. Sleeping.")
- Thread.sleep(sleepMSWhileNoTopicPartitions)
- } else {
- fetchMessages
-
- // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
- // In that case, reset the loop delay, so that the next time an error occurs,
- // we start with a short retry delay.
- loop.reset
- }
- }
- },
-
- (exception, loop) => {
- warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
- debug("Exception detail:", exception)
- abdicateAll
- reconnect = true
- })
- } catch {
- case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
- case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
- case e: OutOfMemoryError => throw new SamzaException("Got out of memory error in broker proxy thread.")
- case e: StackOverflowError => throw new SamzaException("Got stack overflow error in broker proxy thread.")
- }
-
- if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
- }
- }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
-
- private def fetchMessages(): Unit = {
- val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
-
- if (topicAndPartitionsToFetch.size > 0) {
- metrics.brokerReads.get((host, port)).inc
- val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
- firstCall = false
- firstCallBarrier.countDown()
-
- // Split response into errors and non errors, processing the errors first
- val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
-
- handleErrors(errorResponses, response)
-
- nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
- } else {
- refreshLatencyMetrics
-
- debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
-
- metrics.brokerSkippedFetchRequests.get((host, port)).inc
-
- Thread.sleep(sleepMSWhileNoTopicPartitions)
- }
- }
-
- /**
- * Releases ownership for a single TopicAndPartition. The
- * KafkaSystemConsumer will try and find a new broker for the
- * TopicAndPartition.
- */
- def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
- // Need to be mindful of a tp that was removed by another thread
- case Some(offset) => messageSink.abdicate(tp, offset)
- case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
- }
-
- /**
- * Releases all TopicAndPartition ownership for this BrokerProxy thread. The
- * KafkaSystemConsumer will try and find a new broker for the
- * TopicAndPartition.
- */
- def abdicateAll {
- info("Abdicating all topic partitions.")
- val immutableNextOffsetsCopy = nextOffsets.toMap
- immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
- }
-
- def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
- // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
- case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
-
- // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
-
- // Convert FetchResponse into easier-to-work-with Errors
- val errors = for (
- (topicAndPartition, responseData) <- errorResponses;
- error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
- ) yield new Error(topicAndPartition, error.code(), error.exception())
-
- val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
- val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
-
- // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
- // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
- // topic-partitions remains the same. That way, when we've rebuilt the simple consumer, we can come around and
- // handle the recoverable errors.
- remainingErrors.foreach(e => {
- warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
- KafkaUtil.maybeThrowException(e.exception) })
-
- notLeaderOrUnknownTopic.foreach(e => {
- warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
- abdicate(e.tp)
- })
-
- offsetOutOfRangeErrors.foreach(e => {
- warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
-
- try {
- val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp)
- // Put the new offset into the map (if the tp still exists). Will catch it on the next go-around
- nextOffsets.replace(e.tp, newOffset)
- } catch {
- // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail.
- case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
- abdicate(e.tp)
- }
- })
- }
-
- def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
- val messageSet: MessageSet = data.messages
- var nextOffset = nextOffsets(tp)
-
- messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
- require(messageSet != null)
- for (message <- messageSet.iterator) {
- messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
-
- nextOffset = message.nextOffset
-
- val bytesSize = message.message.payloadSize + message.message.keySize
- metrics.reads.get(tp).inc
- metrics.bytesRead.get(tp).inc(bytesSize)
- metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
- metrics.offsets.get(tp).set(nextOffset)
- }
-
- nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
-
- // Update high water mark
- val hw = data.hw
- if (hw >= 0) {
- metrics.highWatermark.get(tp).set(hw)
- metrics.lag.get(tp).set(hw - nextOffset)
- } else {
- debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
- }
- }
- override def toString() = "BrokerProxy for %s:%d" format (host, port)
-
- def start {
- if (!thread.isAlive) {
- info("Starting " + toString)
- thread.setDaemon(true)
- thread.setName("Samza BrokerProxy " + thread.getName)
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
- override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
- })
- thread.start
- } else {
- debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
- }
- }
-
- def stop {
- info("Shutting down " + toString)
-
- if (simpleConsumer != null) {
- info("closing simple consumer...")
- simpleConsumer.close
- }
-
- thread.interrupt
- thread.join
- }
-
- private def refreshLatencyMetrics {
- nextOffsets.foreach{
- case (topicAndPartition, offset) => {
- val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId)
- trace("latest offset of %s is %s" format (topicAndPartition, latestOffset))
- if (latestOffset >= 0) {
- // only update the registered topicAndpartitions
- if(metrics.highWatermark.containsKey(topicAndPartition)) {
- metrics.highWatermark.get(topicAndPartition).set(latestOffset)
- }
- if(metrics.lag.containsKey(topicAndPartition)) {
- metrics.lag.get(topicAndPartition).set(latestOffset - offset)
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
new file mode 100644
index 0000000..04071c1
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -0,0 +1,423 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains a separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap
+ * through KafkaSystemConsumer.KafkaConsumerMessageSink object.
+ * This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
+ * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
+ */
+class KafkaConsumerProxy<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class);
+
+ private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
+
+ final Thread consumerPollThread;
+ private final Consumer<K, V> kafkaConsumer;
+ private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
+ private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
+ private final String metricName;
+ private final String systemName;
+ private final String clientId;
+ private final Map<TopicPartition, SystemStreamPartition> topicPartitionToSSP = new HashMap<>();
+ private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>();
+ // list of all the SSPs we poll from, with their next(most recently read + 1) offsets correspondingly.
+ private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
+ // lags behind the high water mark, as reported by the Kafka consumer.
+ private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
+
+ private volatile boolean isRunning = false;
+ private volatile Throwable failureCause = null;
+ private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
+
+ /**
+ * Stores the collaborators, registers this proxy with the Samza consumer metrics under {@code metricName},
+ * and creates the daemon poll thread. The thread is only created here; it is started by {@link #start()}.
+ *
+ * @param kafkaConsumer consumer the poll thread reads from
+ * @param systemName name of the system; used in log messages and in the poll thread's name
+ * @param clientId client id of the consumer; used in {@link #toString()} and as the "client-id" metric tag
+ * @param messageSink sink that receives the fetched envelopes and the high-watermark state
+ * @param samzaConsumerMetrics Samza metrics object updated by this proxy
+ * @param metricName name under which the per-client metrics are reported
+ */
+ KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
+ KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
+ String metricName) {
+
+ this.kafkaConsumer = kafkaConsumer;
+ this.systemName = systemName;
+ this.sink = messageSink;
+ this.kafkaConsumerMetrics = samzaConsumerMetrics;
+ this.metricName = metricName;
+ this.clientId = clientId;
+
+ this.kafkaConsumerMetrics.registerClientProxy(metricName);
+
+ // The Runnable captures this instance; the thread is not started until start() is called.
+ consumerPollThread = new Thread(createProxyThreadRunnable());
+ consumerPollThread.setDaemon(true);
+ consumerPollThread.setName(
+ "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
+ }
+
+ /**
+ * Add new partition to the list of polled partitions.
+ * Must only be called before {@link KafkaConsumerProxy#start} is called.
+ *
+ * @param ssp partition to poll
+ * @param nextOffset offset of the next message to read for {@code ssp}; stored as-is, without validation
+ */
+ public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
+ LOG.info(String.format("Adding new topicPartition %s with offset %s to queue for consumer %s", ssp, nextOffset,
+ this));
+ topicPartitionToSSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
+
+ // this is already vetted offset so there is no need to validate it
+ nextOffsets.put(ssp, nextOffset);
+
+ // the partition-count metric follows the number of registered SSPs
+ kafkaConsumerMetrics.setNumTopicPartitions(metricName, nextOffsets.size());
+ }
+
+ /**
+ * Stop this KafkaConsumerProxy and wait for at most {@code timeoutMs}.
+ * The wait is split in two halves: first the poll loop is asked to exit by clearing the running flag; if the
+ * thread is still alive after {@code timeoutMs/2}, it is interrupted and joined for another {@code timeoutMs/2}.
+ * @param timeoutMs maximum time to wait to stop this KafkaConsumerProxy
+ */
+ public void stop(long timeoutMs) {
+ LOG.info("Shutting down KafkaConsumerProxy poll thread {} for {}", consumerPollThread.getName(), this);
+
+ // the poll loop checks this flag between fetches
+ isRunning = false;
+ try {
+ consumerPollThread.join(timeoutMs/2);
+ // join() may timeout
+ // in this case we should interrupt it and wait again
+ if (consumerPollThread.isAlive()) {
+ consumerPollThread.interrupt();
+ consumerPollThread.join(timeoutMs/2);
+ }
+ } catch (InterruptedException e) {
+ // NOTE(review): the calling thread's interrupt status is not restored here - confirm this is intended.
+ LOG.warn("Join in KafkaConsumerProxy has failed", e);
+ consumerPollThread.interrupt();
+ }
+ }
+
+ /**
+  * Starts the poll thread, unless it is already alive, and blocks until the thread either reports that it is
+  * running or has recorded a failure.
+  *
+  * @throws SamzaException if no TopicPartition has been registered through {@link #addTopicPartition}
+  */
+ public void start() {
+   // Validate first, so that a proxy without partitions does not leave a poll thread running behind the exception.
+   if (topicPartitionToSSP.isEmpty()) {
+     String msg = String.format("Cannot start KafkaConsumerProxy without any registered TopicPartitions for %s", systemName);
+     LOG.error(msg);
+     throw new SamzaException(msg);
+   }
+
+   if (consumerPollThread.isAlive()) {
+     // slf4j placeholders are "{}"; with "%s" the argument was never rendered.
+     LOG.warn("Tried to start an already started KafkaConsumerProxy ({}). Ignoring.", this);
+     return;
+   }
+
+   LOG.info("Starting KafkaConsumerProxy polling thread for " + this.toString());
+   consumerPollThread.start();
+
+   // we need to wait until the thread starts; it either sets isRunning or records a failureCause
+   while (!isRunning && failureCause == null) {
+     try {
+       consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+       LOG.info("Ignoring InterruptedException while waiting for consumer poll thread to start.", e);
+     }
+   }
+ }
+
+ // Current value of the running flag: set by the poll thread when it starts, cleared by stop() or on failure.
+ boolean isRunning() {
+ return isRunning;
+ }
+
+ // Throwable that terminated the poll thread, or null if the thread has not failed.
+ Throwable getFailureCause() {
+ return failureCause;
+ }
+
+ // Seeds latestLags (and the sink's high-watermark state) from the consumer's end offsets.
+ private void initializeLags() {
+   // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
+   Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+   for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+     SystemStreamPartition ssp = topicPartitionToSSP.get(entry.getKey());
+     long startingOffset = nextOffsets.get(ssp);
+     Long endOffset = entry.getValue();
+
+     // End offsets are the offset of the newest message + 1, so any positive difference means
+     // we start out behind the head of the partition.
+     long initialLag = endOffset - startingOffset;
+
+     LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffset, startingOffset);
+     latestLags.put(ssp, initialLag);
+     sink.setIsAtHighWatermark(ssp, initialLag == 0);
+   }
+
+   // initialize lag metrics
+   refreshLagMetrics();
+ }
+
+ // Builds the Runnable executed by the poll thread (the thread itself is created in the constructor).
+ // The Runnable marks the proxy as running, initializes the lags once and then fetches until isRunning is cleared.
+ private Runnable createProxyThreadRunnable() {
+ Runnable runnable = () -> {
+ // start() waits for this flag (or for failureCause) before it returns
+ isRunning = true;
+
+ try {
+ consumerPollThreadStartLatch.countDown();
+ LOG.info("Starting consumer poll thread {} for system {}", consumerPollThread.getName(), systemName);
+ initializeLags();
+ while (isRunning) {
+ fetchMessages();
+ }
+ } catch (Throwable throwable) {
+ LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
+ // KafkaSystemConsumer uses the failureCause to propagate the throwable to the container
+ failureCause = throwable;
+ isRunning = false;
+ }
+
+ // reached on a normal stop as well as after a failure
+ if (!isRunning) {
+ LOG.info("KafkaConsumerProxy for system {} has stopped.", systemName);
+ }
+ };
+
+ return runnable;
+ }
+
+ // One iteration of the poll loop: poll the SSPs whose queues want more messages, or sleep briefly if none do.
+ private void fetchMessages() {
+   Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
+   nextOffsets.keySet().forEach(ssp -> {
+     if (sink.needsMoreMessages(ssp)) {
+       sspsToFetch.add(ssp);
+     }
+   });
+   LOG.debug("pollConsumer for {} SSPs: {}", sspsToFetch.size(), sspsToFetch);
+
+   if (sspsToFetch.isEmpty()) {
+     // nothing to read
+     LOG.debug("No topic/partitions need to be fetched for system {} right now. Sleeping {}ms.", systemName,
+         SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+
+     kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
+
+     try {
+       Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
+     } catch (InterruptedException e) {
+       LOG.warn("Sleep in fetchMessages was interrupted");
+     }
+   } else {
+     kafkaConsumerMetrics.incClientReads(metricName);
+
+     // move the responses into the queue
+     pollConsumer(sspsToFetch, 500L).forEach((ssp, envelopes) -> {
+       if (envelopes != null) {
+         moveMessagesToTheirQueue(ssp, envelopes);
+       }
+     });
+
+     // find current lags for each SSP
+     populateCurrentLags(sspsToFetch);
+   }
+
+   refreshLagMetrics();
+ }
+
+ // The actual polling of the messages from kafka.
+ // Polls only the given SSPs for at most timeoutMs and returns the fetched records, converted to envelopes and
+ // grouped by SSP. Any exception thrown by the consumer is logged and rethrown.
+ private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
+ Set<SystemStreamPartition> systemStreamPartitions, long timeoutMs) {
+
+ // We need to poll only a subset of the registered TopicPartitions (passed as the argument),
+ // so the rest have to be paused.
+ List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
+ List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
+
+ for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitionToSSP.entrySet()) {
+ TopicPartition tp = e.getKey();
+ SystemStreamPartition ssp = e.getValue();
+ if (systemStreamPartitions.contains(ssp)) {
+ topicPartitionsToPoll.add(tp); // consume
+ } else {
+ topicPartitionsToPause.add(tp); // ignore
+ }
+ }
+
+ ConsumerRecords<K, V> records;
+ try {
+ // Synchronize, in case the consumer is used in some other thread (metadata or something else)
+ synchronized (kafkaConsumer) {
+ // We are not polling ALL the registered partitions, so temporarily narrow what the consumer fetches:
+ // pause the partitions to skip, and resume the requested ones (they may have been paused by an earlier call).
+ kafkaConsumer.pause(topicPartitionsToPause);
+ kafkaConsumer.resume(topicPartitionsToPoll);
+ records = kafkaConsumer.poll(timeoutMs);
+ }
+ } catch (Exception e) {
+ // We may get InvalidOffsetException | AuthorizationException | KafkaException here;
+ // log it and rethrow, so that it is handled further up the stack.
+ LOG.error("Caught a Kafka exception in pollConsumer for system " + systemName, e);
+ throw e;
+ }
+
+ return processResults(records);
+ }
+
+ // Converts the polled records into IncomingMessageEnvelopes grouped by SSP, updating per-record metrics on the way.
+ private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
+   if (records == null) {
+     throw new SamzaException("Received null 'records' after polling consumer in KafkaConsumerProxy " + this);
+   }
+
+   Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(records.count());
+   for (ConsumerRecord<K, V> record : records) {
+     TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+
+     updateMetrics(record, tp);
+
+     // Append the envelope to the list of its SSP, creating the list on first use.
+     SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
+     List<IncomingMessageEnvelope> messages = results.computeIfAbsent(ssp, unused -> new ArrayList<>());
+     messages.add(new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), record.key(), record.value(),
+         getRecordSize(record)));
+   }
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("# records per SSP:");
+     results.forEach((ssp, list) -> LOG.debug(ssp + " = " + ((list == null) ? 0 : list.size())));
+   }
+
+   return results;
+ }
+
+ // Size in bytes of the serialized key plus the serialized value of a record.
+ // Kafka reports -1 as the serialized size of a null key or value, so both parts are clamped at 0;
+ // otherwise a tombstone (null value) would produce a size that is one byte short, or even negative.
+ private int getRecordSize(ConsumerRecord<K, V> r) {
+   int keySize = (r.key() == null) ? 0 : Math.max(r.serializedKeySize(), 0);
+   int valueSize = Math.max(r.serializedValueSize(), 0);
+   return keySize + valueSize;
+ }
+
+ // Updates the per-partition and per-client read metrics for a single consumed record.
+ // Throws SamzaException if the record's SSP has no entry in latestLags.
+ private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
+ TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
+ SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
+
+ Long lag = latestLags.get(ssp);
+ if (lag == null) {
+ throw new SamzaException("Unknown/unregistered ssp in latestLags. ssp=" + ssp + "; system=" + systemName);
+ }
+ long currentSSPLag = lag.longValue(); // lag between the current offset and the highwatermark
+ // NOTE(review): a negative lag skips ALL metric updates below, including the read and byte counters,
+ // not only the derived high watermark - confirm this is intended.
+ if (currentSSPLag < 0) {
+ return;
+ }
+
+ long recordOffset = r.offset();
+ long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
+
+ int size = getRecordSize(r);
+ kafkaConsumerMetrics.incReads(tap);
+ kafkaConsumerMetrics.incBytesReads(tap, size);
+ kafkaConsumerMetrics.setOffsets(tap, recordOffset);
+ kafkaConsumerMetrics.incClientBytesReads(metricName, size);
+ kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
+ }
+
+ // Hands the envelopes of one SSP to the sink and records the offset to read next for that SSP.
+ private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
+   // Stays at the currently stored value when the list is empty.
+   long upcomingOffset = nextOffsets.get(ssp);
+
+   for (IncomingMessageEnvelope envelope : envelopes) {
+     // move message to the BlockingEnvelopeMap's queue
+     sink.addMessage(ssp, envelope);
+
+     LOG.trace("IncomingMessageEnvelope. got envelope with offset:{} for ssp={}", envelope.getOffset(), ssp);
+     upcomingOffset = Long.parseLong(envelope.getOffset()) + 1;
+   }
+
+   nextOffsets.put(ssp, upcomingOffset);
+ }
+
+ // The only way to figure out lag for the KafkaConsumer is to look at the metrics after each poll() call.
+ // One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is.
+ // This method populates the lag information for each given SSP into the latestLags member variable and
+ // tells the sink whether that SSP is at the high watermark.
+ //
+ // The MetricName of an SSP is created the first time that SSP is seen and is cached in perPartitionMetrics.
+ // Filling the cache only while it was empty left every SSP that was missing from the first call's set
+ // without a MetricName, so its lag was later looked up with a null key.
+ private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
+
+   Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
+
+   for (SystemStreamPartition ssp : ssps) {
+     MetricName mn = perPartitionMetrics.computeIfAbsent(ssp, key -> {
+       Map<String, String> tags = new HashMap<>();
+       tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
+       TopicPartition tp = KafkaSystemConsumer.toTopicPartition(key);
+       return new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags);
+     });
+     Metric currentLagMetric = consumerMetrics.get(mn);
+
+     // High watermark is fixed to be the offset of last available message,
+     // so the lag is now at least 0, which is the same as Samza's definition.
+     // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
+     // -1 marks "no lag metric available for this SSP".
+     long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
+     latestLags.put(ssp, currentLag);
+
+     // calls the setIsAtHead for the BlockingEnvelopeMap
+     sink.setIsAtHighWatermark(ssp, currentLag == 0);
+   }
+ }
+
+ // Publishes the lag and the derived high watermark (next offset + lag) of every SSP with a known, non-negative lag.
+ private void refreshLagMetrics() {
+   nextOffsets.forEach((ssp, offset) -> {
+     TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
+     Long lag = latestLags.get(ssp);
+     LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
+
+     // Nothing to report while the lag is unknown or negative.
+     if (lag == null || offset == null || lag < 0) {
+       return;
+     }
+
+     long lagValue = lag.longValue();
+     long streamEndOffset = offset.longValue() + lagValue;
+     // update the metrics
+     kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
+     kafkaConsumerMetrics.setLagValue(tp, lagValue);
+   });
+ }
+
+ // Identifies this proxy in log messages by system name and client id.
+ @Override
+ public String toString() {
+ return String.format("consumerProxy-%s-%s", systemName, clientId);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
new file mode 100644
index 0000000..10ce274
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -0,0 +1,371 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import kafka.common.TopicAndPartition;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+
+public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);
+
+ // Defaults used by setFetchThresholds() when the corresponding config values are not defined.
+ private static final long FETCH_THRESHOLD = 50000;
+ private static final long FETCH_THRESHOLD_BYTES = -1L;
+
+ private final Consumer<K, V> kafkaConsumer;
+ private final String systemName;
+ private final String clientId;
+ // lifecycle flags, flipped exactly once by stop() and start() respectively
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final Config config;
+ private final boolean fetchThresholdBytesEnabled;
+ private final KafkaSystemConsumerMetrics metrics;
+
+ // The sink transfers the messages from the proxy/consumer to the BlockingEnvelopeMap.
+ final KafkaConsumerMessageSink messageSink;
+
+ // The proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
+ // BlockingEnvelopeMap's buffers.
+ private final KafkaConsumerProxy proxy;
+
+ // Registration data kept until start(): the starting offset and the SSP of each registered TopicPartition.
+ final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
+ final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>();
+
+ // Per-partition limits, computed in setFetchThresholds() and read by the message sink.
+ long perPartitionFetchThreshold;
+ long perPartitionFetchThresholdBytes;
+
+ /**
+ * Create a KafkaSystemConsumer for the provided {@code systemName}
+ * @param systemName system name for which we create the consumer
+ * @param config application config
+ * @param metrics metrics for this KafkaSystemConsumer
+ * @param clock system clock
+ */
+ public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+ KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+ super(metrics.registry(), clock, metrics.getClass().getName());
+
+ this.kafkaConsumer = kafkaConsumer;
+ this.clientId = clientId;
+ this.systemName = systemName;
+ this.config = config;
+ this.metrics = metrics;
+
+ fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
+
+ // create a sink for passing the messages between the proxy and the consumer
+ messageSink = new KafkaConsumerMessageSink();
+
+ // Create the proxy to do the actual message reading.
+ String metricName = String.format("%s", systemName);
+ proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName);
+ LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy);
+ }
+
+ /**
+  * Create internal kafka consumer object, which will be used in the Proxy.
+  * The kafka client properties are derived from {@code config} for the given system and client id
+  * and are logged before the consumer is created.
+  * @param systemName system name for which we create the consumer
+  * @param clientId client id to use in the kafka client
+  * @param config config
+  * @return kafka consumer object
+  */
+ public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) {
+
+   // extract kafka client configs
+   KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId);
+
+   LOG.info("{}: KafkaClient properties {}", systemName, consumerConfig);
+
+   // use the diamond instead of a raw type, so the result is type-checked against the declared
+   // KafkaConsumer<byte[], byte[]> return type rather than relying on an unchecked conversion
+   return new KafkaConsumer<>(consumerConfig);
+ }
+
+ /**
+  * Starts the consumer: assigns the registered TopicPartitions, computes the fetch thresholds,
+  * then seeks to the registered offsets and starts the proxy.
+  * A repeated call, or a call on a consumer that was already stopped, only logs and returns.
+  */
+ @Override
+ public void start() {
+   if (!started.compareAndSet(false, true)) {
+     LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
+   } else if (stopped.get()) {
+     LOG.error("{}: Attempting to start a stopped consumer", this);
+   } else {
+     // initialize the subscriptions for all the registered TopicPartitions
+     startSubscription();
+     // must run after all the registrations are completed
+     setFetchThresholds();
+
+     startConsumer();
+     LOG.info("{}: Consumer started", this);
+   }
+ }
+
+ private void startSubscription() {
+ //subscribe to all the registered TopicPartitions
+ LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+ try {
+ synchronized (kafkaConsumer) {
+ // we are using assign (and not subscribe), so we need to specify both topic and partition
+ kafkaConsumer.assign(topicPartitionsToSSP.keySet());
+ }
+ } catch (Exception e) {
+ throw new SamzaException("Consumer subscription failed for " + this, e);
+ }
+ }
+
+ /**
+  * Set the offsets to start from.
+  * Register the TopicPartitions with the proxy.
+  * Start the proxy.
+  * If no TopicPartition was registered, an error is logged and the proxy is still started.
+  * @throws SamzaException if seeking to a registered offset fails
+  * @throws NumberFormatException if a registered offset is not a valid long
+  */
+ void startConsumer() {
+   // set the offset for each TopicPartition
+   if (topicPartitionsToOffset.isEmpty()) {
+     LOG.error("{}: Consumer is not subscribed to any SSPs", this);
+   }
+
+   topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
+     long startingOffset = Long.parseLong(startingOffsetString);
+
+     try {
+       synchronized (kafkaConsumer) {
+         kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
+       }
+     } catch (Exception e) {
+       // all recoverable exceptions are handled by the client.
+       // if we get here there is nothing left to do but bail out.
+       String msg =
+           String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
+       LOG.error(msg, e);
+       throw new SamzaException(msg, e);
+     }
+
+     // SLF4J only substitutes "{}" placeholders; "%s" would be printed literally
+     LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
+
+     // add the partition to the proxy
+     proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+   });
+
+   // start the proxy thread (proxy is a final field assigned in the constructor, so it is never null)
+   if (!proxy.isRunning()) {
+     LOG.info("{}: Starting proxy {}", this, proxy);
+     proxy.start();
+   }
+ }
+
+ private void setFetchThresholds() {
+ // get the thresholds, and set defaults if not defined.
+ KafkaConfig kafkaConfig = new KafkaConfig(config);
+
+ Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName);
+ long fetchThreshold = FETCH_THRESHOLD;
+ if (fetchThresholdOption.isDefined()) {
+ fetchThreshold = Long.valueOf(fetchThresholdOption.get());
+ }
+
+ Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
+ long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
+ if (fetchThresholdBytesOption.isDefined()) {
+ fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
+ }
+
+ int numPartitions = topicPartitionsToSSP.size();
+ if (numPartitions != topicPartitionsToOffset.size()) {
+ throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()");
+ }
+
+
+ if (numPartitions > 0) {
+ perPartitionFetchThreshold = fetchThreshold / numPartitions;
+ if (fetchThresholdBytesEnabled) {
+ // currently this feature cannot be enabled, because we do not have the size of the messages available.
+ // messages get double buffered, hence divide by 2
+ perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
+ }
+ }
+ LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
+ this, fetchThresholdBytes, fetchThreshold, numPartitions, perPartitionFetchThreshold, perPartitionFetchThresholdBytes);
+ }
+
+ /**
+  * Stops the proxy (passing it a one minute timeout) and then closes the kafka consumer.
+  * Only the first call does the work; any later call logs a warning and returns.
+  * A failure while closing the kafka consumer is logged, not rethrown.
+  */
+ @Override
+ public void stop() {
+   final boolean firstStop = stopped.compareAndSet(false, true);
+   if (!firstStop) {
+     LOG.warn("{}: Attempting to stop stopped consumer.", this);
+     return;
+   }
+
+   LOG.info("{}: Stopping Samza kafkaConsumer ", this);
+
+   if (proxy != null) {
+     LOG.info("{}: Stopping proxy {}", this, proxy);
+     final long stopTimeoutMs = TimeUnit.MINUTES.toMillis(1);
+     proxy.stop(stopTimeoutMs);
+   }
+
+   try {
+     synchronized (kafkaConsumer) {
+       LOG.info("{}: Closing kafkaSystemConsumer {}", this, kafkaConsumer);
+       kafkaConsumer.close();
+     }
+   } catch (Exception closeFailure) {
+     LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, closeFailure);
+   }
+ }
+
+ /**
+  * Records the ssp and its starting offset; nothing is submitted to the kafka consumer yet.
+  * SSPs that belong to a different system are ignored with a warning.
+  * When the same TopicPartition is registered more than once, the older offset is kept.
+  * @param systemStreamPartition ssp to register
+  * @param offset offset to register with
+  * @throws SamzaException if the consumer has already been started
+  */
+ @Override
+ public void register(SystemStreamPartition systemStreamPartition, String offset) {
+   if (started.get()) {
+     throw new SamzaException(
+         String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
+             systemStreamPartition));
+   }
+
+   final boolean sameSystem = systemStreamPartition.getSystem().equals(systemName);
+   if (!sameSystem) {
+     LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+     return;
+   }
+   LOG.info("{}: Registering ssp = {} with offset {}", this, systemStreamPartition, offset);
+
+   super.register(systemStreamPartition, offset);
+
+   final TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+   topicPartitionsToSSP.put(topicPartition, systemStreamPartition);
+
+   // keep the older (of the two) offset, to guarantee we do not miss any messages.
+   final String previousOffset = topicPartitionsToOffset.get(topicPartition);
+   final boolean replaceOffset = previousOffset == null || compareOffsets(previousOffset, offset) > 0;
+   if (replaceOffset) {
+     topicPartitionsToOffset.put(topicPartition, offset);
+   }
+
+   metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
+ }
+
+ /**
+ * Compare two String offsets.
+ * Note. There is a method in KafkaSystemAdmin that does that, but that would require instantiation of systemadmin for each consumer.
+ * @return see {@link Long#compareTo(Long)}
+ */
+ private static int compareOffsets(String offset1, String offset2) {
+ return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+ }
+
+ /**
+  * @return {@code <systemName>:<clientId>}, the prefix used in this class's log messages
+  */
+ @Override
+ public String toString() {
+   return systemName + ":" + clientId;
+ }
+
+ /**
+  * Delegates to the parent's poll while the proxy is running.
+  * If the proxy is no longer running, this consumer is stopped and a SamzaException carrying the
+  * proxy's failure cause is thrown.
+  */
+ @Override
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+     Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+
+   if (proxy.isRunning()) {
+     return super.poll(systemStreamPartitions, timeout);
+   }
+
+   // the proxy is not running: shut down and report why
+   stop();
+   final String errorMessage = String.format("%s: KafkaConsumerProxy has stopped.", this);
+   throw new SamzaException(errorMessage, proxy.getFailureCause());
+ }
+
+ /**
+  * Builds a TopicAndPartition with the topic and partition of the given TopicPartition.
+  * @param tp topic partition to convert
+  * @return the equivalent TopicAndPartition
+  */
+ public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
+   final String topic = tp.topic();
+   final int partition = tp.partition();
+   return new TopicAndPartition(topic, partition);
+ }
+
+ /**
+  * Builds a TopicPartition from the stream name and the partition id of the given SystemStreamPartition.
+  * @param ssp system stream partition to convert
+  * @return the corresponding TopicPartition
+  */
+ public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
+   final String topic = ssp.getStream();
+   final int partitionId = ssp.getPartition().getPartitionId();
+   return new TopicPartition(topic, partitionId);
+ }
+
+ /**
+  * Accessor for the system name given to the constructor.
+  * @return the name of the system this consumer reads from
+  */
+ public String getSystemName() {
+   return this.systemName;
+ }
+
+ /**
+  * Sink through which the proxy hands messages and state to the enclosing KafkaSystemConsumer.
+  * Log messages are prefixed with the enclosing consumer, like the rest of this class,
+  * because the sink itself has no meaningful string form.
+  */
+ public class KafkaConsumerMessageSink {
+
+   /**
+    * Forwards the flag to the enclosing consumer's {@code setIsAtHead}.
+    * @param ssp partition the flag applies to
+    * @param isAtHighWatermark value passed on as the "at head" flag
+    */
+   public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) {
+     setIsAtHead(ssp, isAtHighWatermark);
+   }
+
+   /**
+    * Tells whether the queue of the given ssp is below its per-partition threshold.
+    * The byte-size threshold is used when fetchThresholdBytesEnabled is set, the message-count threshold otherwise.
+    * @param ssp partition to check
+    * @return true if the queued size (or count) is strictly below the per-partition threshold
+    */
+   boolean needsMoreMessages(SystemStreamPartition ssp) {
+     LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
+             + "(limit={}); messagesNumInQueue={}(limit={})", KafkaSystemConsumer.this, ssp, fetchThresholdBytesEnabled,
+         getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+         perPartitionFetchThreshold);
+
+     if (fetchThresholdBytesEnabled) {
+       return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes;
+     } else {
+       return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold;
+     }
+   }
+
+   /**
+    * Puts the envelope into the enclosing consumer's queue for the given ssp.
+    * @param ssp partition the envelope belongs to
+    * @param envelope message to enqueue
+    * @throws SamzaException if the calling thread is interrupted while enqueueing; the interrupt flag
+    *         is restored and the InterruptedException is attached as the cause
+    */
+   void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+     LOG.trace("{}: Incoming message ssp = {}: envelope = {}.", KafkaSystemConsumer.this, ssp, envelope);
+
+     try {
+       put(ssp, envelope);
+     } catch (InterruptedException e) {
+       // keep the interrupt visible to the calling thread and do not lose the original exception
+       Thread.currentThread().interrupt();
+       throw new SamzaException(
+           String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
+               KafkaSystemConsumer.this, envelope.getOffset(), ssp), e);
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/003ad106/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
deleted file mode 100644
index fd84c4a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.common.TopicAndPartition
-import org.apache.samza.util.Logging
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.samza.Partition
-import org.apache.kafka.common.utils.Utils
-import org.apache.samza.util.Clock
-import kafka.serializer.DefaultDecoder
-import kafka.serializer.Decoder
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
-import kafka.consumer.ConsumerConfig
-import org.apache.samza.util.TopicMetadataStore
-import kafka.api.PartitionMetadata
-import kafka.api.TopicMetadata
-import org.apache.samza.util.ExponentialSleepStrategy
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
-import org.apache.samza.system.SystemAdmin
-
-object KafkaSystemConsumer {
-
- // Approximate additional shallow heap overhead per message in addition to the raw bytes
- // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
- // As this overhead is a moving target, and not very large
- // compared to the message size its being ignore in the computation for now.
- val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4;
-
- def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
- val topic = systemStreamPartition.getStream
- val partitionId = systemStreamPartition.getPartition.getPartitionId
- TopicAndPartition(topic, partitionId)
- }
-}
-
-/**
- * Maintain a cache of BrokerProxies, returning the appropriate one for the
- * requested topic and partition.
- */
-private[kafka] class KafkaSystemConsumer(
- systemName: String,
- systemAdmin: SystemAdmin,
- metrics: KafkaSystemConsumerMetrics,
- metadataStore: TopicMetadataStore,
- clientId: String,
- timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
- bufferSize: Int = ConsumerConfig.SocketBufferSize,
- fetchSize: StreamFetchSizes = new StreamFetchSizes,
- consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
- consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-
- /**
- * Defines a low water mark for how many messages we buffer before we start
- * executing fetch requests against brokers to get more messages. This value
- * is divided equally among all registered SystemStreamPartitions. For
- * example, if fetchThreshold is set to 50000, and there are 50
- * SystemStreamPartitions registered, then the per-partition threshold is
- * 1000. As soon as a SystemStreamPartition's buffered message count drops
- * below 1000, a fetch request will be executed to get more data for it.
- *
- * Increasing this parameter will decrease the latency between when a queue
- * is drained of messages and when new messages are enqueued, but also leads
- * to an increase in memory usage since more messages will be held in memory.
- */
- fetchThreshold: Int = 50000,
- /**
- * Defines a low water mark for how many bytes we buffer before we start
- * executing fetch requests against brokers to get more messages. This
- * value is divided by 2 because the messages are buffered twice, once in
- * KafkaConsumer and then in SystemConsumers. This value
- * is divided equally among all registered SystemStreamPartitions.
- * However this is a soft limit per partition, as the
- * bytes are cached at the message boundaries, and the actual usage can be
- * 1000 bytes + size of max message in the partition for a given stream.
- * The bytes if the size of the bytebuffer in Message. Hence, the
- * Object overhead is not taken into consideration. In this codebase
- * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB,
- * which is not considerable.
- *
- * For example,
- * if fetchThresholdBytes is set to 100000 bytes, and there are 50
- * SystemStreamPartitions registered, then the per-partition threshold is
- * (100000 / 2) / 50 = 1000 bytes.
- * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
- * As soon as a SystemStreamPartition's buffered messages bytes drops
- * below 1000, a fetch request will be executed to get more data for it.
- *
- * Increasing this parameter will decrease the latency between when a queue
- * is drained of messages and when new messages are enqueued, but also leads
- * to an increase in memory usage since more messages will be held in memory.
- *
- * The default value is -1, which means this is not used. When the value
- * is > 0, then the fetchThreshold which is count based is ignored.
- */
- fetchThresholdBytes: Long = -1,
- /**
- * if(fetchThresholdBytes > 0) true else false
- */
- fetchLimitByBytesEnabled: Boolean = false,
- offsetGetter: GetOffset = new GetOffset("fail"),
- deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
- keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
- retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
- metrics.registry,
- new Clock {
- def currentTimeMillis = clock()
- },
- classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
-
- type HostPort = (String, Int)
- val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
- val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
- var perPartitionFetchThreshold = fetchThreshold
- var perPartitionFetchThresholdBytes = 0L
-
- def start() {
- if (topicPartitionsAndOffsets.size > 0) {
- perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
- // messages get double buffered, hence divide by 2
- if(fetchLimitByBytesEnabled) {
- perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
- }
- }
-
- systemAdmin.start()
- refreshBrokers
- }
-
- override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
- super.register(systemStreamPartition, offset)
-
- val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
- val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
- // register the older offset in the consumer
- if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
- topicPartitionsAndOffsets.replace(topicAndPartition, offset)
- }
-
- metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
- }
-
- def stop() {
- systemAdmin.stop()
- brokerProxies.values.foreach(_.stop)
- }
-
- protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
- info("Creating new broker proxy for host: %s and port: %s" format(host, port))
- new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
- }
-
- protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = {
- topicMetadata.partitionsMetadata.find(_.partitionId == partition)
- }
-
- protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
- // Whatever we do, we can't say Broker, even though we're
- // manipulating it here. Broker is a private type and Scala doesn't seem
- // to care about that as long as you don't explicitly declare its type.
- val brokerOption = partitionMetadata.flatMap(_.leader)
-
- brokerOption match {
- case Some(broker) => Some(broker.host, broker.port)
- case _ => None
- }
- }
-
- def refreshBrokers {
- var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
- info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
- retryBackoff.run(
- loop => {
- val topics = tpToRefresh.map(_.topic).toSet
- val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-
- // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
- // This avoids trying to re-add the same topic partition repeatedly
- def refresh() = {
- val head = tpToRefresh.head
- // refreshBrokers can be called from abdicate and refreshDropped,
- // both of which are triggered from BrokerProxy threads. To prevent
- // accidentally creating multiple objects for the same broker, or
- // accidentally not updating the topicPartitionsAndOffsets variable,
- // we need to lock.
- this.synchronized {
- // Check if we still need this TopicAndPartition inside the
- // critical section. If we don't, then notAValidEvent it.
- topicPartitionsAndOffsets.get(head) match {
- case Some(nextOffset) =>
- val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition)
- getLeaderHostPort(partitionMetadata) match {
- case Some((host, port)) =>
- debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get))
- val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
- brokerProxy.addTopicPartition(head, Option(nextOffset))
- brokerProxy.start
- debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
- topicPartitionsAndOffsets -= head
- case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
- }
- case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
- }
- }
- tpToRefresh.tail
- }
-
- while (!tpToRefresh.isEmpty) {
- tpToRefresh = refresh()
- }
-
- loop.done
- },
-
- (exception, loop) => {
- warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
- debug("Exception detail:", exception)
- })
- }
-
- val sink = new MessageSink {
- var lastDroppedRefresh = clock()
-
- def refreshDropped() {
- if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
- refreshBrokers
- lastDroppedRefresh = clock()
- }
- }
-
- def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
- setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
- }
-
- def needsMoreMessages(tp: TopicAndPartition) = {
- if(fetchLimitByBytesEnabled) {
- getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
- } else {
- getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
- }
- }
-
- def getMessageSize(message: Message): Integer = {
- message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
- }
-
- def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
- trace("Incoming message %s: %s." format (tp, msg))
-
- val systemStreamPartition = toSystemStreamPartition(tp)
- val isAtHead = highWatermark == msg.offset
- val offset = msg.offset.toString
- val key = if (msg.message.key != null) {
- keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
- } else {
- null
- }
- val message = if (!msg.message.isNull) {
- deserializer.fromBytes(Utils.readBytes(msg.message.payload))
- } else {
- null
- }
-
- if(fetchLimitByBytesEnabled ) {
- val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
- ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
- put(systemStreamPartition, ime)
- } else {
- val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
- ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
- put(systemStreamPartition, ime)
- }
-
- setIsAtHead(systemStreamPartition, isAtHead)
- }
-
- def abdicate(tp: TopicAndPartition, nextOffset: Long) {
- info("Abdicating for %s" format (tp))
- topicPartitionsAndOffsets += tp -> nextOffset.toString
- refreshBrokers
- }
-
- private def toSystemStreamPartition(tp: TopicAndPartition) = {
- new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
- }
- }
-}