Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/18 16:56:07 UTC

[2/2] samza git commit: Keep a version of deprecated KafkaSystemConsumer

Keep a version of deprecated KafkaSystemConsumer

Author: Boris S <bo...@apache.org>
Author: Boris S <bs...@linkedin.com>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #740 from sborya/OldKafkaConsumer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/44d0685d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/44d0685d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/44d0685d

Branch: refs/heads/master
Commit: 44d0685dfbd979b11209903fadaee2468b001092
Parents: 5f1e752
Author: Boris S <bo...@apache.org>
Authored: Thu Oct 18 09:56:01 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 18 09:56:01 2018 -0700

----------------------------------------------------------------------
 .../system/kafka_deprecated/BrokerProxy.scala   | 332 ++++++++++
 .../DefaultFetchSimpleConsumer.scala            |  66 ++
 .../system/kafka_deprecated/GetOffset.scala     | 116 ++++
 .../kafka_deprecated/KafkaSystemAdmin.scala     | 609 +++++++++++++++++++
 .../kafka_deprecated/KafkaSystemConsumer.scala  | 309 ++++++++++
 .../KafkaSystemConsumerMetrics.scala            | 100 +++
 .../kafka_deprecated/KafkaSystemFactory.scala   | 178 ++++++
 .../kafka_deprecated/KafkaSystemProducer.scala  | 235 +++++++
 .../KafkaSystemProducerMetrics.scala            |  42 ++
 .../system/kafka_deprecated/MessageSink.scala   |  35 ++
 .../kafka_deprecated/TopicMetadataCache.scala   |  78 +++
 .../samza/system/kafka_deprecated/Toss.scala    |  28 +
 12 files changed, 2128 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/BrokerProxy.scala
new file mode 100644
index 0000000..a39752b
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/BrokerProxy.scala
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.lang.Thread.UncaughtExceptionHandler
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
+
+import kafka.api._
+import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
+import kafka.consumer.ConsumerConfig
+import kafka.message.MessageSet
+import org.apache.samza.SamzaException
+import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.KafkaUtil
+import org.apache.samza.util.Logging
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent
+
+/**
+ * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
+ * a way for consumers to retrieve those messages by topic and partition.
+ */
+class BrokerProxy(
+  val host: String,
+  val port: Int,
+  val system: String,
+  val clientID: String,
+  val metrics: KafkaSystemConsumerMetrics,
+  val messageSink: MessageSink,
+  val timeout: Int = ConsumerConfig.SocketTimeout,
+  val bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  val fetchSize: StreamFetchSizes = new StreamFetchSizes,
+  val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
+  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
+
+  /**
+   * How long the fetcher thread should sleep before checking if any TopicPartitions have been added to its purview.
+   */
+  val sleepMSWhileNoTopicPartitions = 100
+
+  /** What's the next offset for a particular partition? **/
+  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
+
+  /** Block on the first call to get messages if the fetcher has not yet returned its initial results **/
+  // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
+  // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
+  // immediately, even though the process was proceeding normally.  Hence the extra boolean.  Should be investigated.
+  val firstCallBarrier = new CountDownLatch(1)
+  var firstCall = true
+
+  var simpleConsumer = createSimpleConsumer()
+
+  metrics.registerBrokerProxy(host, port)
+
+  def createSimpleConsumer() = {
+    val hostString = "%s:%d" format (host, port)
+    info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
+
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
+    sc
+  }
+
+  def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+    debug("Adding new topic and partition %s to queue for %s" format (tp, host))
+
+    if (nextOffsets.asJava.containsKey(tp)) {
+      toss("Already consuming TopicPartition %s" format tp)
+    }
+
+    val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
+      nextOffset
+        .get
+        .toLong
+    } else {
+      warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
+
+      offsetGetter.getResetOffset(simpleConsumer, tp)
+    }
+
+    debug("Got offset %s for new topic and partition %s." format (offset, tp))
+
+    nextOffsets += tp -> offset
+
+    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
+  }
+
+  def removeTopicPartition(tp: TopicAndPartition) = {
+    if (nextOffsets.asJava.containsKey(tp)) {
+      val offset = nextOffsets.remove(tp)
+      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
+      debug("Removed %s" format tp)
+      offset
+    } else {
+      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
+      None
+    }
+  }
+
+  val thread = new Thread(new Runnable {
+    def run {
+      var reconnect = false
+
+      try {
+        (new ExponentialSleepStrategy).run(
+          loop => {
+            if (reconnect) {
+              metrics.reconnects.get((host, port)).inc
+              simpleConsumer.close()
+              simpleConsumer = createSimpleConsumer()
+            }
+
+            while (!Thread.currentThread.isInterrupted) {
+              messageSink.refreshDropped
+              if (nextOffsets.size == 0) {
+                debug("No TopicPartitions to fetch. Sleeping.")
+                Thread.sleep(sleepMSWhileNoTopicPartitions)
+              } else {
+                fetchMessages
+
+                // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
+                // In that case, reset the loop delay, so that the next time an error occurs,
+                // we start with a short retry delay.
+                loop.reset
+              }
+            }
+          },
+
+          (exception, loop) => {
+            warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
+            debug("Exception detail:", exception)
+            abdicateAll
+            reconnect = true
+          })
+      } catch {
+        case e: InterruptedException       => info("Got interrupt exception in broker proxy thread.")
+        case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
+        case e: OutOfMemoryError           => throw new SamzaException("Got out of memory error in broker proxy thread.")
+        case e: StackOverflowError         => throw new SamzaException("Got stack overflow error in broker proxy thread.")
+      }
+
+      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
+    }
+  }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
+
+  private def fetchMessages(): Unit = {
+    val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
+
+    if (topicAndPartitionsToFetch.size > 0) {
+      metrics.brokerReads.get((host, port)).inc
+      val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
+      firstCall = false
+      firstCallBarrier.countDown()
+
+      // Split response into errors and non errors, processing the errors first
+      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
+
+      handleErrors(errorResponses, response)
+
+      nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
+    } else {
+      refreshLatencyMetrics
+
+      debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
+
+      metrics.brokerSkippedFetchRequests.get((host, port)).inc
+
+      Thread.sleep(sleepMSWhileNoTopicPartitions)
+    }
+  }
+
+  /**
+   * Releases ownership for a single TopicAndPartition. The
+   * KafkaSystemConsumer will try and find a new broker for the
+   * TopicAndPartition.
+   */
+  def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
+    // Need to be mindful of a tp that was removed by another thread
+    case Some(offset) => messageSink.abdicate(tp, offset)
+    case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
+  }
+
+  /**
+   * Releases all TopicAndPartition ownership for this BrokerProxy thread. The
+   * KafkaSystemConsumer will try and find a new broker for the
+   * TopicAndPartition.
+   */
+  def abdicateAll {
+    info("Abdicating all topic partitions.")
+    val immutableNextOffsetsCopy = nextOffsets.toMap
+    immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
+  }
+
+  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
+    // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
+    case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
+
+    // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
+
+    // Convert FetchResponse into easier-to-work-with Errors
+    val errors = for (
+      (topicAndPartition, responseData) <- errorResponses;
+      error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
+    ) yield new Error(topicAndPartition, error.code(), error.exception())
+
+    val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
+    val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
+
+    // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
+    // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
+    // topic-partitions remains the same.  That way, when we've rebuilt the simple consumer, we can come around and
+    // handle the recoverable errors.
+    remainingErrors.foreach(e => {
+      warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
+      KafkaUtil.maybeThrowException(e.exception) })
+
+    notLeaderOrUnknownTopic.foreach(e => {
+      warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
+      abdicate(e.tp)
+    })
+
+    offsetOutOfRangeErrors.foreach(e => {
+      warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
+
+      try {
+        val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp)
+        // Put the new offset into the map (if the tp still exists).  Will catch it on the next go-around
+        nextOffsets.replace(e.tp, newOffset)
+      } catch {
+        // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
+        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
+                                                                                             abdicate(e.tp)
+      }
+    })
+  }
+
+  def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
+    val messageSet: MessageSet = data.messages
+    var nextOffset = nextOffsets(tp)
+
+    messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
+    require(messageSet != null)
+    for (message <- messageSet.iterator) {
+      messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
+
+      nextOffset = message.nextOffset
+
+      val bytesSize = message.message.payloadSize + message.message.keySize
+      metrics.reads.get(tp).inc
+      metrics.bytesRead.get(tp).inc(bytesSize)
+      metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
+      metrics.offsets.get(tp).set(nextOffset)
+    }
+
+    nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
+
+    // Update high water mark
+    val hw = data.hw
+    if (hw >= 0) {
+      metrics.highWatermark.get(tp).set(hw)
+      metrics.lag.get(tp).set(hw - nextOffset)
+    } else {
+      debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
+    }
+  }
+  override def toString() = "BrokerProxy for %s:%d" format (host, port)
+
+  def start {
+    if (!thread.isAlive) {
+      info("Starting " + toString)
+      thread.setDaemon(true)
+      thread.setName("Samza BrokerProxy " + thread.getName)
+      thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+        override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
+      })
+      thread.start
+    } else {
+      debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
+    }
+  }
+
+  def stop {
+    info("Shutting down " + toString)
+
+    if (simpleConsumer != null) {
+      info("closing simple consumer...")
+      simpleConsumer.close
+    }
+
+    thread.interrupt
+    thread.join
+  }
+
+  private def refreshLatencyMetrics {
+    nextOffsets.foreach{
+      case (topicAndPartition, offset) => {
+        val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId)
+        trace("latest offset of %s is %s" format (topicAndPartition, latestOffset))
+        if (latestOffset >= 0) {
+          // only update the registered topicAndpartitions
+          if(metrics.highWatermark.containsKey(topicAndPartition)) {
+            metrics.highWatermark.get(topicAndPartition).set(latestOffset)
+          }
+          if(metrics.lag.containsKey(topicAndPartition)) {
+            metrics.lag.get(topicAndPartition).set(latestOffset - offset)
+          }
+        }
+      }
+    }
+  }
+}
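
For context, the KafkaSystemConsumer later in this commit creates one BrokerProxy per leader broker (see createBrokerProxy below). A rough usage sketch follows, with a hypothetical host, port, and client ID; `metrics` (a KafkaSystemConsumerMetrics) and `sink` (a MessageSink) are assumed to be in scope, as they would be inside the owning consumer:

    // Rough sketch only, mirroring KafkaSystemConsumer.createBrokerProxy further down.
    // `metrics` and `sink` are assumed to exist; host/port/clientID values are hypothetical.
    val proxy = new BrokerProxy("broker-1.example.com", 9092, "kafka", "samza-client-1",
      metrics, sink, offsetGetter = new GetOffset(OffsetRequest.LargestTimeString))
    proxy.addTopicPartition(TopicAndPartition("PageViewEvent", 0), Some("1234"))
    proxy.start  // spawns the daemon fetcher thread
    // ... on shutdown:
    proxy.stop   // closes the SimpleConsumer, interrupts and joins the thread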

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/DefaultFetchSimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/DefaultFetchSimpleConsumer.scala
new file mode 100644
index 0000000..5f79ea5
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/DefaultFetchSimpleConsumer.scala
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import kafka.consumer.SimpleConsumer
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.ConsumerConfig
+
+class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
+  clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes,
+  minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs)
+  extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
+
+  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
+    val fbr = new FetchRequestBuilder().maxWait(maxWait)
+      .minBytes(minBytes)
+      .clientId(clientId)
+
+    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue)))
+
+    this.fetch(fbr.build())
+  }
+
+  override def close(): Unit = super.close()
+
+  override def send(request: TopicMetadataRequest): TopicMetadataResponse = super.send(request)
+
+  override def fetch(request: FetchRequest): FetchResponse = super.fetch(request)
+
+  override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = super.getOffsetsBefore(request)
+
+  override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = super.commitOffsets(request)
+
+  override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = super.fetchOffsets(request)
+
+  override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
+}
+
+/**
+ * A simple class for holding values for a stream's fetch size (fetch.message.max.bytes).
+ * Stream-level fetch size values are put in the streamValue map as streamName -> fetchSize.
+ * If a stream-level fetch size is not defined, the default value is used. The default value is
+ * Kafka's default fetch size or the system-level fetch size value (if defined).
+ */
+case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]())
+

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/GetOffset.scala
new file mode 100644
index 0000000..c4e7354
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/GetOffset.scala
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import org.apache.samza.util.Logging
+import org.apache.samza.util.KafkaUtil
+
+/**
+ * GetOffset validates offsets for topic partitions, and manages fetching new
+ * offsets for topics using Kafka's auto.offset.reset configuration.
+ */
+class GetOffset(
+  /**
+   * The default auto.offset.reset to use if a topic is not overridden in
+   * autoOffsetResetTopics. Any value other than OffsetRequest.SmallestTimeString or
+   * OffsetRequest.LargestTimeString will result in an exception when getResetOffset is called.
+   */
+  default: String,
+
+  /**
+   * Topic-level overrides for auto.offset.reset. Any value other than
+   * "earliest" or "latest" will result in an exception when getRestOffset is
+   * called.
+   */
+  autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss {
+
+  /**
+   * Checks if an offset is valid for a given topic/partition. Validity is
+   * defined as an offset that returns a readable non-empty message set with
+   * no exceptions.
+   */
+  def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
+    info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
+
+    try {
+      val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
+
+      if (messages.hasError) {
+        KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
+      }
+
+      info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
+
+      true
+    } catch {
+      case e: OffsetOutOfRangeException => false
+    }
+  }
+
+  /**
+   * Uses a topic's auto.offset.reset setting (defined via the
+   * autoOffsetResetTopics map in the constructor) to fetch either the
+   * earliest or latest offset. If neither earliest nor latest is defined for
+   * the topic in question, the default supplied in the constructor will be
+   * used.
+   */
+  def getResetOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition) = {
+    val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(getAutoOffset(topicAndPartition.topic), 1)))
+    val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
+    val partitionOffsetResponse = offsetResponse
+      .partitionErrorAndOffsets
+      .get(topicAndPartition)
+      .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
+
+    KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())
+
+    partitionOffsetResponse
+      .offsets
+      .headOption
+      .getOrElse(toss("Got response, but no offsets defined for %s" format topicAndPartition))
+  }
+
+  /**
+   * Returns either the earliest or latest setting (a Kafka constant) for a
+   * given topic using the autoOffsetResetTopics map defined in the
+   * constructor. If the topic is not defined in autoOffsetResetTopics, the
+   * default value supplied in the constructor will be used. This is used in
+   * conjunction with getResetOffset to fetch either the earliest or latest
+   * offset for a topic.
+   */
+  private def getAutoOffset(topic: String): Long = {
+    info("Checking if auto.offset.reset is defined for topic %s" format (topic))
+    autoOffsetResetTopics.getOrElse(topic, default) match {
+      case OffsetRequest.LargestTimeString =>
+        info("Got reset of type %s." format OffsetRequest.LargestTimeString)
+        OffsetRequest.LatestTime
+      case OffsetRequest.SmallestTimeString =>
+        info("Got reset of type %s." format OffsetRequest.SmallestTimeString)
+        OffsetRequest.EarliestTime
+      case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other))
+    }
+  }
+}
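
A brief sketch of how a starting offset can be resolved with GetOffset (hypothetical topic and offset; `consumer` is an assumed, already-connected DefaultFetchSimpleConsumer):

    // Sketch: validate a checkpointed offset, falling back to auto.offset.reset if invalid.
    val offsetGetter = new GetOffset(
      default = OffsetRequest.LargestTimeString,
      autoOffsetResetTopics = Map("PageViewEvent" -> OffsetRequest.SmallestTimeString))
    val tp = TopicAndPartition("PageViewEvent", 0)
    val startingOffset: Long =
      if (offsetGetter.isValidOffset(consumer, tp, "1234")) 1234L
      else offsetGetter.getResetOffset(consumer, tp)  // earliest or latest, per configuration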

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
new file mode 100644
index 0000000..e7ff749
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util
+import java.util.{Properties, UUID}
+
+import com.google.common.annotations.VisibleForTesting
+import kafka.admin.{AdminClient, AdminUtils}
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.TopicPartition
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system._
+import org.apache.samza.system.kafka.KafkaStreamSpec
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConverters._
+
+
+object KafkaSystemAdmin extends Logging {
+
+  @VisibleForTesting @volatile var deleteMessagesCalled = false
+  val CLEAR_STREAM_RETRIES = 3
+
+  /**
+   * A helper method that takes oldest, newest, and upcoming offsets for each
+   * system stream partition, and creates a single map from stream name to
+   * SystemStreamMetadata.
+   */
+  def assembleMetadata(oldestOffsets: Map[SystemStreamPartition, String], newestOffsets: Map[SystemStreamPartition, String], upcomingOffsets: Map[SystemStreamPartition, String]): Map[String, SystemStreamMetadata] = {
+    val allMetadata = (oldestOffsets.keySet ++ newestOffsets.keySet ++ upcomingOffsets.keySet)
+      .groupBy(_.getStream)
+      .map {
+        case (streamName, systemStreamPartitions) =>
+          val streamPartitionMetadata = systemStreamPartitions
+            .map(systemStreamPartition => {
+              val partitionMetadata = new SystemStreamPartitionMetadata(
+                // If the topic/partition is empty then oldest and newest will
+                // be stripped of their offsets, so default to null.
+                oldestOffsets.getOrElse(systemStreamPartition, null),
+                newestOffsets.getOrElse(systemStreamPartition, null),
+                upcomingOffsets(systemStreamPartition))
+              (systemStreamPartition.getPartition, partitionMetadata)
+            })
+            .toMap
+          val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
+          (streamName, streamMetadata)
+      }
+      .toMap
+
+    // This is typically printed downstream and it can be spammy, so debug level here.
+    debug("Got metadata: %s" format allMetadata)
+
+    allMetadata
+  }
+}
+
+/**
+ * A helper class that is used to construct the changelog stream specific information
+ *
+ * @param replicationFactor The number of replicas for the changelog stream
+ * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation
+ */
+case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties)
+
+/**
+ * A Kafka-based implementation of SystemAdmin.
+ */
+class KafkaSystemAdmin(
+  /**
+   * The system name to use when creating SystemStreamPartitions to return in
+   * the getSystemStreamMetadata response.
+   */
+  systemName: String,
+
+  // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here.
+  /**
+   * List of brokers that are part of the Kafka system that we wish to
+   * interact with. The format is host1:port1,host2:port2.
+   */
+  brokerListString: String,
+
+  /**
+   * A method that returns a ZkUtils for the Kafka system. This is invoked
+   * when the system admin is attempting to create a coordinator stream.
+   */
+  connectZk: () => ZkUtils,
+
+  /**
+   * Custom properties to use when the system admin tries to create a new
+   * coordinator stream.
+   */
+  coordinatorStreamProperties: Properties = new Properties,
+
+  /**
+   * The replication factor to use when the system admin creates a new
+   * coordinator stream.
+   */
+  coordinatorStreamReplicationFactor: Int = 1,
+
+  /**
+   * The timeout to use for the simple consumer when fetching metadata from
+   * Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
+   */
+  timeout: Int = Int.MaxValue,
+
+  /**
+   * The buffer size to use for the simple consumer when fetching metadata
+   * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes
+   * configuration.
+   */
+  bufferSize: Int = ConsumerConfig.SocketBufferSize,
+
+  /**
+   * The client ID to use for the simple consumer when fetching metadata from
+   * Kafka. Equivalent to Kafka's client.id configuration.
+   */
+  clientId: String = UUID.randomUUID.toString,
+
+  /**
+   * Per-topic replication factor and Kafka properties to be used when
+   * creating changelog topics in Kafka.
+   */
+  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](),
+
+  /**
+   * Kafka properties to be used during the intermediate topic creation
+   */
+  intermediateStreamProperties: Map[String, Properties] = Map(),
+
+  /**
+   * Whether deleteMessages() API can be used
+   */
+  deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
+
+  import KafkaSystemAdmin._
+
+  @volatile var running = false
+  @volatile var adminClient: AdminClient = null
+
+  override def start() = {
+    if (!running) {
+      running = true
+      adminClient = createAdminClient()
+    }
+  }
+
+  override def stop() = {
+    if (running) {
+      running = false
+      adminClient.close()
+      adminClient = null
+    }
+  }
+
+  private def createAdminClient(): AdminClient = {
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListString)
+    AdminClient.create(props)
+  }
+
+  override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
+    getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
+  }
+
+  def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy, cacheTTL: Long = Long.MaxValue): util.Map[String, SystemStreamMetadata] = {
+    debug("Fetching system stream partition count for: %s" format streams)
+    var metadataTTL = cacheTTL
+    retryBackoff.run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.asScala.toSet,
+          systemName,
+          getTopicMetadata,
+          metadataTTL)
+        val result = metadata.map {
+          case (topic, topicMetadata) => {
+            KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+            val partitionsMap = topicMetadata.partitionsMetadata.map {
+              pm =>
+                new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
+            }.toMap[Partition, SystemStreamPartitionMetadata]
+            (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava))
+          }
+        }
+        loop.done
+        result.asJava
+      },
+
+      (exception, loop) => {
+        warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
+        debug("Exception detail:", exception)
+        if (metadataTTL == Long.MaxValue) {
+          metadataTTL = 5000 // Revert to the default cache expiration
+        }
+      }
+    ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+  }
+
+  /**
+   * Returns the offset for the message after the specified offset for each
+   * SystemStreamPartition that was passed in.
+   */
+
+  override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
+    // This is safe to do with Kafka, even if a topic is key-deduped. If the
+    // offset doesn't exist on a compacted topic, Kafka will return the first
+    // message AFTER the offset that was specified in the fetch request.
+    offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
+  }
+
+  override def getSystemStreamMetadata(streams: java.util.Set[String]) =
+    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava
+
+  /**
+   * Given a set of stream names (topics), fetch metadata from Kafka for each
+   * stream, and return a map from stream name to SystemStreamMetadata for
+   * each stream. This method will return null for oldest and newest offsets
+   * if a given SystemStreamPartition is empty. This method will block and
+   * retry indefinitely until it gets a successful response from Kafka.
+   */
+  def getSystemStreamMetadata(streams: java.util.Set[String], retryBackoff: ExponentialSleepStrategy) = {
+    debug("Fetching system stream metadata for: %s" format streams)
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
+    retryBackoff.run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.asScala.toSet,
+          systemName,
+          getTopicMetadata,
+          metadataTTL)
+
+        debug("Got metadata for streams: %s" format metadata)
+
+        val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
+        var oldestOffsets = Map[SystemStreamPartition, String]()
+        var newestOffsets = Map[SystemStreamPartition, String]()
+        var upcomingOffsets = Map[SystemStreamPartition, String]()
+
+        // Get oldest, newest, and upcoming offsets for each topic and partition.
+        for ((broker, topicsAndPartitions) <- brokersToTopicPartitions) {
+          debug("Fetching offsets for %s:%s: %s" format (broker.host, broker.port, topicsAndPartitions))
+
+          val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
+          try {
+            upcomingOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.LatestTime)
+            oldestOffsets ++= getOffsets(consumer, topicsAndPartitions, OffsetRequest.EarliestTime)
+
+            // Kafka's "latest" offset is always last message in stream's offset +
+            // 1, so get newest message in stream by subtracting one. this is safe
+            // even for key-deduplicated streams, since the last message will
+            // never be deduplicated.
+            newestOffsets = upcomingOffsets.mapValues(offset => (offset.toLong - 1).toString)
+            // Keep only oldest/newest offsets where there is a message. Should
+            // return null offsets for empty streams.
+            upcomingOffsets.foreach {
+              case (topicAndPartition, offset) =>
+                if (offset.toLong <= 0) {
+                  debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
+                  newestOffsets -= topicAndPartition
+                  debug("Setting oldest offset to 0 to consume from beginning")
+                  oldestOffsets += (topicAndPartition -> "0")
+                }
+            }
+          } finally {
+            consumer.close
+          }
+        }
+
+        val result = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+        loop.done
+        result
+      },
+
+      (exception, loop) => {
+        warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
+        debug("Exception detail:", exception)
+        metadataTTL = 5000 // Revert to the default cache expiration
+      }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+  }
+
+  /**
+   * Returns the newest offset for the specified SSP.
+   * This method is fast and targeted. It minimizes the number of Kafka requests.
+   * It does not retry indefinitely if there is any failure.
+   * It returns null if the topic is empty. To get the offsets for *all*
+   * partitions, it would be more efficient to call getSystemStreamMetadata.
+   */
+  override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
+    debug("Fetching newest offset for: %s" format ssp)
+    var offset: String = null
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
+    var retries = maxRetries
+    new ExponentialSleepStrategy().run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          Set(ssp.getStream),
+          systemName,
+          getTopicMetadata,
+          metadataTTL)
+        debug("Got metadata for streams: %s" format metadata)
+
+        val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
+        val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
+        val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
+
+        // Get oldest, newest, and upcoming offsets for each topic and partition.
+        debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
+        val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
+        try {
+          offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
+
+          // Kafka's "latest" offset is always last message in stream's offset +
+          // 1, so get newest message in stream by subtracting one. this is safe
+          // even for key-deduplicated streams, since the last message will
+          // never be deduplicated.
+          if (offset.toLong <= 0) {
+            debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
+            offset = null
+          } else {
+            offset = (offset.toLong - 1).toString
+          }
+        } finally {
+          consumer.close
+        }
+
+        debug("Got offset %s for %s." format(offset, ssp))
+        loop.done
+      },
+
+      (exception, loop) => {
+        if (retries > 0) {
+          warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception))
+          metadataTTL = 0L // Force metadata refresh
+          retries -= 1
+        } else {
+          warn("Exception while trying to get offset for %s" format(ssp), exception)
+          loop.done
+          throw exception
+        }
+      })
+
+     offset
+  }
+
+  /**
+   * Helper method to use topic metadata cache when fetching metadata, so we
+   * don't hammer Kafka more than we need to.
+   */
+  def getTopicMetadata(topics: Set[String]) = {
+    new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+      .getTopicInfo(topics)
+  }
+
+  /**
+   * Break topic metadata topic/partitions into per-broker map so that we can
+   * execute only one offset request per broker.
+   */
+  private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = {
+    val brokersToTopicPartitions = metadata
+      .values
+      // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
+      .flatMap(topicMetadata => {
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+        topicMetadata
+          .partitionsMetadata
+          // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
+          .map(partitionMetadata => {
+            val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId)
+            val leader = partitionMetadata
+              .leader
+              .getOrElse(throw new SamzaException("Need leaders for all partitions when fetching offsets. No leader available for TopicAndPartition: %s" format topicAndPartition))
+            (leader, topicAndPartition)
+          })
+      })
+
+      // Convert to a Map[Broker, Seq[(Broker, TopicAndPartition)]]
+      .groupBy(_._1)
+      // Convert to a Map[Broker, Set[TopicAndPartition]]
+      .mapValues(_.map(_._2).toSet)
+
+    debug("Got topic partition data for brokers: %s" format brokersToTopicPartitions)
+
+    brokersToTopicPartitions
+  }
+
+  /**
+   * Use a SimpleConsumer to fetch either the earliest or latest offset from
+   * Kafka for each topic/partition in the topicsAndPartitions set. It is
+   * assumed that all topics/partitions supplied reside on the broker that the
+   * consumer is connected to.
+   */
+  private def getOffsets(consumer: SimpleConsumer, topicsAndPartitions: Set[TopicAndPartition], earliestOrLatest: Long) = {
+    debug("Getting offsets for %s using earliest/latest value of %s." format (topicsAndPartitions, earliestOrLatest))
+
+    var offsets = Map[SystemStreamPartition, String]()
+    val partitionOffsetInfo = topicsAndPartitions
+      .map(topicAndPartition => (topicAndPartition, PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+      .toMap
+    val brokerOffsets = consumer
+      .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
+      .partitionErrorAndOffsets
+      .mapValues(partitionErrorAndOffset => {
+        KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception())
+        partitionErrorAndOffset.offsets.head
+      })
+
+    for ((topicAndPartition, offset) <- brokerOffsets) {
+      offsets += new SystemStreamPartition(systemName, topicAndPartition.topic, new Partition(topicAndPartition.partition)) -> offset.toString
+    }
+
+    debug("Got offsets for %s using earliest/latest value of %s: %s" format (topicsAndPartitions, earliestOrLatest, offsets))
+
+    offsets
+  }
+
+  /**
+   * @inheritdoc
+   */
+  override def createStream(spec: StreamSpec): Boolean = {
+    info("Create topic %s in system %s" format (spec.getPhysicalName, systemName))
+    val kSpec = toKafkaSpec(spec)
+    var streamCreated = false
+
+    new ExponentialSleepStrategy(initialDelayMs = 500).run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.createTopic(
+            zkClient,
+            kSpec.getPhysicalName,
+            kSpec.getPartitionCount,
+            kSpec.getReplicationFactor,
+            kSpec.getProperties)
+        } finally {
+          zkClient.close
+        }
+
+        streamCreated = true
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: TopicExistsException =>
+            streamCreated = false
+            loop.done
+          case e: Exception =>
+            warn("Failed to create topic %s: %s. Retrying." format (spec.getPhysicalName, e))
+            debug("Exception detail:", e)
+        }
+      })
+
+    streamCreated
+  }
+
+  /**
+   * Converts a StreamSpec into a KafkaStreamSpec. Special handling for coordinator and changelog stream.
+   * @param spec a StreamSpec object
+   * @return KafkaStreamSpec object
+   */
+  def toKafkaSpec(spec: StreamSpec): KafkaStreamSpec = {
+    if (spec.isChangeLogStream) {
+      val topicName = spec.getPhysicalName
+      val topicMeta = topicMetaInformation.getOrElse(topicName, throw new StreamValidationException("Unable to find topic information for topic " + topicName))
+      new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor,
+        topicMeta.kafkaProps)
+    } else if (spec.isCoordinatorStream){
+      new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor,
+        coordinatorStreamProperties)
+    } else if (intermediateStreamProperties.contains(spec.getId)) {
+      KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
+    } else {
+      KafkaStreamSpec.fromSpec(spec)
+    }
+  }
+
+  /**
+    * @inheritdoc
+    *
+    * Validates a stream in Kafka. Should not be called before createStream(),
+    * since ClientUtils.fetchTopicMetadata(), used by different Kafka clients,
+    * is not read-only and will auto-create a new topic.
+    */
+  override def validateStream(spec: StreamSpec): Unit = {
+    val topicName = spec.getPhysicalName
+    info("Validating topic %s." format topicName)
+
+    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+    var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
+    retryBackoff.run(
+      loop => {
+        val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
+        val topicMetadata = topicMetadataMap(topicName)
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+
+        val partitionCount = topicMetadata.partitionsMetadata.length
+        if (partitionCount != spec.getPartitionCount) {
+          throw new StreamValidationException("Topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format (topicName, topicMetadata.partitionsMetadata.length, spec.getPartitionCount))
+        }
+
+        info("Successfully validated topic %s." format topicName)
+        loop.done
+      },
+
+      (exception, loop) => {
+        exception match {
+          case e: StreamValidationException => throw e
+          case e: Exception =>
+            warn("While trying to validate topic %s: %s. Retrying." format (topicName, e))
+            debug("Exception detail:", e)
+            metadataTTL = 5000L // Revert to the default value
+        }
+      })
+  }
+
+  /**
+   * @inheritdoc
+   *
+   * Delete a stream in Kafka. Deleting topics works only when the broker is configured with "delete.topic.enable=true".
+   * Otherwise it's a no-op.
+   */
+  override def clearStream(spec: StreamSpec): Boolean = {
+    info("Delete topic %s in system %s" format (spec.getPhysicalName, systemName))
+    val kSpec = KafkaStreamSpec.fromSpec(spec)
+    var retries = CLEAR_STREAM_RETRIES
+    new ExponentialSleepStrategy().run(
+      loop => {
+        val zkClient = connectZk()
+        try {
+          AdminUtils.deleteTopic(
+            zkClient,
+            kSpec.getPhysicalName)
+        } finally {
+          zkClient.close
+        }
+
+        loop.done
+      },
+
+      (exception, loop) => {
+        if (retries > 0) {
+          warn("Exception while trying to delete topic %s: %s. Retrying." format (spec.getPhysicalName, exception))
+          retries -= 1
+        } else {
+          warn("Fail to delete topic %s: %s" format (spec.getPhysicalName, exception))
+          loop.done
+          throw exception
+        }
+      })
+
+    val topicMetadata = getTopicMetadata(Set(kSpec.getPhysicalName)).get(kSpec.getPhysicalName).get
+    topicMetadata.partitionsMetadata.isEmpty
+  }
+
+  /**
+    * @inheritdoc
+    *
+    * Delete records up to (and including) the provided SSP offsets for all system stream partitions specified in the map.
+    * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
+    */
+  override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
+    if (!running) {
+      throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
+    }
+    if (deleteCommittedMessages) {
+      val nextOffsets = offsets.asScala.toSeq.map { case (systemStreamPartition, offset) =>
+        (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
+      }.toMap
+      adminClient.deleteRecordsBefore(nextOffsets)
+      deleteMessagesCalled = true
+    }
+  }
+
+  /**
+   * Compare the two offsets. Returns x where x < 0 if offset1 < offset2;
+   * x == 0 if offset1 == offset2; x > 0 if offset1 > offset2.
+   *
+   * Currently it's used in the context of the broadcast streams to detect
+   * the mismatch between two streams when consuming the broadcast streams.
+   */
+  override def offsetComparator(offset1: String, offset2: String): Integer = {
+    offset1.toLong compare offset2.toLong
+  }
+}
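
To illustrate assembleMetadata, a minimal sketch with hypothetical offsets for a single-partition topic (not part of this commit; assumes the imports at the top of the file above):

    // Sketch: combine per-SSP oldest/newest/upcoming offsets into per-stream SystemStreamMetadata.
    val ssp = new SystemStreamPartition("kafka", "PageViewEvent", new Partition(0))
    val metadata = KafkaSystemAdmin.assembleMetadata(
      oldestOffsets = Map(ssp -> "0"),
      newestOffsets = Map(ssp -> "41"),
      upcomingOffsets = Map(ssp -> "42"))
    // metadata("PageViewEvent") now reports oldest=0, newest=41, upcoming=42 for partition 0.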

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
new file mode 100644
index 0000000..b7c4368
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumer.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import kafka.common.TopicAndPartition
+import org.apache.samza.util.Logging
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+import org.apache.samza.Partition
+import org.apache.kafka.common.utils.Utils
+import org.apache.samza.util.Clock
+import kafka.serializer.DefaultDecoder
+import kafka.serializer.Decoder
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.IncomingMessageEnvelope
+import kafka.consumer.ConsumerConfig
+import org.apache.samza.util.TopicMetadataStore
+import kafka.api.PartitionMetadata
+import kafka.api.TopicMetadata
+import org.apache.samza.util.ExponentialSleepStrategy
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConverters._
+import org.apache.samza.system.SystemAdmin
+
+object KafkaSystemConsumer {
+
+  // Approximate additional shallow heap overhead per message in addition to the raw bytes
+  // received from Kafka: 4 + 64 + 4 + 4 + 4 = 80 bytes of overhead.
+  // As this overhead is a moving target, and not very large
+  // compared to the message size, it is being ignored in the computation for now.
+  val MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
+
+  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
+    val topic = systemStreamPartition.getStream
+    val partitionId = systemStreamPartition.getPartition.getPartitionId
+    TopicAndPartition(topic, partitionId)
+  }
+}
+
+/**
+ *  Maintain a cache of BrokerProxies, returning the appropriate one for the
+ *  requested topic and partition.
+ */
+private[kafka_deprecated] class KafkaSystemConsumer(
+  systemName: String,
+  systemAdmin: SystemAdmin,
+  metrics: KafkaSystemConsumerMetrics,
+  metadataStore: TopicMetadataStore,
+  clientId: String,
+  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
+  bufferSize: Int = ConsumerConfig.SocketBufferSize,
+  fetchSize: StreamFetchSizes = new StreamFetchSizes,
+  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
+  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
+
+  /**
+   * Defines a low water mark for how many messages we buffer before we start
+   * executing fetch requests against brokers to get more messages. This value
+   * is divided equally among all registered SystemStreamPartitions. For
+   * example, if fetchThreshold is set to 50000, and there are 50
+   * SystemStreamPartitions registered, then the per-partition threshold is
+   * 1000. As soon as a SystemStreamPartition's buffered message count drops
+   * below 1000, a fetch request will be executed to get more data for it.
+   *
+   * Increasing this parameter will decrease the latency between when a queue
+   * is drained of messages and when new messages are enqueued, but also leads
+   * to an increase in memory usage since more messages will be held in memory.
+   */
+  fetchThreshold: Int = 50000,
+  /**
+   * Defines a low water mark for how many bytes we buffer before we start
+   * executing fetch requests against brokers to get more messages. This
+   * value is divided by 2 because the messages are buffered twice, once in
+   * KafkaConsumer and then in SystemConsumers. This value
+   * is divided equally among all registered SystemStreamPartitions.
+   * However this is a soft limit per partition, as the
+   * bytes are cached at the message boundaries, and the actual usage can be
+   * 1000 bytes + size of max message in the partition for a given stream.
+   * The byte count is the size of the ByteBuffer in Message, so the
+   * object overhead is not taken into consideration. In this codebase
+   * it seems to be quite small: even for 500000 messages this is around 4MB x 2 = 8MB,
+   * which is not significant.
+   *
+   * For example,
+   * if fetchThresholdBytes is set to 100000 bytes, and there are 50
+   * SystemStreamPartitions registered, then the per-partition threshold is
+   * (100000 / 2) / 50 = 1000 bytes.
+   * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
+   * As soon as a SystemStreamPartition's buffered messages bytes drops
+   * below 1000, a fetch request will be executed to get more data for it.
+   *
+   * Increasing this parameter will decrease the latency between when a queue
+   * is drained of messages and when new messages are enqueued, but also leads
+   * to an increase in memory usage since more messages will be held in memory.
+   *
+   * The default value is -1, which means this is not used. When the value
+   * is > 0, then the fetchThreshold which is count based is ignored.
+   */
+  fetchThresholdBytes: Long = -1,
+  /**
+   * True if fetchThresholdBytes > 0, false otherwise.
+   */
+  fetchLimitByBytesEnabled: Boolean = false,
+  offsetGetter: GetOffset = new GetOffset("fail"),
+  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
+  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
+  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
+    metrics.registry,
+    new Clock {
+      def currentTimeMillis = clock()
+    },
+    classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
+
+  type HostPort = (String, Int)
+  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
+  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
+  var perPartitionFetchThreshold = fetchThreshold
+  var perPartitionFetchThresholdBytes = 0L
+
+  def start() {
+    if (topicPartitionsAndOffsets.size > 0) {
+      perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
+      // messages get double buffered, hence divide by 2
+      if(fetchLimitByBytesEnabled) {
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
+      }
+    }
+
+    systemAdmin.start()
+    refreshBrokers
+  }
+
+  override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
+    super.register(systemStreamPartition, offset)
+
+    val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
+    val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
+    // register the older offset in the consumer
+    if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
+      topicPartitionsAndOffsets.replace(topicAndPartition, offset)
+    }
+
+    metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
+  }
+
+  def stop() {
+    systemAdmin.stop()
+    brokerProxies.values.foreach(_.stop)
+  }
+
+  protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+    info("Creating new broker proxy for host: %s and port: %s" format(host, port))
+    new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
+  }
+
+  protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = {
+    topicMetadata.partitionsMetadata.find(_.partitionId == partition)
+  }
+
+  protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
+    // Whatever we do, we can't name the Broker type here, even though we're
+    // manipulating it. Broker is a private type, and Scala doesn't seem
+    // to mind as long as its type is never explicitly declared.
+    val brokerOption = partitionMetadata.flatMap(_.leader)
+
+    brokerOption match {
+      case Some(broker) => Some(broker.host, broker.port)
+      case _ => None
+    }
+  }
+
+  def refreshBrokers {
+    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
+    info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
+    retryBackoff.run(
+      loop => {
+        val topics = tpToRefresh.map(_.topic).toSet
+        val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+
+        // Call addTopicPartition one at a time, removing an entry from the to-be-done list
+        // only after it succeeds. This avoids re-adding the same topic partition repeatedly on retries.
+        def refresh() = {
+          val head = tpToRefresh.head
+          // refreshBrokers can be called from abdicate and refreshDropped,
+          // both of which are triggered from BrokerProxy threads. To prevent
+          // accidentally creating multiple objects for the same broker, or
+          // accidentally not updating the topicPartitionsAndOffsets variable,
+          // we need to lock.
+          this.synchronized {
+            // Check if we still need this TopicAndPartition inside the
+            // critical section. If we don't, then skip it.
+            topicPartitionsAndOffsets.get(head) match {
+              case Some(nextOffset) =>
+                val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition)
+                getLeaderHostPort(partitionMetadata) match {
+                  case Some((host, port)) =>
+                    debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get))
+                    val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+                    brokerProxy.addTopicPartition(head, Option(nextOffset))
+                    brokerProxy.start
+                    debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
+                    topicPartitionsAndOffsets -= head
+                  case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
+                }
+              case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
+            }
+          }
+          tpToRefresh.tail
+        }
+
+        while (!tpToRefresh.isEmpty) {
+          tpToRefresh = refresh()
+        }
+
+        loop.done
+      },
+
+      (exception, loop) => {
+        warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
+        debug("Exception detail:", exception)
+      })
+  }
+
+  val sink = new MessageSink {
+    var lastDroppedRefresh = clock()
+
+    def refreshDropped() {
+      if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
+        refreshBrokers
+        lastDroppedRefresh = clock()
+      }
+    }
+
+    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
+    }
+
+    def needsMoreMessages(tp: TopicAndPartition) = {
+      if(fetchLimitByBytesEnabled) {
+        getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
+      } else {
+        getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
+      }
+    }
+
+    def getMessageSize(message: Message): Integer = {
+      message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
+    }
+
+    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
+      trace("Incoming message %s: %s." format (tp, msg))
+
+      val systemStreamPartition = toSystemStreamPartition(tp)
+      val isAtHead = highWatermark == msg.offset
+      val offset = msg.offset.toString
+      val key = if (msg.message.key != null) {
+        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
+      } else {
+        null
+      }
+      val message = if (!msg.message.isNull) {
+        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
+      } else {
+        null
+      }
+
+      if (fetchLimitByBytesEnabled) {
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
+      } else {
+        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
+        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
+        put(systemStreamPartition, ime)
+      }
+
+      setIsAtHead(systemStreamPartition, isAtHead)
+    }
+
+    def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+      info("Abdicating for %s" format (tp))
+      topicPartitionsAndOffsets += tp -> nextOffset.toString
+      refreshBrokers
+    }
+
+    private def toSystemStreamPartition(tp: TopicAndPartition) = {
+      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
+    }
+  }
+}
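
For a concrete sense of the buffering knobs documented above, the per-partition
thresholds computed in start() work out as in this standalone sketch (the object
name and the partition count are illustrative, matching the doc-comment example):

    // Sketch of the threshold math in KafkaSystemConsumer.start(), using the
    // doc-comment example values: 50 registered partitions, default thresholds.
    object FetchThresholdSketch extends App {
      val fetchThreshold = 50000          // default count-based low water mark
      val fetchThresholdBytes = 100000L   // example byte-based low water mark
      val registeredPartitions = 50

      // The count threshold is split evenly across all registered partitions.
      val perPartitionFetchThreshold = fetchThreshold / registeredPartitions            // 1000

      // The byte threshold is halved first (messages are buffered twice: once in
      // KafkaConsumer, once in SystemConsumers), then split across partitions.
      val perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / registeredPartitions  // 1000

      println(s"per-partition count threshold = $perPartitionFetchThreshold")
      println(s"per-partition byte threshold  = $perPartitionFetchThresholdBytes")
    }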

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumerMetrics.scala
new file mode 100644
index 0000000..08b1a0c
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemConsumerMetrics.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.MetricsRegistry
+import java.util.concurrent.ConcurrentHashMap
+import kafka.common.TopicAndPartition
+import org.apache.samza.metrics.Counter
+import org.apache.samza.metrics.Gauge
+
+class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  val offsets = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val bytesRead = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val reads = new ConcurrentHashMap[TopicAndPartition, Counter]
+  val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+  val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
+
+  /*
+   * (String, Int) = (host, port) of BrokerProxy.
+   */
+
+  val reconnects = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), Counter]
+  val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
+
+  def registerTopicAndPartition(tp: TopicAndPartition) = {
+    if (!offsets.containsKey(tp)) {
+      offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
+      bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition)))
+      highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L))
+      lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
+    }
+  }
+
+  def registerBrokerProxy(host: String, port: Int) {
+    reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
+    brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
+    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port)))
+    brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
+    topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
+  }
+
+  // java friendlier interfaces
+  // Gauges
+  def setTopicPartitionValue(host: String, port: Int, value: Int) {
+    topicPartitions.get((host,port)).set(value)
+  }
+  def setLagValue(topicAndPartition: TopicAndPartition, value: Long) {
+    lag.get(topicAndPartition).set(value)
+  }
+  def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) {
+    highWatermark.get(topicAndPartition).set(value)
+  }
+
+  // Counters
+  def incBrokerReads(host: String, port: Int) {
+    brokerReads.get((host,port)).inc
+  }
+  def incReads(topicAndPartition: TopicAndPartition) {
+    reads.get(topicAndPartition).inc
+  }
+  def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) {
+    bytesRead.get(topicAndPartition).inc(inc)
+  }
+  def incBrokerBytesReads(host: String, port: Int, incBytes: Long) {
+    brokerBytesRead.get((host,port)).inc(incBytes)
+  }
+  def incBrokerSkippedFetchRequests(host: String, port: Int) {
+    brokerSkippedFetchRequests.get((host,port)).inc()
+  }
+  def setOffsets(topicAndPartition: TopicAndPartition, offset: Long) {
+    offsets.get(topicAndPartition).set(offset)
+  }
+  def incReconnects(host: String, port: Int) {
+    reconnects.get((host,port)).inc()
+  }
+  override def getPrefix = systemName + "-"
+}
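
To make the metric naming above concrete, here is a minimal usage sketch (system
name, topic, and broker address are made up; the names in the comments assume
that MetricsHelper prepends the getPrefix value shown at the bottom of the class):

    import kafka.common.TopicAndPartition
    import org.apache.samza.metrics.MetricsRegistryMap
    import org.apache.samza.system.kafka_deprecated.KafkaSystemConsumerMetrics

    object ConsumerMetricsSketch extends App {
      val metrics = new KafkaSystemConsumerMetrics("kafka", new MetricsRegistryMap)
      val tp = TopicAndPartition("PageViewEvent", 0)

      // Registers counters/gauges such as "kafka-PageViewEvent-0-messages-read"
      // and "kafka-PageViewEvent-0-high-watermark".
      metrics.registerTopicAndPartition(tp)
      metrics.registerBrokerProxy("localhost", 9092)

      metrics.incReads(tp)
      metrics.setHighWatermarkValue(tp, 42L)
      metrics.incBrokerReads("localhost", 9092)
    }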

http://git-wip-us.apache.org/repos/asf/samza/blob/44d0685d/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
new file mode 100644
index 0000000..eecdbe4
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+import java.util.Properties
+
+import kafka.utils.ZkUtils
+import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging}
+import org.apache.samza.config._
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.config.StorageConfig._
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.system.SystemConsumer
+
+object KafkaSystemFactory extends Logging {
+  def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) {
+    warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName)
+    Map[String, String]("compression.type" -> "none")
+  } else {
+    Map[String, String]()
+  }
+}
+
+class KafkaSystemFactory extends SystemFactory with Logging {
+  def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
+    val clientId = getClientId("samza-consumer", config)
+    val metrics = new KafkaSystemConsumerMetrics(systemName, registry)
+
+    // Kind of goofy to need a producer config for consumers, but we need metadata.
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    val bootstrapServers = producerConfig.bootsrapServers
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+
+    val timeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
+    val consumerMinSize = consumerConfig.fetchMinBytes
+    val consumerMaxWait = consumerConfig.fetchWaitMaxMs
+    val autoOffsetResetDefault = consumerConfig.autoOffsetReset
+    val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
+    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
+    val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
+    val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
+    val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
+
+    new KafkaSystemConsumer(
+      systemName = systemName,
+      systemAdmin = getAdmin(systemName, config),
+      metrics = metrics,
+      metadataStore = metadataStore,
+      clientId = clientId,
+      timeout = timeout,
+      bufferSize = bufferSize,
+      fetchSize = fetchSize,
+      consumerMinSize = consumerMinSize,
+      consumerMaxWait = consumerMaxWait,
+      fetchThreshold = fetchThreshold,
+      fetchThresholdBytes = fetchThresholdBytes,
+      fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
+      offsetGetter = offsetGetter)
+  }
+
+  def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
+    val clientId = getClientId("samza-producer", config)
+    val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps)
+    val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
+    val metrics = new KafkaSystemProducerMetrics(systemName, registry)
+
+    // Unlike consumer, no need to use encoders here, since they come for free
+    // inside the producer configs. Kafka's producer will handle all of this
+    // for us.
+
+    new KafkaSystemProducer(
+      systemName,
+      new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
+      getProducer,
+      metrics,
+      dropProducerExceptions = config.getDropProducerError)
+  }
+
+  def getAdmin(systemName: String, config: Config): SystemAdmin = {
+    val clientId = getClientId("samza-admin", config)
+    val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId)
+    val bootstrapServers = producerConfig.bootsrapServers
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
+    val timeout = consumerConfig.socketTimeoutMs
+    val bufferSize = consumerConfig.socketReceiveBufferBytes
+    val zkConnect = Option(consumerConfig.zkConnect)
+      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+    val connectZk = () => {
+      ZkUtils(zkConnect, 6000, 6000, false)
+    }
+    val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
+    val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
+    val storeToChangelog = config.getKafkaChangelogEnabledStores()
+    // Construct the meta information for each topic. If the replication factor is not defined, we use 2 replicas for the changelog stream.
+    val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
+    {
+      val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt
+      val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName))
+      info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor))
+      (topicName, changelogInfo)
+    }}
+
+    val deleteCommittedMessages = config.deleteCommittedMessages(systemName)
+    val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
+    new KafkaSystemAdmin(
+      systemName,
+      bootstrapServers,
+      connectZk,
+      coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor,
+      timeout,
+      bufferSize,
+      clientId,
+      topicMetaInformation,
+      intermediateStreamProperties,
+      deleteCommittedMessages)
+  }
+
+  def getCoordinatorTopicProperties(config: Config) = {
+    val segmentBytes = config.getCoordinatorSegmentBytes
+    (new Properties /: Map(
+      "cleanup.policy" -> "compact",
+      "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+  }
+
+  def getIntermediateStreamProperties(config : Config): Map[String, Properties] = {
+    val appConfig = new ApplicationConfig(config)
+    if (appConfig.getAppMode == ApplicationMode.BATCH) {
+      val streamConfig = new StreamConfig(config)
+      streamConfig.getStreamIds().filter(streamConfig.getIsIntermediateStream(_)).map(streamId => {
+        val properties = new Properties()
+        properties.putAll(streamConfig.getStreamProperties(streamId))
+        properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+        (streamId, properties)
+      }).toMap
+    } else {
+      Map()
+    }
+  }
+  def getClientId(id: String, config: Config): String = getClientId(
+    id,
+    new JobConfig(config).getName.getOrElse(throw new ConfigException("Missing job name.")),
+    new JobConfig(config).getJobId)
+
+  def getClientId(id: String, jobName: String, jobId: String): String =
+    "%s-%s-%s" format
+      (id.replaceAll("[^A-Za-z0-9]", "_"),
+        jobName.replaceAll("[^A-Za-z0-9]", "_"),
+        jobId.replaceAll("[^A-Za-z0-9]", "_"))
+
+}
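
As a quick illustration of the client id sanitization at the end of the factory,
anything outside [A-Za-z0-9] in the id, job name, or job id is replaced with an
underscore (the job name and id below are made up):

    import org.apache.samza.system.kafka_deprecated.KafkaSystemFactory

    object ClientIdSketch extends App {
      val factory = new KafkaSystemFactory
      // "samza-consumer" with job "my.job", id "1" => "samza_consumer-my_job-1"
      println(factory.getClientId("samza-consumer", "my.job", "1"))
    }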