You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:36 UTC
[06/15] initial import.
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar
new file mode 100644
index 0000000..e2c2b2d
Binary files /dev/null and b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar
new file mode 100644
index 0000000..b91fc78
Binary files /dev/null and b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar
new file mode 100644
index 0000000..d7c9bd6
Binary files /dev/null and b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..a9ddc5c
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import org.I0Itec.zkclient.ZkClient
+
+import grizzled.slf4j.Logging
+import kafka.admin.AdminUtils
+import kafka.api.FetchRequestBuilder
+import kafka.api.OffsetRequest
+import kafka.api.PartitionOffsetRequestInfo
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.common.TopicExistsException
+import kafka.consumer.SimpleConsumer
+import kafka.producer.KeyedMessage
+import kafka.producer.Partitioner
+import kafka.producer.Producer
+import kafka.serializer.Decoder
+import kafka.serializer.Encoder
+import kafka.utils.Utils
+import kafka.utils.VerifiableProperties
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.serializers.Serde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util.TopicMetadataStore
+
+/**
+ * Kafka checkpoint manager stores checkpoints in a Kafka "state topic". One
+ * state topic is shared by all partitions of a job (KafkaCheckpointManagerFactory
+ * names it __samza_checkpoint_<job name>_<job id>, e.g.
+ * __samza_checkpoint_my-job_1 for job "my-job" with job id "1"). The checkpoint
+ * for a Samza partition is read from the state topic partition with the same
+ * id; the latest checkpoint is simply the last message in that partition.
+ *
+ * Writes use the Samza Partition as the Kafka partition key. NOTE(review): this
+ * assumes the producer from connectProducer routes that key to the state topic
+ * partition with the same id — confirm against the producer's partitioner.
+ *
+ * On start, the manager that has partition 0 registered creates the state
+ * topic (an already-existing topic is fine), and every manager then validates
+ * that the topic has exactly totalPartitions partitions. If a state topic
+ * partition has no messages yet, readLastCheckpoint returns null.
+ *
+ * Failed attempts are retried indefinitely, sleeping failureRetryMs between
+ * attempts; reads and validation rethrow KafkaCheckpointException immediately
+ * as a hard, non-retryable failure.
+ */
+class KafkaCheckpointManager(
+  clientId: String,
+  stateTopic: String,
+  systemName: String,
+  totalPartitions: Int,
+  replicationFactor: Int,
+  socketTimeout: Int,
+  bufferSize: Int,
+  fetchSize: Int,
+  metadataStore: TopicMetadataStore,
+  connectProducer: () => Producer[Partition, Array[Byte]],
+  connectZk: () => ZkClient,
+  failureRetryMs: Long = 10000,
+  serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+
+  var partitions = Set[Partition]()
+  var producer: Producer[Partition, Array[Byte]] = null
+
+  info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, systemName=%s" format (clientId, stateTopic, systemName))
+
+  /**
+   * Writes a checkpoint for the given partition to the state topic, retrying
+   * until the send succeeds. The producer is created lazily and is closed and
+   * discarded after any failure so that the next attempt reconnects.
+   */
+  def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
+    var done = false
+
+    while (!done) {
+      try {
+        if (producer == null) {
+          producer = connectProducer()
+        }
+
+        producer.send(new KeyedMessage(stateTopic, null, partition, serde.toBytes(checkpoint)))
+        done = true
+      } catch {
+        case e: Throwable =>
+          warn("Failed to send checkpoint %s for partition %s. Retrying." format (checkpoint, partition), e)
+          closeProducer
+          Thread.sleep(failureRetryMs)
+      }
+    }
+  }
+
+  /**
+   * Reads the most recent checkpoint for the given partition, retrying on
+   * failures other than KafkaCheckpointException.
+   *
+   * @return the last checkpoint written for the partition, or null if the
+   *         state topic partition is empty or the broker reports the last
+   *         offset as out of range.
+   * @throws KafkaCheckpointException on unrecoverable errors (missing
+   *         partition metadata, missing offsets, or an unexpected number of
+   *         messages at the last offset).
+   */
+  def readLastCheckpoint(partition: Partition): Checkpoint = {
+    info("Reading checkpoint for partition %s." format partition.getPartitionId)
+
+    var checkpoint: Checkpoint = null
+    var done = false
+
+    while (!done) {
+      try {
+        checkpoint = fetchLastCheckpoint(partition)
+        done = true
+      } catch {
+        case e: KafkaCheckpointException =>
+          throw e
+        case e: Throwable =>
+          warn("Got exception while trying to read last checkpoint for topic %s and partition %s. Retrying." format (stateTopic, partition), e)
+          Thread.sleep(failureRetryMs)
+      }
+    }
+
+    if (checkpoint != null) {
+      info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint))
+    }
+
+    checkpoint
+  }
+
+  /**
+   * Creates the state topic if this manager has partition 0 registered, then
+   * validates the topic's partition count. Only partitions registered before
+   * start is called are considered when deciding whether to create the topic.
+   */
+  def start {
+    if (partitions.contains(new Partition(0))) {
+      createTopic
+    }
+
+    validateTopic
+  }
+
+  def register(partition: Partition) {
+    partitions += partition
+  }
+
+  /** Closes the producer if one was created; safe to call when no checkpoint was ever written. */
+  def stop {
+    closeProducer
+  }
+
+  /** Closes and forgets the current producer, if any, so the next write reconnects. */
+  private def closeProducer {
+    if (producer != null) {
+      try {
+        producer.close
+      } finally {
+        producer = null
+      }
+    }
+  }
+
+  /** Fetches the state topic's metadata through TopicMetadataCache. */
+  private def getStateTopicMetadata = {
+    val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+    metadataMap(stateTopic)
+  }
+
+  /**
+   * One attempt at reading the last checkpoint: finds the partition's leader,
+   * connects to it, and reads the last message. The consumer is always closed
+   * before this method returns or throws.
+   */
+  private def fetchLastCheckpoint(partition: Partition): Checkpoint = {
+    // Assume state topic exists with correct partitions, since it should be verified on start.
+    val partitionMetadata = getStateTopicMetadata
+      .partitionsMetadata
+      .find(_.partitionId == partition.getPartitionId)
+      .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
+    val partitionId = partitionMetadata.partitionId
+    val leader = partitionMetadata
+      .leader
+      .getOrElse(throw new SamzaException("No leader available for topic %s" format stateTopic))
+
+    info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, stateTopic, partitionId))
+
+    val consumer = new SimpleConsumer(
+      leader.host,
+      leader.port,
+      socketTimeout,
+      bufferSize,
+      clientId)
+
+    try {
+      readLastMessage(consumer, partitionId)
+    } finally {
+      consumer.close
+    }
+  }
+
+  /**
+   * Reads and deserializes the last message of the given state topic
+   * partition through an already-connected consumer.
+   *
+   * @return the checkpoint, or null if the partition is empty or the broker
+   *         reports the last offset as out of range.
+   */
+  private def readLastMessage(consumer: SimpleConsumer, partitionId: Int): Checkpoint = {
+    val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
+    val offset = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+      .partitionErrorAndOffsets
+      .get(topicAndPartition)
+      .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId)))
+      .offsets
+      .headOption
+      .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId)))
+
+    info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, stateTopic, partitionId))
+
+    if (offset <= 0) {
+      info("Got offset 0 (no messages in state topic) for topic %s and partition %s, so returning null. If you expected the state topic to have messages, you're probably going to lose data." format (stateTopic, partitionId))
+      null
+    } else {
+      val request = new FetchRequestBuilder()
+        // Kafka returns 1 greater than the offset of the last message in
+        // the topic, so subtract one to fetch the last message.
+        .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
+        .maxWait(500)
+        .minBytes(1)
+        .clientId(clientId)
+        .build
+      val messageSet = consumer.fetch(request)
+      val errorCode =
+        if (messageSet.hasError) {
+          val code = messageSet.errorCode(stateTopic, partitionId)
+          warn("Got error code from broker for %s: %s" format (stateTopic, code))
+          code
+        } else {
+          ErrorMapping.NoError
+        }
+
+      if (errorCode == ErrorMapping.OffsetOutOfRangeCode) {
+        warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (stateTopic, partitionId))
+        null
+      } else {
+        // Throws for any other broker error; readLastCheckpoint retries it.
+        ErrorMapping.maybeThrowException(errorCode)
+
+        val messages = messageSet.messageSet(stateTopic, partitionId).toList
+
+        if (messages.length != 1) {
+          throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
+            + "messages back when fetching from state checkpoint topic %s and partition %s. "
+            + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
+            + "so failing." format (messages.length, stateTopic, partitionId))
+        }
+
+        // Some back bending to go from message to checkpoint.
+        serde.fromBytes(Utils.readBytes(messages.head.message.payload))
+      }
+    }
+  }
+
+  /** Creates the state topic, retrying until it is created or found to already exist. */
+  private def createTopic {
+    info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions))
+
+    var done = false
+
+    while (!done) {
+      try {
+        createTopicOnce
+        done = true
+      } catch {
+        case e: Throwable =>
+          warn("Failed to create topic %s. Retrying." format stateTopic, e)
+          Thread.sleep(failureRetryMs)
+      }
+    }
+  }
+
+  /**
+   * Single topic-creation attempt. An already-existing topic counts as success.
+   * The ZooKeeper client opened here is always closed exactly once.
+   */
+  private def createTopicOnce {
+    val zkClient = connectZk()
+
+    try {
+      AdminUtils.createTopic(
+        zkClient,
+        stateTopic,
+        totalPartitions,
+        replicationFactor)
+
+      info("Created state topic %s." format stateTopic)
+    } catch {
+      case e: TopicExistsException =>
+        info("State topic %s already exists." format stateTopic)
+    } finally {
+      zkClient.close
+    }
+  }
+
+  /**
+   * Verifies that the state topic reports no error and has exactly
+   * totalPartitions partitions. A metadata error code is retried; a partition
+   * count mismatch is a hard failure (KafkaCheckpointException).
+   */
+  private def validateTopic {
+    info("Validating state topic %s." format stateTopic)
+
+    var done = false
+
+    while (!done) {
+      try {
+        val topicMetadata = getStateTopicMetadata
+        val errorCode = topicMetadata.errorCode
+
+        if (errorCode != ErrorMapping.NoError) {
+          // A plain SamzaException, so the catch below retries it.
+          throw new SamzaException("State topic validation failed for topic %s because we got error code %s from Kafka." format (stateTopic, errorCode))
+        }
+
+        val partitionCount = topicMetadata.partitionsMetadata.length
+
+        if (partitionCount != totalPartitions) {
+          throw new KafkaCheckpointException("State topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (stateTopic, partitionCount, totalPartitions))
+        }
+
+        info("Successfully validated state topic %s." format stateTopic)
+
+        done = true
+      } catch {
+        case e: KafkaCheckpointException =>
+          throw e
+        case e: Throwable =>
+          warn("Got exception while trying to validate topic %s. Retrying." format stateTopic, e)
+          Thread.sleep(failureRetryMs)
+      }
+    }
+  }
+}
+
+/**
+ * Signals a hard failure that KafkaCheckpointManager must not retry.
+ *
+ * KafkaCheckpointManager retries most exceptions in its read and validation
+ * loops, but rethrows this type immediately. Throwing it is therefore how
+ * those loops abort instead of retrying.
+ *
+ * @param s the detail message
+ * @param t the underlying cause, or null if there is none
+ */
+class KafkaCheckpointException(s: String, t: Throwable)
+  extends SamzaException(s, t) {
+
+  /** Creates the exception with a message and no cause. */
+  def this(s: String) =
+    this(s, null: Throwable)
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..bc94f6a
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import org.apache.samza.config.{ KafkaConfig, Config }
+import org.apache.samza.SamzaException
+import java.util.Properties
+import kafka.producer.Producer
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import org.apache.samza.util.Util
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.checkpoint.CheckpointManager
+
+/**
+ * Builds a KafkaCheckpointManager from job config.
+ *
+ * The checkpoint system comes from task.checkpoint.system. Producer and
+ * consumer settings are derived from that system's Kafka config. The state
+ * topic is named after the job name and job id; the job id defaults to "1".
+ */
+class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+  def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
+    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+    val systemName = config.getCheckpointSystem match {
+      case Some(name) => name
+      case None => throw new SamzaException("no system defined for Kafka's checkpoint manager.")
+    }
+
+    // Synchronous sends acknowledged by all in-sync replicas, retried practically
+    // forever. Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
+    val producerConfig = config.getKafkaSystemProducerConfig(
+      systemName,
+      clientId,
+      Map(
+        "request.required.acks" -> "-1",
+        "producer.type" -> "sync",
+        "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
+
+    val zkConnect = Option(consumerConfig.zkConnect) getOrElse {
+      throw new SamzaException("no zookeeper.connect defined in config")
+    }
+    val jobName = config.getName getOrElse {
+      throw new SamzaException("Missing job name in configs")
+    }
+    val jobId = config.getJobId.getOrElse("1")
+    val brokersListString = Option(producerConfig.brokerList) getOrElse {
+      throw new SamzaException("No broker list defined in config for %s." format systemName)
+    }
+    val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId)
+    val stateTopic = getTopic(jobName, jobId)
+    val totalPartitions = Util.getMaxInputStreamPartitions(config).size
+
+    new KafkaCheckpointManager(
+      clientId = clientId,
+      stateTopic = stateTopic,
+      systemName = systemName,
+      totalPartitions = totalPartitions,
+      replicationFactor = replicationFactor,
+      socketTimeout = consumerConfig.socketTimeoutMs,
+      bufferSize = consumerConfig.socketReceiveBufferBytes,
+      fetchSize = consumerConfig.fetchMessageMaxBytes, // must be > buffer size
+      metadataStore = metadataStore,
+      connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
+      connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+  }
+
+  /**
+   * Builds the state topic name. Underscores in the job name and id are turned
+   * into dashes (presumably to keep the "_" separators unambiguous).
+   */
+  private def getTopic(jobName: String, jobId: String) = {
+    val dashed = (s: String) => s.replaceAll("_", "-")
+    "__samza_checkpoint_%s_%s" format (dashed(jobName), dashed(jobId))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
new file mode 100644
index 0000000..59d915d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+import kafka.consumer.ConsumerConfig
+import java.util.Properties
+import kafka.producer.ProducerConfig
+import java.util.UUID
+
+object KafkaConfig {
+  // Keys read by the regex config rewriter; %s is the rewriter name.
+  private val RewriterKeyPrefix = "job.config.rewriter.%s."
+  val REGEX_RESOLVED_STREAMS = RewriterKeyPrefix + "regex"
+  val REGEX_RESOLVED_SYSTEM = RewriterKeyPrefix + "system"
+  val REGEX_INHERITED_CONFIG = RewriterKeyPrefix + "config"
+
+  // Checkpoint manager keys.
+  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+
+  // Per-system consumer deserializer keys; KafkaConfig formats them with a system
+  // name (assumes SystemConfig.SYSTEM_PREFIX carries that placeholder — confirm).
+  val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
+  val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
+
+  /** Enriches a Config with the Kafka-specific accessors defined on KafkaConfig. */
+  implicit def Config2Kafka(config: Config): KafkaConfig = new KafkaConfig(config)
+}
+
+class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+  // checkpoints
+  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+
+  // custom consumer config
+  def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
+  def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
+
+  /**
+   * Returns a map of stream name -> auto.offset.reset value for every stream
+   * of the given system that defines
+   * systems.<systemName>.streams.<stream>.consumer.auto.offset.reset.
+   */
+  def getAutoOffsetResetTopics(systemName: String) = {
+    val autoOffsetResetSuffix = ".consumer.auto.offset.reset"
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    // Keep only the auto.offset.reset keys, and strip that suffix to recover the stream name.
+    subConf
+      .filterKeys(k => k.endsWith(autoOffsetResetSuffix))
+      .map {
+        case (topicAutoOffsetReset, resetValue) =>
+          (topicAutoOffsetReset.stripSuffix(autoOffsetResetSuffix), resetValue)
+      }.toMap
+  }
+
+  // regex resolver
+  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+
+  // kafka config
+
+  /**
+   * Builds a Kafka ConsumerConfig from the systems.<systemName>.consumer.* entries.
+   * The group.id and client.id arguments override any configured values, and
+   * injectedProps override everything.
+   *
+   * When clientId or groupId is omitted, a unique value is generated on each
+   * call by appending a random UUID to an "undefined-samza-..." prefix.
+   */
+  def getKafkaSystemConsumerConfig(
+    systemName: String,
+    clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString,
+    groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
+    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.consumer." format systemName, true)
+    val consumerProps = new Properties()
+    consumerProps.putAll(subConf)
+    consumerProps.put("group.id", groupId)
+    consumerProps.put("client.id", clientId)
+    consumerProps.putAll(injectedProps)
+    new ConsumerConfig(consumerProps)
+  }
+
+  /**
+   * Builds a Kafka ProducerConfig from the systems.<systemName>.producer.* entries.
+   * The client.id argument overrides any configured value, and injectedProps
+   * override everything. When clientId is omitted, a unique value is generated
+   * on each call by appending a random UUID.
+   */
+  def getKafkaSystemProducerConfig(
+    systemName: String,
+    clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString,
+    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.producer." format systemName, true)
+    val producerProps = new Properties()
+    producerProps.putAll(subConf)
+    producerProps.put("client.id", clientId)
+    producerProps.putAll(injectedProps)
+    new ProducerConfig(producerProps)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
new file mode 100644
index 0000000..11078e3
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object KafkaSerdeConfig {
+  // Kafka serde config keys built on SerializerConfig.SERIALIZER_PREFIX;
+  // KafkaSerdeConfig formats them with a serializer name.
+  val ENCODER = serdeKey("encoder")
+  val DECODER = serdeKey("decoder")
+
+  /** Enriches a Config with getKafkaEncoder / getKafkaDecoder. */
+  implicit def Config2KafkaSerde(config: Config): KafkaSerdeConfig = new KafkaSerdeConfig(config)
+
+  private def serdeKey(suffix: String) = SerializerConfig.SERIALIZER_PREFIX + "." + suffix
+}
+
+class KafkaSerdeConfig(config: Config) extends ScalaMapConfig(config) {
+  import KafkaSerdeConfig.{ ENCODER, DECODER }
+
+  /** Class name of the Kafka Encoder configured for the named serializer, if any. */
+  def getKafkaEncoder(serializer: String) = getOption(ENCODER.format(serializer))
+
+  /** Class name of the Kafka Decoder configured for the named serializer, if any. */
+  def getKafkaDecoder(serializer: String) = getOption(DECODER.format(serializer))
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
new file mode 100644
index 0000000..c7c50c4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ ZkUtils, ZKStringSerializer }
+import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
+import org.apache.samza.SamzaException
+import collection.JavaConversions._
+import grizzled.slf4j.Logging
+import scala.collection._
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
+
+/**
+ * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression.
+ * For each topic registered in ZooKeeper whose whole name matches the regular expression, generate a
+ * series of config values for it and add it to the task's input streams setting.
+ *
+ * job.config.rewriter.regex-input-rewriter.regex=.*stream
+ * job.config.rewriter.regex-input-rewriter.system=kafka
+ * job.config.rewriter.regex-input-rewriter.config.foo=bar
+ *
+ * Would result in:
+ *
+ * task.inputs=kafka.somestream
+ * systems.kafka.streams.somestream.foo=bar
+ *
+ * Existing input streams are kept, and values already present in the config take precedence over
+ * generated ones. A matching topic that is already a statically defined input is an error.
+ *
+ * @see org.apache.samza.config.KafkaConfig.getRegexResolvedStreams
+ */
+class RegexTopicGenerator extends ConfigRewriter with Logging {
+
+  def rewrite(rewriterName: String, config: Config): Config = {
+    val regex = config
+      .getRegexResolvedStreams(rewriterName)
+      .getOrElse(throw new SamzaException("No %s defined in config" format (REGEX_RESOLVED_STREAMS format rewriterName)))
+    val systemName = config
+      .getRegexResolvedSystem(rewriterName)
+      .getOrElse(throw new SamzaException("No system defined for %s." format rewriterName))
+    val topics = getTopicsFromZK(rewriterName, config)
+    val existingInputStreams = config.getInputStreams
+    // The same inherited config is applied to every matching stream, so look it up once.
+    val inheritedConfig = config.getRegexResolvedInheritedConfig(rewriterName)
+    val newInputStreams = new mutable.HashSet[SystemStream]
+    val keysAndValsToAdd = new mutable.HashMap[String, String]
+
+    // Find all the topics that match this regex (String.matches requires the whole name to match).
+    val matchingStreams = topics
+      .filter(_.matches(regex))
+      .map(new SystemStream(systemName, _))
+      .toSet
+
+    for (m <- matchingStreams) {
+      info("Generating new configs for matching stream %s." format m)
+
+      if (existingInputStreams.contains(m)) {
+        throw new SamzaException("Regex '%s' matches existing, statically defined input %s." format (regex, m))
+      }
+
+      newInputStreams.add(m)
+
+      // For each topic that matched, generate all the specified configs
+      inheritedConfig.foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2))
+    }
+
+    info("Generated config values for %d new topics" format newInputStreams.size)
+
+    // Build new inputs
+    val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams)
+      .map(Util.getNameFromSystemStream(_))
+      .mkString(",")
+
+    // Existing config entries are added after the generated ones, so they win on conflicts.
+    new MapConfig((keysAndValsToAdd ++ config) += inputStreams)
+  }
+
+  /**
+   * Lists all topics registered in the ZooKeeper ensemble configured for the
+   * rewriter's system (its consumer zookeeper.connect setting). The ZooKeeper
+   * client is always closed before returning.
+   */
+  def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
+    val systemName = config
+      .getRegexResolvedSystem(rewriterName)
+      .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName)
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+
+    try {
+      ZkUtils.getAllTopics(zkClient)
+    } finally {
+      zkClient.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
new file mode 100644
index 0000000..82ba2a0
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+import java.nio.ByteBuffer
+import org.apache.samza.util.Util
+import kafka.serializer.Encoder
+import kafka.serializer.Decoder
+import org.apache.samza.config.Config
+import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
+import org.apache.samza.SamzaException
+
+/**
+ * Adapts a Kafka Encoder/Decoder pair to Samza's Serde interface. Each
+ * direction is delegated unchanged to the matching Kafka object.
+ */
+class KafkaSerde[T](encoder: Encoder[T], decoder: Decoder[T]) extends Serde[T] {
+  def fromBytes(bytes: Array[Byte]): T = decoder fromBytes bytes
+
+  def toBytes(obj: T): Array[Byte] = encoder toBytes obj
+}
+
+/**
+ * Builds a KafkaSerde for the named serializer by instantiating the Kafka
+ * encoder and decoder classes configured for it. Both class names are checked
+ * before either class is instantiated; a missing one raises SamzaException.
+ */
+class KafkaSerdeFactory[T] extends SerdeFactory[T] {
+  def getSerde(name: String, config: Config): Serde[T] = {
+    val encoderClassName = config.getKafkaEncoder(name) match {
+      case Some(className) => className
+      case None => throw new SamzaException("No kafka encoder defined for %s" format name)
+    }
+    val decoderClassName = config.getKafkaDecoder(name) match {
+      case Some(className) => className
+      case None => throw new SamzaException("No kafka decoder defined for %s" format name)
+    }
+
+    new KafkaSerde[T](
+      Util.getObj[Encoder[T]](encoderClassName),
+      Util.getObj[Decoder[T]](decoderClassName))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
new file mode 100644
index 0000000..cb5015d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.api._
+import kafka.common.ErrorMapping
+import java.util.concurrent.{ CountDownLatch, ConcurrentHashMap }
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.metrics.MetricsRegistry
+import kafka.common.TopicAndPartition
+import kafka.message.MessageSet
+import grizzled.slf4j.Logging
+import java.nio.channels.ClosedByInterruptException
+
/**
 * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
 * a way for consumers to retrieve those messages by topic and partition.
 *
 * Fetching happens on a dedicated daemon thread that is started by [[start]] and interrupted and joined by [[stop]].
 * Fetched messages, high-watermark state and NotLeaderForPartition abdications are reported to [[messageSink]].
 */
abstract class BrokerProxy(
  val host: String,
  val port: Int,
  val system: String,
  val clientID: String,
  val metricsRegistry: MetricsRegistry,
  tpMetrics: TopicAndPartitionMetrics,
  val timeout: Int = Int.MaxValue,
  val bufferSize: Int = 1024000,
  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging with BrokerProxyMetrics {

  /** Receives fetched messages, high-watermark updates and abdicated partitions. */
  val messageSink: MessageSink

  /**
   * How long (ms) the fetcher thread sleeps before checking again whether any TopicPartitions have been added to
   * its purview.
   */
  val sleepMSWhileNoTopicPartitions = 1000

  /** What's the next offset for a particular partition? **/
  val nextOffsets: ConcurrentHashMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()

  /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
  // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
  // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
  // immediately, even though the process was proceeding normally. Hence the extra boolean. Should be investigated.
  val firstCallBarrier = new CountDownLatch(1)
  var firstCall = true

  /** Consumer used for every request to this broker; the fetch thread replaces it after a failed fetch. */
  var simpleConsumer = createSimpleConsumer()

  /** Creates a SimpleConsumer for this broker with [[DefaultFetch]] mixed in (256 KiB fetch size per partition). */
  def createSimpleConsumer() = {
    val hostString = "%s:%d" format (host, port)
    info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))

    new SimpleConsumer(host, port, timeout, bufferSize, clientID) with DefaultFetch {
      val fetchSize: Int = 256 * 1024
    }
  }

  /**
   * Starts fetching `tp`. Its first offset is resolved from `lastCheckpointedOffset` by the offset getter.
   * Tosses if this proxy is already consuming `tp`.
   */
  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) = {
    debug("Adding new topic and partition %s to queue for %s" format (tp, host))
    if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition %s" format tp)

    val offset = offsetGetter.getNextOffset(simpleConsumer, tp, lastCheckpointedOffset)
    nextOffsets += tp -> offset

    tpGaugeInc
  }

  /** Stops fetching `tp`. Only logs a warning if `tp` was not being fetched. */
  def removeTopicPartition(tp: TopicAndPartition) = {
    if (nextOffsets.containsKey(tp)) {
      nextOffsets.remove(tp)
      tpGaugeDec
      debug("Removed %s" format tp)
    } else {
      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys().mkString(",")))
    }
  }

  val thread: Thread = new Thread(new Runnable() {
    def run() {
      info("Starting thread for BrokerProxy")

      while (!Thread.currentThread.isInterrupted) {
        if (nextOffsets.size == 0) {
          debug("No TopicPartitions to fetch. Sleeping.")
          try {
            Thread.sleep(sleepMSWhileNoTopicPartitions)
          } catch {
            // stop() interrupts us while idle: restore the flag so the loop condition ends the thread cleanly
            // instead of letting the exception escape run().
            case e: InterruptedException =>
              debug("Shutting down due to interrupt exception.")
              Thread.currentThread.interrupt
          }
        } else {
          try {
            fetchMessages()
          } catch {
            // If we're interrupted, don't try and reconnect. We should shut down.
            case e: InterruptedException =>
              debug("Shutting down due to interrupt exception.")
              Thread.currentThread.interrupt
            case e: ClosedByInterruptException =>
              debug("Shutting down due to closed by interrupt exception.")
              Thread.currentThread.interrupt
            case e: Throwable => {
              warn("Recreating simple consumer and retrying connection")
              debug("Stack trace for fetchMessages exception.", e)
              simpleConsumer.close()
              simpleConsumer = createSimpleConsumer()
              reconnectCounter.inc
            }
          }
        }
      }
    }
  }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))

  /**
   * Performs one fetch for every partition in [[nextOffsets]].
   *
   * NotLeaderForPartition partitions are removed and abdicated to the sink. Any other error is rethrown so that the
   * fetch thread reconnects. Messages of healthy partitions are handed to the sink and their offsets advanced.
   */
  private def fetchMessages(): Unit = {
    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.toList: _*)
    firstCall = false
    firstCallBarrier.countDown()

    if (response.hasError) {
      // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
      case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)

      val errors = for {
        (tp, data) <- response.data.toList
        if data.error != ErrorMapping.NoError
        exception <- Option(ErrorMapping.exceptionFor(data.error))
      } yield Error(tp, data.error, exception)

      val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)

      if (!notLeaders.isEmpty) {
        info("Abdicating. Got not leader exception for: " + notLeaders.mkString(","))

        notLeaders.foreach(e => {
          // Go back one message, since the fetch for nextOffset failed, and
          // abdicate requires lastOffset, not nextOffset.
          messageSink.abdicate(e.tp, nextOffsets.remove(e.tp) - 1)
        })
      }

      if (!otherErrors.isEmpty) {
        // Only the non-NotLeader errors are what trigger the reconnect; the NotLeader ones were handled above.
        warn("Got error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format otherErrors.mkString(","))
        otherErrors.foreach(e => ErrorMapping.maybeThrowException(e.code)) // One will get thrown
      }
    }

    /** Hands `tp`'s fetched messages to the sink and advances its next offset and metrics. */
    def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
      val messageSet: MessageSet = data.messages
      var nextOffset = nextOffsets(tp)

      messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)

      for (message <- messageSet.iterator) {
        messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct

        nextOffset = message.nextOffset

        tpMetrics.getReadsCounter(tp).inc
        tpMetrics.getBytesReadCounter(tp).inc(message.message.payloadSize + message.message.keySize)
        tpMetrics.getOffsetCounter(tp).set(nextOffset)
      }

      nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.

      // Update high water mark
      val hw = data.hw
      if (hw >= 0) {
        getLagGauge(tp).set(hw - nextOffset)
      } else {
        debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
      }
    }

    // Skip partitions that came back with an error (e.g. the NotLeader ones just abdicated) and partitions that
    // are no longer fetched by this proxy (removed during the fetch). Either way nextOffsets(tp) would throw and
    // force a needless reconnect that discards the rest of this response.
    response.data.foreach {
      case (tp, data) if data.error == ErrorMapping.NoError && nextOffsets.containsKey(tp) =>
        moveMessagesToTheirQueue(tp, data)
      case (tp, data) =>
        debug("Skipping %s (error code %d) since it errored or is no longer fetched by this proxy." format (tp, data.error))
    }
  }

  override def toString() = "BrokerProxy for %s:%d" format (host, port)

  /** Starts the (daemon) fetch thread. */
  def start: Unit = {
    debug("Starting broker proxy for %s:%s." format (host, port))

    thread.setDaemon(true)
    thread.start
  }

  /** Interrupts the fetch thread, waits for it to exit and releases the broker connection. */
  def stop: Unit = {
    debug("Shutting down broker proxy for %s:%s." format (host, port))

    thread.interrupt
    thread.join

    // The fetch thread has exited, so nothing else uses the consumer any more; close its socket.
    simpleConsumer.close()
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
new file mode 100644
index 0000000..bdd91da
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.{Gauge, Counter, MetricsRegistry}
+import collection.mutable
+import kafka.common.TopicAndPartition
+
/**
 * Metrics registered for a [[BrokerProxy]].
 *
 * Per broker, it registers a reconnect counter and a gauge counting the topic/partitions being fetched. Per
 * topic/partition, it lazily registers a gauge of messages behind the high watermark.
 */
private[kafka] trait BrokerProxyMetrics {
  self: BrokerProxy =>

  // TODO: Move topic-partition specific metrics out of brokerproxy, into system
  val metricsRegistry: MetricsRegistry

  /** Group under which every BrokerProxy metric is registered. */
  val metricsGroup = "samza.kafka.brokerproxy"

  /** "host:port" of the broker; used as the prefix of per-broker metric names. */
  val hostPort = host + ":" + port

  /** Function that registers a counter with the given name in [[metricsGroup]]. */
  def newCounter: String => Counter = name => metricsRegistry.newCounter(metricsGroup, name)

  // Counters
  val reconnectCounter = newCounter("%s-Reconnects" format hostPort)

  // Gauges
  /** Lag gauges, created on first use for each topic/partition. */
  val lagGauges = mutable.Map[TopicAndPartition, Gauge[Long]]()

  /** Returns (registering it on first use, starting at 0) the lag gauge for `tp`. */
  def getLagGauge(tp: TopicAndPartition): Gauge[Long] =
    lagGauges.getOrElseUpdate(tp, metricsRegistry.newGauge[Long](metricsGroup, "%s-MessagesBehindHighWaterMark" format tp, 0L))

  val tpGauge = metricsRegistry.newGauge[Long](metricsGroup, "%s-NumberOfTopicsPartitions" format hostPort, 0L)

  def tpGaugeInc = adjustTpGauge(1L)

  def tpGaugeDec = adjustTpGauge(-1L)

  /** Adds `delta` to the topic/partition count gauge. */
  private def adjustTpGauge(delta: Long) = tpGauge.set(tpGauge.getValue + delta)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
new file mode 100644
index 0000000..41710f2
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.common.TopicAndPartition
+
/**
 * Extension to a SimpleConsumer that defines the default parameters necessary for default fetch requests. Builds
 * such a fetch request, requests the fetch and returns the result.
 *
 * The request parameters are members, so a mixer can override them:
 *  - `maxWait`: longest time (ms) the broker may hold the request while waiting for `minBytes` of data
 *  - `minBytes`: minimum amount of data the broker should accumulate before responding
 *  - `fetchSize`: maximum number of bytes fetched per topic/partition
 */
trait DefaultFetch {
  self: SimpleConsumer =>

  // Defaults to 1000 ms, the wait these requests have always used. A short wait keeps fetch loops responsive
  // (e.g. to interrupts and newly added partitions) when no data is arriving.
  val maxWait: Int = 1000
  val minBytes: Int = 1
  val clientId: String
  val fetchSize: Int

  /**
   * Fetches from each `(topicAndPartition, offset)` pair in a single request.
   *
   * @return the broker's FetchResponse, which carries per-partition error codes
   */
  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
    val fbr = new FetchRequestBuilder()
      .maxWait(maxWait)
      .minBytes(minBytes)
      .clientId(clientId)

    fetches.foreach { case (tp, offset) => fbr.addFetch(tp.topic, tp.partition, offset, fetchSize) }

    this.fetch(fbr.build())
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
new file mode 100644
index 0000000..326d6c9
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
+import kafka.api._
+import org.apache.samza.config.KafkaConfig
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import grizzled.slf4j.Logging
+
/**
 * Determines the offset from which a consumer should start fetching a topic/partition.
 *
 * A checkpointed offset is used when the broker can serve a fetch from it. Otherwise the topic's auto.offset.reset
 * policy is applied. That policy is looked up in `autoOffsetResetTopics`, falling back to `default`. Only
 * OffsetRequest.LargestTimeString and OffsetRequest.SmallestTimeString are accepted; any other value (such as
 * "fail") is tossed, but only when the policy is actually needed.
 *
 * @param default               auto.offset.reset policy for topics without an override
 * @param autoOffsetResetTopics per-topic auto.offset.reset overrides
 */
class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {

  /** Maps the topic's auto.offset.reset policy to the OffsetRequest time constant to ask the broker for. */
  private def getAutoOffset(topic: String): Long = {
    info("Checking if auto.offset.reset is defined for topic %s" format (topic))
    autoOffsetResetTopics.getOrElse(topic, default) match {
      case OffsetRequest.LargestTimeString =>
        info("Got reset of type %s." format OffsetRequest.LargestTimeString)
        OffsetRequest.LatestTime
      case OffsetRequest.SmallestTimeString =>
        info("Got reset of type %s." format OffsetRequest.SmallestTimeString)
        OffsetRequest.EarliestTime
      case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other))
    }
  }

  /**
   * An offset was provided but may not be valid. Verify its validity by fetching from it.
   *
   * @return the offset to resume from, or None when the checkpoint is unusable (out of range or not a number) and
   *         auto.offset.reset should be applied instead
   */
  private def useLastCheckpointedOffset(sc: DefaultFetch, last: String, tp: TopicAndPartition): Option[Long] = {
    try {
      info("Validating offset %s for topic and partition %s" format (last, tp))

      val lastOffset = last.toLong
      val messages = sc.defaultFetch((tp, lastOffset))

      if (messages.hasError) {
        ErrorMapping.maybeThrowException(messages.errorCode(tp.topic, tp.partition))
      }

      info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (last, tp))

      // Resume right after the checkpointed message if the fetch returned it. Otherwise resume at the checkpointed
      // offset itself, which the broker just accepted without error. The fetch may be empty, and (presumably for
      // compressed message sets) it may start before lastOffset, so blindly taking .head is unsafe.
      val nextOffset = messages
        .messageSet(tp.topic, tp.partition)
        .iterator
        .find(_.offset == lastOffset)
        .map(_.nextOffset)
        .getOrElse(lastOffset)

      info("Got next offset %s for %s." format (nextOffset, tp))

      Some(nextOffset)
    } catch {
      case e: OffsetOutOfRangeException =>
        info("An out of range Kafka offset (%s) was supplied for topic and partition %s, so falling back to autooffset.reset." format (last, tp))
        None
      case e: NumberFormatException =>
        warn("A non-numeric Kafka offset (%s) was supplied for topic and partition %s, so falling back to autooffset.reset." format (last, tp))
        None
    }
  }

  /**
   * Returns the offset to start fetching `tp` from.
   *
   * The checkpoint is used if it is valid. Otherwise the offset selected by the auto.offset.reset policy is used.
   * That policy is only consulted when needed, which saves an offset request and lets a strict policy coexist with
   * valid checkpoints.
   */
  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
    val checkpointed = Option(lastCheckpointedOffset).flatMap(last => useLastCheckpointedOffset(sc, last, tp))
    val actualOffset = checkpointed.getOrElse(getAutoOffsetFromBroker(sc, tp))

    info("Final offset to be returned for Topic and Partition %s = %d" format (tp, actualOffset))
    actualOffset
  }

  /** Asks the broker for the offset selected by `tp`'s auto.offset.reset policy. */
  private def getAutoOffsetFromBroker(sc: SimpleConsumer, tp: TopicAndPartition): Long = {
    val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
    val offsetResponse = sc.getOffsetsBefore(offsetRequest)
    val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

    info("Got offset %d for topic and partition %s" format (autoOffset, tp))
    autoOffset
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
new file mode 100644
index 0000000..183c6cc
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.Partition
+import java.util.UUID
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import kafka.api.TopicMetadata
+import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
+
/**
 * SystemAdmin for a Kafka system. It answers partition queries from topic metadata read (through
 * TopicMetadataCache) from the brokers in `brokerListString`.
 */
class KafkaSystemAdmin(
  systemName: String,
  // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
  brokerListString: String,
  clientId: String = UUID.randomUUID.toString) extends SystemAdmin {

  /** Returns one Partition per partition listed in `streamName`'s topic metadata. */
  def getPartitions(streamName: String): java.util.Set[Partition] = {
    def fetchTopicMetadata(topics: Set[String]) =
      new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics)

    val topicMetadata = TopicMetadataCache.getTopicMetadata(Set(streamName), systemName, fetchTopicMetadata _)

    val partitions = for (pm <- topicMetadata(streamName).partitionsMetadata) yield new Partition(pm.partitionId)
    partitions.toSet[Partition]
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
new file mode 100644
index 0000000..bd7794a
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import kafka.common.TopicAndPartition
+import org.apache.samza.config.{ KafkaConfig, Config }
+import org.apache.samza.SamzaException
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.metrics.MetricsRegistry
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import kafka.message.MessageAndOffset
+import org.apache.samza.Partition
+import kafka.utils.Utils
+import org.apache.samza.util.Clock
+import java.util.UUID
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.Decoder
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import java.nio.charset.Charset
+
/** Helpers shared by the Kafka SystemConsumer. */
object KafkaSystemConsumer {
  /** Maps a SystemStreamPartition to the Kafka topic (its stream name) and partition id it refers to. */
  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) =
    TopicAndPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId)
}
+
/**
 * Maintain a cache of BrokerProxies, returning the appropriate one for the
 * requested topic and partition.
 *
 * start() hands every registered SystemStreamPartition to the BrokerProxy of
 * its partition leader. Fetched messages come back through [[sink]], which
 * decodes them with `keyDeserializer`/`deserializer` and adds them to the
 * underlying BlockingEnvelopeMap.
 */
