Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/12 18:21:36 UTC

[06/15] initial import.

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar
new file mode 100644
index 0000000..e2c2b2d
Binary files /dev/null and b/samza-kafka/lib/kafka_2.8.1-0.8.1-SNAPSHOT.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar
new file mode 100644
index 0000000..b91fc78
Binary files /dev/null and b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT-test.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar
new file mode 100644
index 0000000..d7c9bd6
Binary files /dev/null and b/samza-kafka/lib/kafka_2.9.2-0.8.1-SNAPSHOT.jar differ

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
new file mode 100644
index 0000000..a9ddc5c
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import org.I0Itec.zkclient.ZkClient
+
+import grizzled.slf4j.Logging
+import kafka.admin.AdminUtils
+import kafka.api.FetchRequestBuilder
+import kafka.api.OffsetRequest
+import kafka.api.PartitionOffsetRequestInfo
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.common.TopicExistsException
+import kafka.consumer.SimpleConsumer
+import kafka.producer.KeyedMessage
+import kafka.producer.Partitioner
+import kafka.producer.Producer
+import kafka.serializer.Decoder
+import kafka.serializer.Encoder
+import kafka.utils.Utils
+import kafka.utils.VerifiableProperties
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.serializers.CheckpointSerde
+import org.apache.samza.serializers.Serde
+import org.apache.samza.system.kafka.TopicMetadataCache
+import org.apache.samza.util.TopicMetadataStore
+
+/**
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that
+ * is uniquely identified by a job name and job id (e.g. the topic
+ * __samza_checkpoint_my-job_1 for job my-job with job id 1). To read the
+ * checkpoint for a given partition, we simply read the last message from that
+ * partition of the topic. If the topic does not yet exist, we assume that
+ * there is not yet any state for this job/partition pair, and return an empty
+ * checkpoint.
+ */
+class KafkaCheckpointManager(
+  clientId: String,
+  stateTopic: String,
+  systemName: String,
+  totalPartitions: Int,
+  replicationFactor: Int,
+  socketTimeout: Int,
+  bufferSize: Int,
+  fetchSize: Int,
+  metadataStore: TopicMetadataStore,
+  connectProducer: () => Producer[Partition, Array[Byte]],
+  connectZk: () => ZkClient,
+  failureRetryMs: Long = 10000,
+  serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+
+  var partitions = Set[Partition]()
+  var producer: Producer[Partition, Array[Byte]] = null
+
+  info("Creating KafkaCheckpointManager with: clientId=%s, stateTopic=%s, systemName=%s" format (clientId, stateTopic, systemName))
+
+  def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
+    var done = false
+
+    while (!done) {
+      try {
+        if (producer == null) {
+          producer = connectProducer()
+        }
+
+        producer.send(new KeyedMessage(stateTopic, null, partition, serde.toBytes(checkpoint)))
+        done = true
+      } catch {
+        case e: Throwable =>
+          warn("Failed to send checkpoint %s for partition %s. Retrying." format (checkpoint, partition), e)
+
+          if (producer != null) {
+            producer.close
+          }
+
+          producer = null
+
+          Thread.sleep(failureRetryMs)
+      }
+    }
+  }
+
+  def readLastCheckpoint(partition: Partition): Checkpoint = {
+    var checkpoint: Option[Checkpoint] = None
+    var consumer: SimpleConsumer = null
+
+    info("Reading checkpoint for partition %s." format partition.getPartitionId)
+
+    while (!checkpoint.isDefined) {
+      try {
+        // Assume state topic exists with correct partitions, since it should be verified on start.
+        // Fetch the metadata for this state topic/partition pair.
+        val metadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+        val metadata = metadataMap(stateTopic)
+        val partitionMetadata = metadata.partitionsMetadata
+          .find(_.partitionId == partition.getPartitionId)
+          .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
+        val partitionId = partitionMetadata.partitionId
+        val leader = partitionMetadata
+          .leader
+          .getOrElse(throw new SamzaException("No leader available for topic %s" format stateTopic))
+
+        info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, stateTopic, partitionId))
+
+        consumer = new SimpleConsumer(
+          leader.host,
+          leader.port,
+          socketTimeout,
+          bufferSize,
+          clientId)
+        val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
+        val offset = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+          .partitionErrorAndOffsets
+          .get(topicAndPartition)
+          .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId)))
+          .offsets
+          .headOption
+          .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId)))
+
+        info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, stateTopic, partitionId))
+
+        if (offset <= 0) {
+          info("Got offset 0 (no messages in state topic) for topic %s and partition %s, so returning null. If you expected the state topic to have messages, you're probably going to lose data." format (stateTopic, partition))
+          return null
+        }
+
+        val request = new FetchRequestBuilder()
+          // Kafka returns one greater than the offset of the last message in
+          // the topic, so subtract one to fetch the last message.
+          .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
+          .maxWait(500)
+          .minBytes(1)
+          .clientId(clientId)
+          .build
+        val messageSet = consumer.fetch(request)
+        if (messageSet.hasError) {
+          warn("Got error code from broker for %s: %s" format (stateTopic, messageSet.errorCode(stateTopic, partitionId)))
+          val errorCode = messageSet.errorCode(stateTopic, partitionId)
+          if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+            warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (stateTopic, partitionId))
+            return null
+          }
+          ErrorMapping.maybeThrowException(errorCode)
+        }
+        val messages = messageSet.messageSet(stateTopic, partitionId).toList
+
+        if (messages.length != 1) {
+          throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
+            + "messages back when fetching from state checkpoint topic %s and partition %s. "
+            + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
+            + "so failing." format (messages.length, stateTopic, partition))
+        }
+
+        // Some back bending to go from message to checkpoint.
+        checkpoint = Some(serde.fromBytes(Utils.readBytes(messages(0).message.payload)))
+
+        consumer.close
+      } catch {
+        case e: KafkaCheckpointException =>
+          throw e
+        case e: Throwable =>
+          warn("Got exception while trying to read last checkpoint for topic %s and partition %s. Retrying." format (stateTopic, partition), e)
+
+          if (consumer != null) {
+            consumer.close
+          }
+
+          Thread.sleep(failureRetryMs)
+      }
+    }
+
+    info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint))
+
+    checkpoint.get
+  }
+
+  def start {
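+    // Only the instance that registered partition 0 attempts to create the
+    // topic, so creation is tried once per job rather than by every consumer
+    // of the checkpoint manager.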
+    if (partitions.contains(new Partition(0))) {
+      createTopic
+    }
+
+    validateTopic
+  }
+
+  def register(partition: Partition) {
+    partitions += partition
+  }
+
+  def stop = if (producer != null) producer.close
+
+  private def createTopic {
+    var done = false
+    var zkClient: ZkClient = null
+
+    info("Attempting to create state topic %s with %s partitions." format (stateTopic, totalPartitions))
+
+    while (!done) {
+      try {
+        zkClient = connectZk()
+
+        AdminUtils.createTopic(
+          zkClient,
+          stateTopic,
+          totalPartitions,
+          replicationFactor)
+
+        info("Created state topic %s." format stateTopic)
+
+        done = true
+      } catch {
+        case e: TopicExistsException =>
+          info("State topic %s already exists." format stateTopic)
+
+          done = true
+        case e: Throwable =>
+          warn("Failed to create topic %s. Retrying." format stateTopic, e)
+
+          if (zkClient != null) {
+            zkClient.close
+          }
+
+          Thread.sleep(failureRetryMs)
+      }
+    }
+
+    zkClient.close
+  }
+
+  private def validateTopic {
+    var done = false
+
+    info("Validating state topic %s." format stateTopic)
+
+    while (!done) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(stateTopic), systemName, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(stateTopic)
+        val errorCode = topicMetadata.errorCode
+
+        if (errorCode != ErrorMapping.NoError) {
+          throw new SamzaException("State topic validation failed for topic %s because we got error code %s from Kafka." format (stateTopic, errorCode))
+        }
+
+        val partitionCount = topicMetadata.partitionsMetadata.length
+
+        if (partitionCount != totalPartitions) {
+          throw new KafkaCheckpointException("State topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (stateTopic, topicMetadata.partitionsMetadata.length, totalPartitions))
+        }
+
+        info("Successfully validated state topic %s." format stateTopic)
+
+        done = true
+      } catch {
+        case e: KafkaCheckpointException =>
+          throw e
+        case e: Throwable =>
+          warn("Got exception while trying to read validate topic %s. Retrying." format stateTopic, e)
+
+          Thread.sleep(failureRetryMs)
+      }
+    }
+  }
+}
+
+/**
+ * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
+ * one to signal a hard failure, and the other to retry. The
+ * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
+ * CheckpointManager can't recover from.
+ */
+class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
+  def this(s: String) = this(s, null)
+}
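
For orientation, a minimal, hedged sketch of wiring the class above by hand; the broker/ZooKeeper addresses and the producerConfig value are hypothetical, and in practice KafkaCheckpointManagerFactory (below) builds all of this from config:

  import kafka.producer.Producer
  import kafka.utils.ZKStringSerializer
  import org.I0Itec.zkclient.ZkClient
  import org.apache.samza.Partition
  import org.apache.samza.util.ClientUtilTopicMetadataStore

  val metadataStore = new ClientUtilTopicMetadataStore("localhost:9092", "my-client") // hypothetical broker
  val manager = new KafkaCheckpointManager(
    clientId = "my-client",
    stateTopic = "__samza_checkpoint_my-job_1",
    systemName = "kafka",
    totalPartitions = 1,
    replicationFactor = 3,
    socketTimeout = 30000,
    bufferSize = 65536,
    fetchSize = 1048576,
    metadataStore = metadataStore,
    connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig), // producerConfig assumed in scope
    connectZk = () => new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer))

  manager.register(new Partition(0))
  manager.start
  val checkpoint = manager.readLastCheckpoint(new Partition(0))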

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
new file mode 100644
index 0000000..bc94f6a
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka
+
+import org.apache.samza.config.{ KafkaConfig, Config }
+import org.apache.samza.SamzaException
+import java.util.Properties
+import kafka.producer.Producer
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.Partition
+import grizzled.slf4j.Logging
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import org.apache.samza.util.Util
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZKStringSerializer
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.checkpoint.CheckpointManager
+
+class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+  def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
+    val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
+    val systemName = config
+      .getCheckpointSystem
+      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
+    val injectedProducerProps = Map(
+      "request.required.acks" -> "-1",
+      "producer.type" -> "sync",
+      // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1.
+      "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString)
+    val producerConfig = config.getKafkaSystemProducerConfig(
+      systemName,
+      clientId,
+      injectedProducerProps)
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
+    val socketTimeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
+
+    val connectProducer = () => {
+      new Producer[Partition, Array[Byte]](producerConfig)
+    }
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    val connectZk = () => {
+      new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+    }
+    val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
+    val jobId = config.getJobId.getOrElse("1")
+    val brokersListString = Option(producerConfig.brokerList)
+      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+    val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId)
+    val stateTopic = getTopic(jobName, jobId)
+    val totalPartitions = Util.getMaxInputStreamPartitions(config).size
+
+    new KafkaCheckpointManager(
+      clientId,
+      stateTopic,
+      systemName,
+      totalPartitions,
+      replicationFactor,
+      socketTimeout,
+      bufferSize,
+      fetchSize,
+      metadataStore,
+      connectProducer,
+      connectZk)
+  }
+
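+  // Underscores in the job name and job id are replaced with hyphens, since
+  // underscores separate the fields of the topic name itself; e.g. jobName
+  // "my_job" with jobId "1" yields "__samza_checkpoint_my-job_1".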
+  private def getTopic(jobName: String, jobId: String) =
+    "__samza_checkpoint_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
new file mode 100644
index 0000000..59d915d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import scala.collection.JavaConversions._
+import kafka.consumer.ConsumerConfig
+import java.util.Properties
+import kafka.producer.ProducerConfig
+import java.util.UUID
+
+object KafkaConfig {
+  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+  val CHECKPOINT_SYSTEM = "task.checkpoint.system"
+  val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
+
+  val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
+  val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
+
+  implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
+}
+
+class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
+  // checkpoints
+  def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
+  def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
+
+  // custom consumer config
+  def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
+  def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
+
+  /**
+   * Returns a map of topic -> auto.offset.reset value for all streams that
+   * are defined with this property in the config.
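+   *
+   * For example (topic name hypothetical), the config entry
+   *   systems.kafka.streams.my-topic.consumer.auto.offset.reset=smallest
+   * produces the map entry ("my-topic" -> "smallest").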
+   */
+  def getAutoOffsetResetTopics(systemName: String) = {
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    // find all .consumer.auto.offset.reset keys, and strip the suffix
+    subConf
+      .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
+      .map {
+        case (topicAutoOffsetReset, resetValue) =>
+          (topicAutoOffsetReset.replace(".consumer.auto.offset.reset", ""), resetValue)
+      }.toMap
+  }
+
+  // regex resolver
+  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+
+  // kafka config
+  def getKafkaSystemConsumerConfig(
+    systemName: String,
+    clientId: String = "undefined-samza-consumer-" format UUID.randomUUID.toString,
+    groupId: String = "undefined-samza-consumer-group-" format UUID.randomUUID.toString,
+    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.consumer." format systemName, true)
+    val consumerProps = new Properties()
+    consumerProps.putAll(subConf)
+    consumerProps.put("group.id", groupId)
+    consumerProps.put("client.id", clientId)
+    consumerProps.putAll(injectedProps)
+    new ConsumerConfig(consumerProps)
+  }
+ 
+  def getKafkaSystemProducerConfig(
+    systemName: String,
+    clientId: String = "undefined-samza-producer-" format UUID.randomUUID.toString,
+    injectedProps: Map[String, String] = Map()) = {
+
+    val subConf = config.subset("systems.%s.producer." format systemName, true)
+    val producerProps = new Properties()
+    producerProps.putAll(subConf)
+    producerProps.put("client.id", clientId)
+    producerProps.putAll(injectedProps)
+    new ProducerConfig(producerProps)
+  }
+}
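
As an illustration of the two helpers above, a hedged sketch (system name, broker, and ZooKeeper addresses are hypothetical): every property under a system's consumer/producer subtree is copied into the corresponding Kafka config object, with client.id, group.id, and any injected properties overlaid last:

  import org.apache.samza.config.MapConfig
  import org.apache.samza.config.KafkaConfig.Config2Kafka
  import scala.collection.JavaConversions._

  val config = new MapConfig(Map(
    "systems.kafka.producer.metadata.broker.list" -> "localhost:9092",
    "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181"))

  val producerConfig = config.getKafkaSystemProducerConfig("kafka", "my-client")
  val consumerConfig = config.getKafkaSystemConsumerConfig("kafka", "my-client", "my-group")
  // producerConfig.brokerList == "localhost:9092"; consumerConfig.zkConnect == "localhost:2181"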

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
new file mode 100644
index 0000000..11078e3
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+object KafkaSerdeConfig {
+  // kafka serde config constants
+  val ENCODER = SerializerConfig.SERIALIZER_PREFIX + ".encoder"
+  val DECODER = SerializerConfig.SERIALIZER_PREFIX + ".decoder"
+
+  implicit def Config2KafkaSerde(config: Config) = new KafkaSerdeConfig(config)
+}
+
+class KafkaSerdeConfig(config: Config) extends ScalaMapConfig(config) {
+  def getKafkaEncoder(serializer: String) =
+    getOption(KafkaSerdeConfig.ENCODER format serializer)
+
+  def getKafkaDecoder(serializer: String) =
+    getOption(KafkaSerdeConfig.DECODER format serializer)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
new file mode 100644
index 0000000..c7c50c4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ ZkUtils, ZKStringSerializer }
+import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
+import org.apache.samza.SamzaException
+import collection.JavaConversions._
+import grizzled.slf4j.Logging
+import scala.collection._
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
+
+/**
+ * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression.
+ * For each topic that matches the regular expression, generate a series of config values for it and
+ * add it to the task's input streams setting.
+ *
+ * job.config.rewriter.regex-input-rewriter.regex=.*stream
+ * job.config.rewriter.regex-input-rewriter.system=kafka
+ * job.config.rewriter.regex-input-rewriter.config.foo=bar
+ *
+ * Would result in:
+ *
+ * task.inputs=kafka.somestream
+ * systems.kafka.streams.somestream.foo=bar
+ *
+ * @see org.apache.samza.config.KafkaConfig.getRegexResolvedStreams
+ *
+ */
+class RegExTopicGenerator extends ConfigRewriter with Logging {
+
+  def rewrite(rewriterName: String, config: Config): Config = {
+    val regex = config
+      .getRegexResolvedStreams(rewriterName)
+      .getOrElse(throw new SamzaException("No %s defined in config" format REGEX_RESOLVED_STREAMS))
+    val systemName = config
+      .getRegexResolvedSystem(rewriterName)
+      .getOrElse(throw new SamzaException("No system defined for %s." format rewriterName))
+    val topics = getTopicsFromZK(rewriterName, config)
+    val existingInputStreams = config.getInputStreams
+    val newInputStreams = new mutable.HashSet[SystemStream]
+    val keysAndValsToAdd = new mutable.HashMap[String, String]
+
+    // Find all the topics that match this regex
+    val matchingStreams = topics
+      .filter(_.matches(regex))
+      .map(new SystemStream(systemName, _))
+      .toSet
+
+    for (m <- matchingStreams) {
+      info("Generating new configs for matching stream %s." format m)
+
+      if (existingInputStreams.contains(m)) {
+        throw new SamzaException("Regex '%s' matches existing, statically defined input %s." format (regex, m))
+      }
+
+      newInputStreams.add(m)
+
+      // For each topic that matched, generate all the specified configs
+      config
+        .getRegexResolvedInheritedConfig(rewriterName)
+        .foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2))
+    }
+
+    info("Generated config values for %d new topics" format newInputStreams.size)
+
+    // Build the new task.inputs value from the existing and newly matched streams.
+    val inputStreams = TaskConfig.INPUT_STREAMS -> (existingInputStreams ++ newInputStreams)
+      .map(Util.getNameFromSystemStream(_))
+      .mkString(",")
+
+    new MapConfig((keysAndValsToAdd ++ config) += inputStreams)
+  }
+
+  def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
+    val systemName = config
+      .getRegexResolvedSystem(rewriterName)
+      .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName)
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
+    val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+
+    try {
+      ZkUtils.getAllTopics(zkClient)
+    } finally {
+      zkClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
new file mode 100644
index 0000000..82ba2a0
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+import java.nio.ByteBuffer
+import org.apache.samza.util.Util
+import kafka.serializer.Encoder
+import kafka.serializer.Decoder
+import org.apache.samza.config.Config
+import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
+import org.apache.samza.SamzaException
+
+class KafkaSerde[T](encoder: Encoder[T], decoder: Decoder[T]) extends Serde[T] {
+  def toBytes(obj: T): Array[Byte] = encoder.toBytes(obj)
+  def fromBytes(bytes: Array[Byte]): T = decoder.fromBytes(bytes)
+}
+
+class KafkaSerdeFactory[T] extends SerdeFactory[T] {
+  def getSerde(name: String, config: Config): Serde[T] = {
+    val encoderClassName = config
+      .getKafkaEncoder(name)
+      .getOrElse(throw new SamzaException("No kafka encoder defined for %s" format name))
+    val decoderClassName = config
+      .getKafkaDecoder(name)
+      .getOrElse(throw new SamzaException("No kafka decoder defined for %s" format name))
+    val encoder = Util.getObj[Encoder[T]](encoderClassName)
+    val decoder = Util.getObj[Decoder[T]](decoderClassName)
+    new KafkaSerde(encoder, decoder)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
new file mode 100644
index 0000000..cb5015d
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -0,0 +1,215 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.api._
+import kafka.common.ErrorMapping
+import java.util.concurrent.{ CountDownLatch, ConcurrentHashMap }
+import scala.collection.JavaConversions._
+import org.apache.samza.config.Config
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.metrics.MetricsRegistry
+import kafka.common.TopicAndPartition
+import kafka.message.MessageSet
+import grizzled.slf4j.Logging
+import java.nio.channels.ClosedByInterruptException
+
+/**
+ * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
+ * a way for consumers to retrieve those messages by topic and partition.
+ */
+abstract class BrokerProxy(
+  val host: String,
+  val port: Int,
+  val system: String,
+  val clientID: String,
+  val metricsRegistry: MetricsRegistry,
+  tpMetrics: TopicAndPartitionMetrics,
+  val timeout: Int = Int.MaxValue,
+  val bufferSize: Int = 1024000,
+  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging with BrokerProxyMetrics {
+
+  val messageSink: MessageSink
+
+  /**
+   * How long the fetcher thread should sleep before checking whether any TopicPartitions have been added to its purview
+   */
+  val sleepMSWhileNoTopicPartitions = 1000
+
+  /** What's the next offset for a particular partition? **/
+  val nextOffsets: ConcurrentHashMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+
+  /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
+  // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
+  // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
+  // immediately, even though the process was proceeding normally.  Hence the extra boolean.  Should be investigated.
+  val firstCallBarrier = new CountDownLatch(1)
+  var firstCall = true
+
+  var simpleConsumer = createSimpleConsumer()
+
+  def createSimpleConsumer() = {
+    val hostString = "%s:%d" format (host, port)
+    info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
+
+    val sc = new SimpleConsumer(host, port, timeout, bufferSize, clientID) with DefaultFetch {
+      val fetchSize: Int = 256 * 1024
+    }
+
+    sc
+  }
+
+  def addTopicPartition(tp: TopicAndPartition, lastCheckpointedOffset: String) = {
+    debug("Adding new topic and partition %s to queue for %s" format (tp, host))
+    if (nextOffsets.containsKey(tp)) toss("Already consuming TopicPartition %s" format tp)
+
+    val offset = offsetGetter.getNextOffset(simpleConsumer, tp, lastCheckpointedOffset)
+    nextOffsets += tp -> offset
+
+    tpGaugeInc
+  }
+
+  def removeTopicPartition(tp: TopicAndPartition) = {
+    if (nextOffsets.containsKey(tp)) {
+      nextOffsets.remove(tp)
+      tpGaugeDec
+      debug("Removed %s" format tp)
+    } else {
+      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys().mkString(",")))
+    }
+  }
+
+  val thread: Thread = new Thread(new Runnable() {
+    def run() {
+      info("Starting thread for BrokerProxy")
+
+      while (!Thread.currentThread.isInterrupted) {
+        if (nextOffsets.size == 0) {
+          debug("No TopicPartitions to fetch. Sleeping.")
+          Thread.sleep(sleepMSWhileNoTopicPartitions)
+        } else {
+          try {
+            fetchMessages()
+          } catch {
+            // If we're interrupted, don't try and reconnect. We should shut down.
+            case e: InterruptedException =>
+              debug("Shutting down due to interrupt exception.")
+              Thread.currentThread.interrupt
+            case e: ClosedByInterruptException =>
+              debug("Shutting down due to closed by interrupt exception.")
+              Thread.currentThread.interrupt
+            case e: Throwable => {
+              warn("Recreating simple consumer and retrying connection")
+              debug("Stack trace for fetchMessages exception.", e)
+              simpleConsumer.close()
+              simpleConsumer = createSimpleConsumer()
+              reconnectCounter.inc
+            }
+          }
+        }
+      }
+
+    }
+  }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
+
+  private def fetchMessages(): Unit = {
+    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.toList: _*)
+    firstCall = false
+    firstCallBarrier.countDown()
+    if (response.hasError) {
+      // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
+      case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+
+      val errors = for (
+        error <- response.data.entrySet.filter(_.getValue.error != ErrorMapping.NoError);
+        errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
+        exception <- Option(ErrorMapping.exceptionFor(errorCode))
+      ) yield new Error(error.getKey, errorCode, exception)
+
+      val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
+
+      if (!notLeaders.isEmpty) {
+        info("Abdicating. Got not leader exception for: " + notLeaders.mkString(","))
+
+        notLeaders.foreach(e => {
+          // Go back one message, since the fetch for nextOffset failed, and 
+          // abdicate requires lastOffset, not nextOffset.
+          messageSink.abdicate(e.tp, nextOffsets.remove(e.tp) - 1)
+        })
+      }
+
+      if (!otherErrors.isEmpty) {
+        warn("Got error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format errors.mkString(","))
+        otherErrors.foreach(e => ErrorMapping.maybeThrowException(e.code)) // One will get thrown
+      }
+    }
+
+    def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
+      val messageSet: MessageSet = data.messages
+      var nextOffset = nextOffsets(tp)
+
+      messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
+
+      for (message <- messageSet.iterator) {
+        messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+
+        nextOffset = message.nextOffset
+
+        tpMetrics.getReadsCounter(tp).inc
+        tpMetrics.getBytesReadCounter(tp).inc(message.message.payloadSize + message.message.keySize)
+        tpMetrics.getOffsetCounter(tp).set(nextOffset)
+      }
+
+      nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
+
+      // Update high water mark
+      val hw = data.hw
+      if (hw >= 0) {
+        getLagGauge(tp).set(hw - nextOffset)
+      } else {
+        debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
+      }
+    }
+
+    response.data.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
+
+  }
+
+  override def toString() = "BrokerProxy for %s:%d" format (host, port)
+
+  def start {
+    debug("Starting broker proxy for %s:%s." format (host, port))
+
+    thread.setDaemon(true)
+    thread.start
+  }
+
+  def stop {
+    debug("Shutting down broker proxy for %s:%s." format (host, port))
+
+    thread.interrupt
+    thread.join
+  }
+}
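
A hedged sketch of obtaining a concrete BrokerProxy (mirroring KafkaSystemConsumer later in this patch); the host, port, metricsRegistry, tpMetrics, and mySink values are hypothetical and assumed in scope. The abstract messageSink is supplied by an anonymous subclass:

  import kafka.common.TopicAndPartition

  val proxy = new BrokerProxy("localhost", 9092, "kafka", "my-client", metricsRegistry, tpMetrics) {
    val messageSink: MessageSink = mySink // receives fetched messages and watermark updates
  }

  proxy.start
  proxy.addTopicPartition(TopicAndPartition("my-topic", 0), lastCheckpointedOffset = "42")
  // ...
  proxy.stop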

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
new file mode 100644
index 0000000..bdd91da
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxyMetrics.scala
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.{Gauge, Counter, MetricsRegistry}
+import collection.mutable
+import kafka.common.TopicAndPartition
+
+private[kafka] trait BrokerProxyMetrics {
+  self: BrokerProxy =>
+  // TODO: Move topic-partition specific metrics out of BrokerProxy, into system
+  val metricsRegistry: MetricsRegistry
+
+  def newCounter = metricsRegistry.newCounter(metricsGroup, _: String)
+
+  val hostPort = host + ":" + port
+  val metricsGroup = "samza.kafka.brokerproxy"
+
+  // Counters
+  val reconnectCounter = newCounter("%s-Reconnects" format hostPort)
+
+  // Gauges
+  val lagGauges = mutable.Map[TopicAndPartition, Gauge[Long]]()
+  def getLagGauge(tp: TopicAndPartition) = lagGauges.getOrElseUpdate(tp, metricsRegistry.newGauge[Long](metricsGroup, "%s-MessagesBehindHighWaterMark" format tp, 0L))
+
+  val tpGauge = metricsRegistry.newGauge[Long](metricsGroup, "%s-NumberOfTopicsPartitions" format hostPort, 0L)
+
+  def tpGaugeInc = tpGauge.set(tpGauge.getValue + 1L)
+
+  def tpGaugeDec = tpGauge.set(tpGauge.getValue - 1L)
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
new file mode 100644
index 0000000..41710f2
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetch.scala
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequestBuilder
+import kafka.common.TopicAndPartition
+
+/**
+ * Extension to a SimpleConsumer that defines the default parameters necessary for default fetch requests.  Builds
+ * such a fetch request, requests the fetch and returns the result
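+ *
+ * For example (host and topic hypothetical):
+ *
+ *   val sc = new SimpleConsumer("localhost", 9092, 10000, 1024000, "my-client") with DefaultFetch {
+ *     val fetchSize: Int = 256 * 1024
+ *   }
+ *   val response = sc.defaultFetch(TopicAndPartition("my-topic", 0) -> 42L)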
+ */
+trait DefaultFetch {
+  self: SimpleConsumer =>
+  // Cap the broker-side wait at one second so a fetch on an idle topic doesn't block forever.
+  val maxWait: Int = 1000
+  val minBytes: Int = 1
+  val clientId: String
+  val fetchSize: Int
+
+  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
+    val fbr = new FetchRequestBuilder().maxWait(maxWait)
+                                       .minBytes(minBytes)
+                                       .clientId(clientId)
+
+    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))
+
+    this.fetch(fbr.build())
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
new file mode 100644
index 0000000..326d6c9
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import kafka.consumer.SimpleConsumer
+import kafka.common.{ OffsetOutOfRangeException, ErrorMapping }
+import kafka.api._
+import org.apache.samza.config.KafkaConfig
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import grizzled.slf4j.Logging
+
+class GetOffset(default: String, autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {
+
+  private def getAutoOffset(topic: String): Long = {
+    info("Checking if auto.offset.reset is defined for topic %s" format (topic))
+    autoOffsetResetTopics.getOrElse(topic, default) match {
+      case OffsetRequest.LargestTimeString =>
+        info("Got reset of type %s." format OffsetRequest.LargestTimeString)
+        OffsetRequest.LatestTime
+      case OffsetRequest.SmallestTimeString =>
+        info("Got reset of type %s." format OffsetRequest.SmallestTimeString)
+        OffsetRequest.EarliestTime
+      case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other))
+    }
+  }
+
+  /**
+   *  An offset was provided but may not be valid.  Verify its validity.
+   */
+  private def useLastCheckpointedOffset(sc: DefaultFetch, last: String, tp: TopicAndPartition): Option[Long] = {
+    try {
+      info("Validating offset %s for topic and partition %s" format (last, tp))
+
+      val messages = sc.defaultFetch((tp, last.toLong))
+
+      if (messages.hasError) {
+        ErrorMapping.maybeThrowException(messages.errorCode(tp.topic, tp.partition))
+      }
+
+      info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (last, tp))
+
+      val nextOffset = messages
+        .messageSet(tp.topic, tp.partition)
+        .head
+        .nextOffset
+
+      info("Got next offset %s for %s." format (nextOffset, tp))
+
+      Some(nextOffset)
+    } catch {
+      case e: OffsetOutOfRangeException =>
+        info("An out of range Kafka offset (%s) was supplied for topic and partition %s, so falling back to autooffset.reset." format (last, tp))
+        None
+    }
+  }
+
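+  /**
+   * Resolve the offset to start consuming from: first look up the reset
+   * offset (earliest or latest, per auto.offset.reset) as a fallback, then,
+   * if a last checkpointed offset was supplied, validate it with a fetch and
+   * return the offset that follows it; otherwise use the fallback.
+   */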
+  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
+    val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
+    val offsetResponse = sc.getOffsetsBefore(offsetRequest)
+    val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
+    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
+
+    info("Got offset %d for topic and partition %s" format (autoOffset, tp))
+
+    val actualOffset = Option(lastCheckpointedOffset) match {
+      case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
+      case None => autoOffset
+    }
+
+    info("Final offset to be returned for Topic and Partition %s = %d" format (tp, actualOffset))
+    actualOffset
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
new file mode 100644
index 0000000..183c6cc
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.Partition
+import java.util.UUID
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import kafka.api.TopicMetadata
+import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemAdmin
+
+class KafkaSystemAdmin(
+  systemName: String,
+  // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
+  brokerListString: String,
+  clientId: String = UUID.randomUUID.toString) extends SystemAdmin {
+
+  def getPartitions(streamName: String): java.util.Set[Partition] = {
+    val getTopicMetadata = (topics: Set[String]) => {
+      new ClientUtilTopicMetadataStore(brokerListString, clientId)
+        .getTopicInfo(topics)
+    }
+
+    val metadata = TopicMetadataCache.getTopicMetadata(
+      Set(streamName),
+      systemName,
+      getTopicMetadata)
+
+    metadata(streamName)
+      .partitionsMetadata
+      .map(pm => new Partition(pm.partitionId))
+      .toSet[Partition]
+  }
+}
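
A hedged usage sketch of the admin above (broker address and stream name hypothetical):

  val admin = new KafkaSystemAdmin("kafka", "localhost:9092")
  val partitions: java.util.Set[Partition] = admin.getPartitions("my-topic")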

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
new file mode 100644
index 0000000..bd7794a
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
+import kafka.common.TopicAndPartition
+import org.apache.samza.config.{ KafkaConfig, Config }
+import org.apache.samza.SamzaException
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.metrics.MetricsRegistry
+import grizzled.slf4j.Logging
+import scala.collection.JavaConversions._
+import kafka.message.MessageAndOffset
+import org.apache.samza.Partition
+import kafka.utils.Utils
+import org.apache.samza.util.Clock
+import java.util.UUID
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.Decoder
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import java.nio.charset.Charset
+
+object KafkaSystemConsumer {
+  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
+    val topic = systemStreamPartition.getStream
+    val partitionId = systemStreamPartition.getPartition.getPartitionId
+    TopicAndPartition(topic, partitionId)
+  }
+}
+
+/**
+ *  Maintain a cache of BrokerProxies, returning the appropriate one for the
+ *  requested topic and partition.
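+ *
+ *  Callers register each SystemStreamPartition (with its last read offset)
+ *  via register() before start() is called; start() then looks up each
+ *  partition's leader and hands the topic/partition to the BrokerProxy for
+ *  that broker.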
+ */
+private[kafka] class KafkaSystemConsumer(
+  systemName: String,
+  brokerListString: String,
+  metricsRegistry: MetricsRegistry,
+  clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
+  queueSize: Int = 1000,
+  timeout: Int = Int.MaxValue,
+  bufferSize: Int = 1024000,
+  brokerMetadataFailureRefreshMs: Long = 10000,
+  offsetGetter: GetOffset = new GetOffset("fail"),
+  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
+  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(queueSize, metricsRegistry, new Clock {
+  def currentTimeMillis = clock()
+}) with Toss with Logging {
+
+  type HostPort = (String, Int)
+  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
+  var lastReadOffsets = Map[SystemStreamPartition, String]()
+  val topicAndPartitionMetrics = new TopicAndPartitionMetrics(metricsRegistry)
+
+  def start() {
+    val topicPartitionsAndOffsets = lastReadOffsets.map {
+      case (systemStreamPartition, offset) =>
+        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
+        topicAndPartitionMetrics.addNewTopicAndPartition(topicAndPartition)
+        (topicAndPartition, offset)
+    }
+
+    refreshBrokers(topicPartitionsAndOffsets)
+
+    brokerProxies.values.foreach(_.start)
+  }
+
+  override def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
+    super.register(systemStreamPartition, lastReadOffset)
+
+    lastReadOffsets += systemStreamPartition -> lastReadOffset
+  }
+
+  def stop() {
+    brokerProxies.values.foreach(_.stop)
+  }
+
+  def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
+    var done = false
+
+    while (!done) {
+      try {
+        val getTopicMetadata = (topics: Set[String]) => {
+          new ClientUtilTopicMetadataStore(brokerListString, clientId).getTopicInfo(topics)
+        }
+
+        val partitionMetadata = TopicMetadataCache.getTopicMetadata(
+          topicPartitionsAndOffsets.keys.map(_.topic).toSet,
+          systemName,
+          getTopicMetadata)
+
+        topicPartitionsAndOffsets.foreach {
+          case (topicAndPartition, lastOffset) =>
+            // TODO we can't give this val an explicit type: kafka.cluster.Broker
+            // is private to the kafka package, and Scala only enforces that
+            // restriction when the type is declared explicitly.
+            val brokerOption = partitionMetadata(topicAndPartition.topic)
+              .partitionsMetadata
+              .find(_.partitionId == topicAndPartition.partition)
+              .getOrElse(toss("Can't find partition metadata for %s" format topicAndPartition))
+              .leader
+
+            brokerOption match {
+              case Some(broker) =>
+                val brokerProxy = brokerProxies.getOrElseUpdate((broker.host, broker.port), new BrokerProxy(broker.host, broker.port, systemName, clientId, metricsRegistry, topicAndPartitionMetrics, timeout, bufferSize, offsetGetter) {
+                  val messageSink: MessageSink = sink
+                })
+
+                brokerProxy.addTopicPartition(topicAndPartition, lastOffset)
+              case _ => warn("No leader currently defined for %s, skipping it during this refresh." format topicAndPartition)
+            }
+        }
+
+        done = true
+      } catch {
+        case e: Throwable =>
+          warn("An exception was thrown while refreshing brokers for %s. Waiting a bit and retrying, since we can't continue without broker metadata." format topicPartitionsAndOffsets.keySet)
+          debug(e)
+
+          try {
+            Thread.sleep(brokerMetadataFailureRefreshMs)
+          } catch {
+            case e: InterruptedException =>
+              info("Interrupted while waiting to retry metadata refresh, so shutting down.")
+
+              // Stop the proxies and exit the retry loop, since we're shutting down.
+              stop
+              done = true
+          }
+      }
+    }
+  }
+
+  val sink = new MessageSink {
+    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
+    }
+
+    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
+      trace("Incoming message %s: %s." format (tp, msg))
+
+      val systemStreamPartition = toSystemStreamPartition(tp)
+      val isAtHead = highWatermark == msg.offset
+      val offset = msg.offset.toString
+      val key = if (msg.message.key != null) {
+        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
+      } else {
+        null
+      }
+      // Test the payload itself, not the backing buffer (which is never
+      // null), so that messages without a body deserialize to null.
+      val message = if (msg.message.payload != null) {
+        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
+      } else {
+        null
+      }
+
+      // TODO use kafka encoder/decoder here, if they were defined in config
+      add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+
+      setIsAtHead(systemStreamPartition, isAtHead)
+    }
+
+    def abdicate(tp: TopicAndPartition, lastOffset: Long) {
+      refreshBrokers(Map(tp -> lastOffset.toString))
+    }
+
+    private def toSystemStreamPartition(tp: TopicAndPartition) = {
+      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
+    }
+  }
+}
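
A minimal usage sketch for the consumer above (it has to live in the
org.apache.samza.system.kafka package, since the class is private[kafka]).
The broker address, stream name, offset, and the MetricsRegistryMap
registry implementation below are illustrative assumptions, not part of
this patch; envelopes are drained through the polling methods the class
inherits from BlockingEnvelopeMap:

    import org.apache.samza.Partition
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.SystemStreamPartition

    val consumer = new KafkaSystemConsumer(
      systemName = "kafka",
      brokerListString = "localhost:9092",
      metricsRegistry = new MetricsRegistryMap)

    // register() must happen before start(): start() builds its
    // topic-partition-to-offset map from the offsets registered here.
    consumer.register(new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0)), "123")
    consumer.start()
    // ... consume envelopes via the inherited BlockingEnvelopeMap methods ...
    consumer.stop()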

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
new file mode 100644
index 0000000..13c5baa
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.KafkaConfig
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.StreamConfig.Config2Stream
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
+import java.util.Properties
+import kafka.producer.Producer
+import kafka.producer.async.DefaultEventHandler
+import kafka.utils.Utils
+import org.apache.samza.util.Util
+import kafka.serializer.Decoder
+import kafka.serializer.DefaultDecoder
+import org.apache.samza.system.SystemFactory
+
+class KafkaSystemFactory extends SystemFactory {
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    val clientId = KafkaUtil.getClientId("samza-consumer", config)
+
+    // It's a little awkward to need a producer config for a consumer, but
+    // the broker list used for metadata lookups is only defined there.
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    val brokerListString = Option(producerConfig.brokerList)
+      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+
+    // TODO could add stream-level overrides for timeout and buffer size
+    val timeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
+    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
+    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
+    val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
+      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
+      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
+    }
+    val keyDeserializer = config.getConsumerKeyDeserializerClass(systemName) match {
+      case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
+      case _ => new DefaultDecoder().asInstanceOf[Decoder[Object]]
+    }
+
+    new KafkaSystemConsumer(
+      systemName = systemName,
+      brokerListString = brokerListString,
+      metricsRegistry = registry,
+      clientId = clientId,
+      // TODO make this configurable?
+      queueSize = 1000,
+      timeout = timeout,
+      bufferSize = bufferSize,
+      offsetGetter = offsetGetter)
+  }
+
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = {
+    val clientId = KafkaUtil.getClientId("samza-producer", config)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    // batchNumMessages and retryBackoffMs are primitive Ints with defaults
+    // supplied by ProducerConfig itself, so they can be read directly.
+    val batchSize = producerConfig.batchNumMessages
+    val reconnectIntervalMs = producerConfig.retryBackoffMs
+    val getProducer = () => { new Producer[Object, Object](producerConfig) }
+
+    // Unlike consumer, no need to use encoders here, since they come for free 
+    // inside the producer configs. Kafka's producer will handle all of this 
+    // for us.
+
+    new KafkaSystemProducer(
+      systemName,
+      batchSize,
+      reconnectIntervalMs,
+      registry,
+      getProducer)
+  }
+
+  def getAdmin(systemName: String, config: Config) = {
+    val clientId = KafkaUtil.getClientId("samza-admin", config)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    val brokerListString = Option(producerConfig.brokerList)
+      .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
+
+    new KafkaSystemAdmin(systemName, brokerListString, clientId)
+  }
+}
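
A sketch of driving the factory directly, assuming Samza's MapConfig and
MetricsRegistryMap classes; the exact config keys are defined in
KafkaConfig.scala (not shown in this section), so the ones below are
illustrative only:

    import scala.collection.JavaConversions._
    import org.apache.samza.config.MapConfig
    import org.apache.samza.metrics.MetricsRegistryMap

    val config = new MapConfig(Map(
      "job.name" -> "example-job",
      "systems.kafka.producer.metadata.broker.list" -> "localhost:9092"))

    val factory = new KafkaSystemFactory
    val registry = new MetricsRegistryMap
    val consumer = factory.getConsumer("kafka", config, registry)
    val producer = factory.getProducer("kafka", config, registry)
    val admin = factory.getAdmin("kafka", config)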

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
new file mode 100644
index 0000000..c4e4bec
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import scala.collection.mutable.ArrayBuffer
+import grizzled.slf4j.Logging
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
+import kafka.producer.ProducerConfig
+import org.apache.samza.config.Config
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.OutgoingMessageEnvelope
+
+object KafkaSystemProducerMetrics {
+  val metricsGroup = "samza.kafka.producer"
+}
+
+class KafkaSystemProducer(
+  systemName: String,
+  batchSize: Int,
+  reconnectIntervalMs: Long,
+  registry: MetricsRegistry,
+  getProducer: () => Producer[Object, Object]) extends SystemProducer with Logging {
+
+  val flushReconnectCounter = registry.newCounter(KafkaSystemProducerMetrics.metricsGroup, "Producer-%s-Reconnects" format systemName)
+  var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]()
+  var producer: Producer[Object, Object] = null
+
+  def start() {
+  }
+
+  def stop() {
+    if (producer != null) {
+      producer.close
+    }
+  }
+
+  def register(source: String) {
+    sourceBuffers += source -> ArrayBuffer()
+  }
+
+  def send(source: String, envelope: OutgoingMessageEnvelope) {
+    sourceBuffers(source) += new KeyedMessage[Object, Object](
+      envelope.getSystemStream.getStream,
+      envelope.getKey,
+      envelope.getPartitionKey,
+      envelope.getMessage)
+
+    if (sourceBuffers(source).size >= batchSize) {
+      commit(source)
+    }
+  }
+
+  def commit(source: String) {
+    val buffer = sourceBuffers(source)
+    var done = false
+
+    debug("Flushing buffer with size: %s." format buffer.size)
+
+    while (!done) {
+      try {
+        if (producer == null) {
+          info("Creating a new producer for system %s." format systemName)
+          producer = getProducer()
+          debug("Created a new producer for system %s." format systemName)
+        }
+
+        producer.send(buffer: _*)
+        done = true
+      } catch {
+        case e: Throwable =>
+          warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, e.getMessage))
+          debug("Exception while producing to %s." format systemName, e)
+
+          flushReconnectCounter.inc
+
+          if (producer != null) {
+            producer.close
+            producer = null
+          }
+
+          try {
+            Thread.sleep(reconnectIntervalMs)
+          } catch {
+            case e: InterruptedException => () // interrupted mid-backoff; retry the send immediately
+          }
+      }
+    }
+
+    buffer.clear
+    debug("Flushed buffer.")
+  }
+}
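
The producer's contract in miniature: send() buffers per registered source
and flushes automatically once a source's buffer reaches batchSize, while
commit() forces a flush, retrying with a fresh connection on failure. A
sketch, where the broker address, stream name, and MetricsRegistryMap are
illustrative assumptions; the message is a byte array because Kafka's
default DefaultEncoder expects one:

    import java.util.Properties
    import kafka.producer.{ Producer, ProducerConfig }
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.{ OutgoingMessageEnvelope, SystemStream }

    val props = new Properties
    props.put("metadata.broker.list", "localhost:9092")

    val systemProducer = new KafkaSystemProducer(
      systemName = "kafka",
      batchSize = 100,
      reconnectIntervalMs = 10000,
      registry = new MetricsRegistryMap,
      getProducer = () => new Producer[Object, Object](new ProducerConfig(props)))

    systemProducer.register("my-task")
    systemProducer.start()
    systemProducer.send("my-task",
      new OutgoingMessageEnvelope(new SystemStream("kafka", "PageViewEvent"), "hello".getBytes("UTF-8")))
    systemProducer.commit("my-task") // force the partial batch out
    systemProducer.stop()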

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
new file mode 100644
index 0000000..71fae59
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndOffset
+
+private[kafka] trait MessageSink {
+  def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit
+
+  def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
+
+  def abdicate(tp: TopicAndPartition, lastOffset: Long): Unit
+}
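
Because this trait is the only seam between a BrokerProxy and the consumer,
a trivial implementation makes the callback contract easy to exercise in
tests. A sketch (it must sit in the org.apache.samza.system.kafka package,
since the trait is private[kafka]):

    import java.util.concurrent.atomic.AtomicLong
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndOffset

    private[kafka] class CountingSink extends MessageSink {
      val count = new AtomicLong(0)

      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
        // A real sink records which partitions are caught up to the broker.
      }

      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
        count.incrementAndGet
      }

      def abdicate(tp: TopicAndPartition, lastOffset: Long) {
        // A real sink re-resolves the partition's leader broker from here.
      }
    }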

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
new file mode 100644
index 0000000..75fc022
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicAndPartitionMetrics.scala
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka
+
+import org.apache.samza.metrics.{MetricsRegistry, Counter}
+import kafka.common.TopicAndPartition
+import java.util.concurrent.ConcurrentHashMap
+import grizzled.slf4j.Logging
+
+/**
+ * Wrapper around the per-topic-partition counters that BrokerProxies update.
+ * Multiple BrokerProxies may update the map concurrently, but no two should
+ * ever update the same key at the same time.
+ *
+ * @param metricsRegistry Registry to hook counters into.
+ */
+private[kafka] class TopicAndPartitionMetrics(metricsRegistry:MetricsRegistry) extends Logging {
+  val metricsGroup = "KafkaSystem"
+
+  val counters = new ConcurrentHashMap[TopicAndPartition, (Counter,Counter,Counter)]()
+
+  def addNewTopicAndPartition(tp:TopicAndPartition) = {
+    if(containsTopicAndPartition(tp)) {
+      warn("TopicAndPartitionsMetrics already has an entry for topic-partition %s, not adding." format tp)
+    } else {
+      counters.put(tp, (newCounter("%s-OffsetChange" format tp),  newCounter("%s-BytesRead" format tp), newCounter("%s-Reads" format tp)))
+    }
+  }
+
+  def containsTopicAndPartition(tp:TopicAndPartition) = counters.containsKey(tp)
+
+  def newCounter = metricsRegistry.newCounter(metricsGroup, _:String)
+
+  def getOffsetCounter(tp:TopicAndPartition) = counters.get(tp)._1
+
+  def getBytesReadCounter(tp:TopicAndPartition) = counters.get(tp)._2
+
+  def getReadsCounter(tp:TopicAndPartition) = counters.get(tp)._3
+}
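
The intended call pattern, with one registration per topic partition before
any counter is touched (MetricsRegistryMap is an assumed registry
implementation; the topic name and byte count are illustrative):

    import kafka.common.TopicAndPartition
    import org.apache.samza.metrics.MetricsRegistryMap

    val metrics = new TopicAndPartitionMetrics(new MetricsRegistryMap)
    val tp = TopicAndPartition("PageViewEvent", 0)

    metrics.addNewTopicAndPartition(tp)
    metrics.getReadsCounter(tp).inc           // one fetch issued
    metrics.getBytesReadCounter(tp).inc(4096) // bytes returned by that fetch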

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
new file mode 100644
index 0000000..8a24ce3
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka
+
+import grizzled.slf4j.Logging
+import kafka.api.TopicMetadata
+import kafka.common.ErrorMapping
+
+/**
+ * TopicMetadataCache caches Kafka topic metadata per (system, topic) pair.
+ * Access to the cache is thread safe. Each entry is refreshed once it is
+ * older than a specified interval, using the supplied getTopicInfoFromStore
+ * function to re-fetch the metadata from the store (usually ZooKeeper).
+ */
+object TopicMetadataCache extends Logging {
+  private case class MetadataInfo(var streamMetadata: TopicMetadata, var lastRefreshMs: Long)
+  private val topicMetadataMap: scala.collection.mutable.Map[(String, String), MetadataInfo] = new scala.collection.mutable.HashMap[(String, String), MetadataInfo]
+  private val lock = new Object
+
+  // Function used to fetch topic metadata from the store. Accepts the set of topics to look up.
+  type FetchTopicMetadataType = (Set[String]) => Map[String, TopicMetadata]
+
+  def getTopicMetadata(topics: Set[String], systemName: String, getTopicInfoFromStore: FetchTopicMetadataType, cacheTimeout: Long = 5000L, getTime: () => Long = { System.currentTimeMillis }): Map[String, TopicMetadata] = {
+    lock synchronized {
+      val time = getTime()
+      val missingTopics = topics.filter(topic => !topicMetadataMap.contains(systemName, topic))
+      val topicsWithBadOrExpiredMetadata = (topics -- missingTopics).filter(topic => {
+        val metadata = topicMetadataMap(systemName, topic)
+        metadata.streamMetadata.errorCode != ErrorMapping.NoError || ((time - metadata.lastRefreshMs) > cacheTimeout)
+      })
+      val topicsToRefresh = missingTopics ++ topicsWithBadOrExpiredMetadata
+
+      if (topicsToRefresh.nonEmpty) {
+        // Refresh topic information for any missing, expired, or bad topic metadata.
+        topicMetadataMap ++= getTopicInfoFromStore(topicsToRefresh)
+          .map { case (topic, metadata) => ((systemName, topic), MetadataInfo(metadata, getTime())) }
+          .toMap
+      }
+
+      // Use our new updated cache to return a map of topic -> metadata
+      topicMetadataMap
+        .filterKeys(topics.map(topic => (systemName, topic)))
+        .map {
+          case ((systemName, topic), metadata) =>
+            (topic, metadata.streamMetadata)
+        }.toMap
+    }
+  }
+}
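
The cache composes directly with the TopicMetadataStore defined later in
this patch; a sketch with an illustrative broker address and topic name:

    import org.apache.samza.util.ClientUtilTopicMetadataStore

    val store = new ClientUtilTopicMetadataStore("localhost:9092", "my-client-id")

    // The first call fetches from the brokers; repeats within cacheTimeout
    // (5000 ms by default) are served from the cache, unless the cached
    // entry carries a non-zero error code, which forces a refresh.
    val metadata = TopicMetadataCache.getTopicMetadata(
      Set("PageViewEvent"),
      "kafka",
      store.getTopicInfo)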

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
new file mode 100644
index 0000000..5cda26e
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/Toss.scala
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka
+
+import org.apache.samza.SamzaException
+
+private[kafka] trait Toss {
+  def toss(s:String) = throw new SamzaException(s)
+}
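
Since toss always throws, its result type is Nothing and it can stand in
for an expression of any type; that is what lets KafkaSystemConsumer above
write getOrElse(toss(...)) mid-chain. A sketch (the names are illustrative):

    private[kafka] object Example extends Toss {
      def requirePort(port: Option[Int]): Int = port.getOrElse(toss("no port configured"))
    }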

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
new file mode 100644
index 0000000..0bc1867
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import kafka.api.{ TopicMetadataResponse, TopicMetadata }
+import org.apache.samza.SamzaException
+import kafka.client.ClientUtils
+import grizzled.slf4j.Logging
+import kafka.common.ErrorMapping
+import kafka.cluster.Broker
+import java.util.concurrent.atomic.AtomicInteger
+
+trait TopicMetadataStore extends Logging {
+  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata]
+}
+
+class ClientUtilTopicMetadataStore(brokersListString: String, clientId: String, timeout: Int = 6000) extends TopicMetadataStore {
+  val brokers = ClientUtils.parseBrokerList(brokersListString)
+  val corrID = new AtomicInteger(0) // val: the AtomicInteger itself carries the mutability
+
+  def getTopicInfo(topics: Set[String]) = {
+    val currCorrId = corrID.getAndIncrement
+    val response: TopicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, clientId, timeout, currCorrId)
+
+    if (response.correlationId != currCorrId) {
+      throw new SamzaException("CorrelationID did not match for request on topics %s (sent %d, got %d)" format (topics, currCorrId, response.correlationId))
+    }
+
+    response.topicsMetadata
+      .map(metadata => (metadata.topic, metadata))
+      .toMap
+  }
+}
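
Each getTopicInfo call stamps a fresh correlation ID on the request and
verifies it on the response before trusting the metadata. A sketch with
illustrative broker addresses and topic name:

    val store = new ClientUtilTopicMetadataStore("broker1:9092,broker2:9092", "my-client-id")
    val metadata = store.getTopicInfo(Set("PageViewEvent"))

    metadata.get("PageViewEvent") match {
      case Some(tm) => println("partitions: %d" format tm.partitionsMetadata.size)
      case None => println("no metadata returned for topic")
    }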

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5ff71e51/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
new file mode 100644
index 0000000..d660b91
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.config.{KafkaConfig, Config, ConfigException}
+import org.apache.samza.config.JobConfig.Config2Job
+import java.util.concurrent.atomic.AtomicLong
+import kafka.client.ClientUtils
+import org.apache.samza.SamzaException
+
+object KafkaUtil {
+  val counter = new AtomicLong(0)
+
+  def getClientId(id: String, config: Config): String = getClientId(
+    id,
+    config.getName.getOrElse(throw new ConfigException("Missing job name.")),
+    config.getJobId.getOrElse("1"))
+
+  def getClientId(id: String, jobName: String, jobId: String): String =
+    "%s-%s-%s-%s-%s" format
+      (id.replaceAll("[^A-Za-z0-9]", "_"),
+        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+        jobId.replaceAll("[^A-Za-z0-9]", "_"),
+        System.currentTimeMillis,
+        counter.getAndIncrement)
+
+}
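
getClientId replaces every non-alphanumeric character with an underscore
and appends the current timestamp plus a process-wide counter, so
successive calls never collide. For example (timestamp and counter values
illustrative):

    KafkaUtil.getClientId("samza-consumer", "my job", "1")
    // => "samza_consumer-my_job-1-1376332896402-0"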