private[kafka] class KafkaSystemConsumer(
  systemName: String,
  brokerListString: String,
  metricsRegistry: MetricsRegistry,
  clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
  queueSize: Int = 1000,
  timeout: Int = Int.MaxValue,
  bufferSize: Int = 1024000,
  brokerMetadataFailureRefreshMs: Long = 10000,
  offsetGetter: GetOffset = new GetOffset("fail"),
  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(queueSize, metricsRegistry, new Clock {
  def currentTimeMillis = clock()
}) with Toss with Logging {

  type HostPort = (String, Int)
  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
  var lastReadOffsets = Map[SystemStreamPartition, String]()
  val topicAndPartitionMetrics = new TopicAndPartitionMetrics(metricsRegistry)

  /**
   * Set once start() begins starting the BrokerProxies. Proxies created after
   * that (by a refreshBrokers triggered from abdicate) are started by their
   * creator. Volatile because refreshBrokers also runs on BrokerProxy threads.
   */
  @volatile private var started = false

  /**
   * Serializes refreshBrokers. It mutates the non-thread-safe brokerProxies
   * map and can be invoked from several BrokerProxy fetch threads at once
   * (via abdicate).
   */
  private val refreshLock = new Object

  /** Assigns every registered partition to its leader's BrokerProxy, then starts the proxies. */
  def start() {
    val topicPartitionsAndOffsets = lastReadOffsets.map {
      case (systemStreamPartition, offset) =>
        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
        topicAndPartitionMetrics.addNewTopicAndPartition(topicAndPartition)
        (topicAndPartition, offset)
    }

    refreshBrokers(topicPartitionsAndOffsets)

    // Snapshot before starting anything: once running, a proxy may abdicate and
    // create (and, because started is set, start) further proxies itself.
    val initialProxies = brokerProxies.values.toList
    started = true
    initialProxies.foreach(_.start)
  }

  override def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
    super.register(systemStreamPartition, lastReadOffset)

    lastReadOffsets += systemStreamPartition -> lastReadOffset
  }

  def stop() {
    // Keep a refreshBrokers still running on a fetch thread from starting new proxies.
    started = false
    brokerProxies.values.foreach(_.stop)
  }

  /**
   * Hands each topic/partition, with the last offset read from it, to the
   * BrokerProxy of its current leader, creating that proxy if needed.
   * Partitions whose leader is undefined are skipped with a warning.
   *
   * Failures are retried every brokerMetadataFailureRefreshMs until all
   * partitions are handed off, or until the calling thread is interrupted
   * while waiting. In that case the interrupt status is restored and the
   * method returns.
   */
  def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
    refreshLock.synchronized {
      // Partitions not yet handed to a proxy. Each is dropped as soon as it has
      // been handled, so a retry never re-adds a partition a proxy already owns.
      // addTopicPartition tosses on such duplicates, which would retry forever.
      var remaining = topicPartitionsAndOffsets
      var done = false

      while (!done && !remaining.isEmpty) {
        try {
          val getTopicMetadata = (topics: Set[String]) => {
            new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics)
          }

          val partitionMetadata = TopicMetadataCache.getTopicMetadata(
            remaining.keys.map(_.topic).toSet,
            systemName,
            getTopicMetadata)

          remaining.foreach {
            case (topicAndPartition, lastOffset) =>
              // TODO whatever we do, we can't say Broker, even though we're
              // manipulating it here. Broker is a private type and Scala doesn't seem
              // to care about that as long as you don't explicitly declare its type.
              val brokerOption = partitionMetadata(topicAndPartition.topic)
                .partitionsMetadata
                .find(_.partitionId == topicAndPartition.partition)
                .getOrElse(toss("Can't find leader for %s" format topicAndPartition))
                .leader

              brokerOption match {
                case Some(broker) =>
                  getOrCreateBrokerProxy(broker.host, broker.port).addTopicPartition(topicAndPartition, lastOffset)
                case _ => warn("Broker for %s not defined! " format topicAndPartition)
              }

              remaining -= topicAndPartition
          }

          done = true
        } catch {
          case e: Throwable =>
            warn("An exception was thrown while refreshing brokers for %s. Waiting a bit and retrying, since we can't continue without broker metadata." format remaining.keySet)
            debug("Stack trace for refreshBrokers exception.", e)

            try {
              Thread.sleep(brokerMetadataFailureRefreshMs)
            } catch {
              case e: InterruptedException =>
                info("Interrupted while waiting to retry metadata refresh, so shutting down.")
                // Stop retrying and preserve the interrupt so the calling thread winds down. stop() is deliberately
                // not called here: on a BrokerProxy thread (reached via abdicate) it would make that thread join
                // itself and hang.
                Thread.currentThread.interrupt()
                done = true
            }
        }
      }
    }
  }

  /**
   * Returns the BrokerProxy for host:port, creating it if necessary. A proxy
   * created after start() has run is started immediately, because start()
   * only starts the proxies that existed when it ran. Callers hold refreshLock.
   */
  private def getOrCreateBrokerProxy(host: String, port: Int): BrokerProxy = {
    brokerProxies.getOrElseUpdate((host, port), {
      val brokerProxy = new BrokerProxy(host, port, systemName, clientId, metricsRegistry, topicAndPartitionMetrics, timeout, bufferSize, offsetGetter) {
        val messageSink: MessageSink = sink
      }

      if (started) {
        brokerProxy.start
      }

      brokerProxy
    })
  }

  val sink = new MessageSink {
    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
    }

    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
      trace("Incoming message %s: %s." format (tp, msg))

      val systemStreamPartition = toSystemStreamPartition(tp)
      // NOTE(review): compares the high watermark with this message's own offset, whereas BrokerProxy compares it
      // with the next offset to fetch. msg.nextOffset may be the intended comparison. Left unchanged; confirm.
      val isAtHead = highWatermark == msg.offset
      val offset = msg.offset.toString
      val key = if (msg.message.key != null) {
        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
      } else {
        null
      }
      // Null-check the payload that is actually read (mirroring the key handling above), not the message's
      // underlying buffer.
      val payload = msg.message.payload
      val message = if (payload != null) {
        deserializer.fromBytes(Utils.readBytes(payload))
      } else {
        null
      }

      // TODO use kafka encoder/decoder here, if they were defined in config
      add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))

      setIsAtHead(systemStreamPartition, isAtHead)
    }

    def abdicate(tp: TopicAndPartition, lastOffset: Long) {
      refreshBrokers(Map(tp -> lastOffset.toString))
    }

    private def toSystemStreamPartition(tp: TopicAndPartition) = {
      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
new file mode 100644
index 0000000..13c5baa
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.KafkaConfig
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
+import java.util.Properties
+import kafka.producer.Producer
+import kafka.producer.async.DefaultEventHandler
+import kafka.utils.Utils
+import org.apache.samza.util.Util
+import kafka.serializer.Decoder
+import kafka.serializer.DefaultDecoder
+import org.apache.samza.system.SystemFactory
+
/**
 * SystemFactory for Kafka. It builds consumers, producers and admins from the system's Kafka producer and
 * consumer configuration.
 */
class KafkaSystemFactory extends SystemFactory {
  /**
   * Builds a KafkaSystemConsumer for `systemName`.
   *
   * @throws SamzaException if no broker list is configured for the system
   */
  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
    val clientId = KafkaUtil.getClientId("samza-consumer", config)

    // Kind of goofy to need a producer config for consumers, but we need metadata.
    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
    val brokerListString = Option(producerConfig.brokerList)
      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)

    // TODO could add stream-level overrides for timeout and buffer size
    val timeout = consumerConfig.socketTimeoutMs
    val bufferSize = consumerConfig.socketReceiveBufferBytes
    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
    val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
    }
    val keyDeserializer = config.getConsumerKeyDeserializerClass(systemName) match {
      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
    }

    new KafkaSystemConsumer(
      systemName = systemName,
      brokerListString = brokerListString,
      metricsRegistry = registry,
      clientId = clientId,
      // TODO make this configurable?
      queueSize = 1000,
      timeout = timeout,
      bufferSize = bufferSize,
      offsetGetter = offsetGetter,
      // Pass the configured decoders through; otherwise the consumer silently falls back to its pass-through defaults.
      deserializer = deserializer,
      keyDeserializer = keyDeserializer)
  }

  /** Builds a KafkaSystemProducer for `systemName` whose Kafka producer is created on demand from the producer config. */
  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
    val clientId = KafkaUtil.getClientId("samza-producer", config)
    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
    val batchSize = Option(producerConfig.batchNumMessages)
      .getOrElse(1000)
    val reconnectIntervalMs = Option(producerConfig.retryBackoffMs)
      .getOrElse(10000)
    val getProducer = () => { new Producer[Object, Object](producerConfig) }

    // Unlike consumer, no need to use encoders here, since they come for free
    // inside the producer configs. Kafka's producer will handle all of this
    // for us.

    new KafkaSystemProducer(
      systemName,
      batchSize,
      reconnectIntervalMs,
      registry,
      getProducer)
  }

  /**
   * Builds a KafkaSystemAdmin for `systemName`.
   *
   * @throws SamzaException if no broker list is configured for the system
   */
  def getAdmin(systemName: String, config: Config) = {
    val clientId = KafkaUtil.getClientId("samza-admin", config)
    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
    val brokerListString = Option(producerConfig.brokerList)
      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))

    new KafkaSystemAdmin(systemName, brokerListString, clientId)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
new file mode 100644
index 0000000..c4e4bec
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import grizzled.slf4j.Logging
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
+import kafka.producer.ProducerConfig
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.OutgoingMessageEnvelope
+
/** Metric-group names used by the Kafka system producer. */
object KafkaSystemProducerMetrics {
  /** Group under which `KafkaSystemProducer` registers its reconnect counter. */
  val metricsGroup: String = "samza.kafka.producer"
}
+
/**
 * [[SystemProducer]] that buffers outgoing messages per source and writes
 * them to Kafka in batches.
 *
 * `send` appends to the sending source's buffer. A buffer is flushed when it
 * reaches `batchSize` messages, or when `commit` is called for its source.
 * A flush keeps retrying until the whole batch has been handed to the Kafka
 * producer. After every non-fatal failure it discards the producer, sleeps
 * `reconnectIntervalMs`, and creates a fresh one.
 *
 * @param systemName          Samza system name, used in log messages and the metric name
 * @param batchSize           buffered messages per source that trigger a flush
 * @param reconnectIntervalMs back-off in milliseconds between failed send attempts
 * @param registry            registry in which the reconnect counter is created
 * @param getProducer         factory for Kafka producers; called lazily, and again after each failure
 */
class KafkaSystemProducer(
  systemName: String,
  batchSize: Int,
  reconnectIntervalMs: Long,
  registry: MetricsRegistry,
  getProducer: () => Producer[Object, Object]) extends SystemProducer with Logging {

  val flushReconnectCounter = registry.newCounter(KafkaSystemProducerMetrics.metricsGroup, "Producer-%s-Reconnects" format systemName)
  var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
  var producer: Producer[Object, Object] = null

  def start() {
  }

  /**
   * Closes the current Kafka producer, if there is one. The reference is
   * cleared even if `close` throws, so a closed producer is never reused.
   * Messages still sitting in the source buffers are not flushed here.
   */
  def stop() {
    if (producer != null) {
      try {
        producer.close
      } finally {
        producer = null
      }
    }
  }

  /** Creates an empty buffer for `source`. It must be called before `send`/`commit` for that source. */
  def register(source: String) {
    sourceBuffers += source -> ArrayBuffer()
  }

  /**
   * Buffers `envelope` for `source`, and flushes the buffer once it holds
   * `batchSize` messages.
   *
   * @throws NoSuchElementException if `source` was never registered
   */
  def send(source: String, envelope: OutgoingMessageEnvelope) {
    val buffer = sourceBuffers(source)

    buffer += new KeyedMessage[Object, Object](
      envelope.getSystemStream.getStream,
      envelope.getKey,
      envelope.getPartitionKey,
      envelope.getMessage)

    if (buffer.size >= batchSize) {
      commit(source)
    }
  }

  /**
   * Sends every buffered message for `source` to Kafka, then clears the buffer.
   *
   * Any `Exception` other than `InterruptedException` is retried indefinitely.
   * The failed producer is discarded, the thread sleeps `reconnectIntervalMs`,
   * and a new producer is created. `Error`s (e.g. `OutOfMemoryError`) and
   * interrupts propagate to the caller with the buffer left intact. This
   * ensures a dying or interrupted thread is not trapped in the retry loop.
   *
   * @throws InterruptedException if the thread is interrupted while sending or backing off
   * @throws NoSuchElementException if `source` was never registered
   */
  def commit(source: String) {
    val buffer = sourceBuffers(source)
    var done = false

    debug("Flushing buffer with size: %s." format buffer.size)

    while (!done) {
      try {
        if (producer == null) {
          info("Creating a new producer for system %s." format systemName)
          producer = getProducer()
          debug("Created a new producer for system %s." format systemName)
        }

        producer.send(buffer: _*)
        done = true
      } catch {
        // An interrupt is a request to stop. Never retry through it.
        case e: InterruptedException => throw e
        case e: Exception =>
          warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, e.getMessage))
          debug("Exception while producing to %s." format systemName, e)

          flushReconnectCounter.inc

          discardProducer()

          // An InterruptedException from the back-off propagates. Swallowing
          // it (as before) made this loop impossible to interrupt.
          Thread.sleep(reconnectIntervalMs)
      }
    }

    buffer.clear
    debug("Flushed buffer.")
  }

  /**
   * Best-effort close of a producer that has just failed. A failure to close
   * is logged rather than thrown, so it cannot abort the retry loop. The
   * reference is always cleared, so the next attempt builds a fresh producer.
   */
  private def discardProducer() {
    if (producer != null) {
      try {
        producer.close
      } catch {
        case e: InterruptedException => throw e
        case e: Exception =>
          warn("Failed to close producer for %s: %s" format (systemName, e.getMessage))
      } finally {
        producer = null
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
new file mode 100644
index 0000000..71fae59
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndOffset
+
/**
 * Receiver of messages and per-partition state updates, keyed by
 * [[kafka.common.TopicAndPartition]].
 *
 * NOTE(review): presumably implemented by the Kafka consumer and fed by the
 * broker-fetching code. Confirm against implementations.
 */
private[kafka] trait MessageSink {
  /**
   * Receives one message read from `tp`.
   *
   * @param tp            topic-partition the message came from
   * @param msg           the message together with its offset
   * @param highWatermark high-watermark value passed alongside the message
   */
  def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit

  /** Records whether `tp` is currently at its high watermark. */
  def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit

  /**
   * NOTE(review): presumably signals that the caller is giving up `tp`,
   * reporting the last offset it handled. Confirm against implementations.
   */
  def abdicate(tp: TopicAndPartition, lastOffset: Long): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
new file mode 100644
index 0000000..75fc022
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.{MetricsRegistry, Counter}
+import kafka.common.TopicAndPartition
+import java.util.concurrent.ConcurrentHashMap
+import grizzled.slf4j.Logging
+
/**
 * Wrapper around the metrics that BrokerProxies will be updating per topic partition. Multiple BrokerProxies will
 * be updating the map at the same time, but no two BrokerProxies should be updating the same key at the same time.
 *
 * Each registered topic-partition gets three counters in the `KafkaSystem`
 * group, named `<tp>-OffsetChange`, `<tp>-BytesRead` and `<tp>-Reads`.
 *
 * @param metricsRegistry Registry to hook counters into.
 */
private[kafka] class TopicAndPartitionMetrics(metricsRegistry: MetricsRegistry) extends Logging {
  val metricsGroup = "KafkaSystem"

  /** Per topic-partition counters, stored as (offset change, bytes read, reads). */
  val counters = new ConcurrentHashMap[TopicAndPartition, (Counter, Counter, Counter)]()

  /**
   * Registers the three counters for `tp`. Registering the same
   * topic-partition twice logs a warning and keeps the existing counters.
   *
   * The contains-then-put sequence is not atomic. It relies on the
   * class-level contract that no two callers register the same key concurrently.
   */
  def addNewTopicAndPartition(tp: TopicAndPartition) = {
    if (containsTopicAndPartition(tp)) {
      warn("TopicAndPartitionsMetrics already has an entry for topic-partition %s, not adding." format tp)
    } else {
      counters.put(tp, (newCounter("%s-OffsetChange" format tp), newCounter("%s-BytesRead" format tp), newCounter("%s-Reads" format tp)))
    }
  }

  def containsTopicAndPartition(tp: TopicAndPartition) = counters.containsKey(tp)

  /** Function that creates a counter with the given name in `metricsGroup`. */
  def newCounter = metricsRegistry.newCounter(metricsGroup, _: String)

  /** @throws IllegalStateException if `tp` was never registered */
  def getOffsetCounter(tp: TopicAndPartition) = countersFor(tp)._1

  /** @throws IllegalStateException if `tp` was never registered */
  def getBytesReadCounter(tp: TopicAndPartition) = countersFor(tp)._2

  /** @throws IllegalStateException if `tp` was never registered */
  def getReadsCounter(tp: TopicAndPartition) = countersFor(tp)._3

  /**
   * Looks up the counters for `tp`. It fails with a descriptive error instead
   * of the bare NullPointerException that dereferencing a missing
   * `ConcurrentHashMap` entry would cause.
   */
  private def countersFor(tp: TopicAndPartition) = {
    val entry = counters.get(tp)
    if (entry == null) {
      throw new IllegalStateException("No metrics registered for topic-partition %s; call addNewTopicAndPartition first." format tp)
    }
    entry
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
new file mode 100644
index 0000000..8a24ce3
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import scala.annotation.implicitNotFound
+
+import grizzled.slf4j.Logging
+import kafka.api.TopicMetadata
+import kafka.common.ErrorMapping
+
/**
 * TopicMetadataCache caches Kafka topic metadata per (system, topic) pair.
 * All access goes through a single lock, so the cache is thread safe.
 *
 * Entries are refreshed lazily, using the caller-supplied
 * `getTopicInfoFromStore`. A requested topic is re-fetched when it is not
 * cached yet, when its cached metadata carries a Kafka error code, or when it
 * is older than `cacheTimeout` milliseconds. A broker-backed source such as
 * `TopicMetadataStore.getTopicInfo` has the expected shape.
 */
object TopicMetadataCache extends Logging {
  private case class MetadataInfo(streamMetadata: TopicMetadata, lastRefreshMs: Long)
  private val topicMetadataMap: scala.collection.mutable.Map[(String, String), MetadataInfo] = new scala.collection.mutable.HashMap[(String, String), MetadataInfo]
  private val lock = new Object

  // Fetches topic metadata from the store: given a set of topics, returns the
  // metadata it found, keyed by topic.
  type FetchTopicMetadataType = (Set[String]) => Map[String, TopicMetadata]

  /**
   * Returns metadata for `topics` in `systemName`, refreshing missing, errored
   * or expired entries first.
   *
   * @param topics                topics to look up
   * @param systemName            Samza system the topics belong to; part of the cache key
   * @param getTopicInfoFromStore fetches fresh metadata for a set of topics
   * @param cacheTimeout          maximum entry age in milliseconds before a refresh
   * @param getTime               clock; injectable for tests
   * @return metadata keyed by topic. Topics the store has never returned metadata for are absent.
   */
  def getTopicMetadata(topics: Set[String], systemName: String, getTopicInfoFromStore: FetchTopicMetadataType, cacheTimeout: Long = 5000L, getTime: () => Long = { System.currentTimeMillis }): Map[String, TopicMetadata] = {
    lock synchronized {
      val time = getTime()
      val topicsToRefresh = topics.filter(topic => topicMetadataMap.get((systemName, topic)) match {
        case Some(info) => isBadOrExpired(info, time, cacheTimeout)
        case None => true
      })

      if (topicsToRefresh.nonEmpty) {
        topicMetadataMap ++= getTopicInfoFromStore(topicsToRefresh)
          .map { case (topic, metadata) => ((systemName, topic), MetadataInfo(metadata, getTime())) }
      }

      // Look up only the requested keys, rather than scanning every cached
      // (system, topic) entry for all systems.
      topics
        .flatMap(topic => topicMetadataMap.get((systemName, topic)).map(info => (topic, info.streamMetadata)))
        .toMap
    }
  }

  /** True when `info` carries a Kafka error code, or is older than `cacheTimeout` ms at time `now`. */
  private def isBadOrExpired(info: MetadataInfo, now: Long, cacheTimeout: Long) =
    info.streamMetadata.errorCode != ErrorMapping.NoError || (now - info.lastRefreshMs) > cacheTimeout
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
new file mode 100644
index 0000000..5cda26e
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.SamzaException
+
/** Mixin offering a one-call way to fail with a [[SamzaException]]. */
private[kafka] trait Toss {
  /** Throws a SamzaException with `s` as its message. It never returns. */
  def toss(s: String): Nothing = {
    val failure = new SamzaException(s)
    throw failure
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
new file mode 100644
index 0000000..0bc1867
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import kafka.api.{ TopicMetadataResponse, TopicMetadata }
+import org.apache.samza.SamzaException
+import kafka.client.ClientUtils
+import grizzled.slf4j.Logging
+import kafka.common.ErrorMapping
+import kafka.cluster.Broker
+import java.util.concurrent.atomic.AtomicInteger
+
/** Source of Kafka topic metadata, looked up by topic name. */
trait TopicMetadataStore extends Logging {
  /**
   * Fetches metadata for `topics`.
   *
   * @param topics topic names to look up
   * @return metadata keyed by topic name
   */
  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata]
}
+
/**
 * [[TopicMetadataStore]] that requests topic metadata from the brokers, using
 * Kafka's `ClientUtils.fetchTopicMetadata`.
 *
 * @param brokersListString broker list, parsed with `ClientUtils.parseBrokerList`
 * @param clientId          client id sent with each metadata request
 * @param timeout           timeout passed to `fetchTopicMetadata` (presumably milliseconds; confirm)
 */
class ClientUtilTopicMetadataStore(brokersListString: String, clientId: String, timeout: Int = 6000) extends TopicMetadataStore {
  val brokers = ClientUtils.parseBrokerList(brokersListString)
  var corrID = new AtomicInteger(0)

  /**
   * Fetches metadata for `topics` in a single request. Each call takes the
   * next correlation id from `corrID`.
   *
   * @throws SamzaException if the response's correlation id differs from the request's
   */
  def getTopicInfo(topics: Set[String]) = {
    val requestId = corrID.getAndIncrement
    val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, requestId)
    val responseId = response.correlationId

    if (responseId == requestId) {
      Map(response.topicsMetadata.map(metadata => metadata.topic -> metadata): _*)
    } else {
      throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, requestId, responseId))
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
new file mode 100644
index 0000000..d660b91
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.config.{KafkaConfig, Config, ConfigException}
+import org.apache.samza.config.JobConfig.Config2Job
+import java.util.concurrent.atomic.AtomicLong
+import kafka.client.ClientUtils
+import org.apache.samza.SamzaException
+
/** Helpers for building Kafka client ids. */
object KafkaUtil {
  /**
   * Sequence number appended to every client id. Ids built within the same
   * millisecond by this process therefore still differ.
   */
  val counter = new AtomicLong(0)

  /**
   * Builds a client id from the job name and job id found in `config`.
   * The job id defaults to "1" when absent.
   *
   * @throws ConfigException if the config has no job name
   */
  def getClientId(id: String, config: Config): String = {
    val jobName = config.getName.getOrElse(throw new ConfigException("Missing job name."))
    val jobId = config.getJobId.getOrElse("1")
    getClientId(id, jobName, jobId)
  }

  /**
   * Builds `id-jobName-jobId-<currentTimeMillis>-<counter>`. In the first three
   * parts, every character outside `[A-Za-z0-9]` is replaced with `_`.
   */
  def getClientId(id: String, jobName: String, jobId: String): String = {
    val cleanId = sanitize(id)
    val cleanJobName = sanitize(jobName)
    val cleanJobId = sanitize(jobId)
    val timestamp = System.currentTimeMillis
    val sequence = counter.getAndIncrement

    "%s-%s-%s-%s-%s" format (cleanId, cleanJobName, cleanJobId, timestamp, sequence)
  }

  /** Replaces every character outside `[A-Za-z0-9]` with `_`. */
  private def sanitize(part: String) = part.replaceAll("[^A-Za-z0-9]", "_")
}