Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:19 UTC

[22/47] samza git commit: added test

added test


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/89f79829
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/89f79829
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/89f79829

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 89f79829107ed21dd88058922b6038835af1cfbd
Parents: 34ae8ba
Author: Boris S <bo...@apache.org>
Authored: Thu Aug 30 10:30:55 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Thu Aug 30 10:30:55 2018 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerConfig.java   |  22 ++
 .../apache/samza/system/kafka/BrokerProxy.scala | 332 -------------------
 .../samza/system/kafka/KafkaConsumerProxy.java  |   6 +-
 .../system/kafka/KafkaSystemConsumer.scala      | 309 -----------------
 .../kafka/KafkaSystemConsumerMetrics.scala      |   1 +
 .../system/kafka/NewKafkaSystemConsumer.java    |  19 +-
 .../kafka/TestKafkaCheckpointManager.scala      |   3 +-
 .../samza/system/kafka/TestBrokerProxy.scala    |   3 +
 .../system/kafka/TestKafkaSystemConsumer.scala  | 191 -----------
 .../kafka/TestNewKafkaSystemConsumer.java       | 203 ++++++++++++
 10 files changed, 237 insertions(+), 852 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
index b29a041..88437ee 100644
--- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
+++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java
@@ -23,9 +23,14 @@ package org.apache.kafka.clients.consumer;
 
 import java.util.Map;
 import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 
@@ -34,6 +39,8 @@ import scala.Option;
  */
 public class KafkaConsumerConfig extends ConsumerConfig {
 
+  public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
+
   private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
   private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
   private static final String SAMZA_OFFSET_LARGEST = "largest";
@@ -76,6 +83,9 @@ public class KafkaConsumerConfig extends ConsumerConfig {
     if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
       // get it from the producer config
       String bootstrapServer = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+      if (StringUtils.isEmpty(bootstrapServer)) {
+        throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
+      }
       consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
     }
 
@@ -85,6 +95,18 @@ public class KafkaConsumerConfig extends ConsumerConfig {
         RangeAssignor.class.getName());
 
 
+    // The consumer is fully typed, and its deserializers can be too. If none is provided, we should
+    // default to byte[].
+    if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+    if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
+      LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
+      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+
+
     // NOT SURE THIS IS NEEDED TODO
     String maxPollRecords = subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);;
     consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
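
[Editor's note] The hunk above moves the key/value deserializer defaulting into KafkaConsumerConfig: if the job config names no deserializers, the consumer falls back to byte[]. A minimal, standalone sketch of that defaulting rule (illustrative only, not the Samza code; the class and helper names here are invented):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical helper, for illustration only: apply the same byte[] fallback
// that the hunk above adds when no key/value deserializer is configured.
public class DeserializerDefaultingSketch {
  static Properties withByteArrayDefaults(Map<String, String> configuredProps) {
    Properties consumerProps = new Properties();
    consumerProps.putAll(configuredProps);
    if (!configuredProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
      consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    }
    if (!configuredProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
      consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    }
    return consumerProps;
  }

  public static void main(String[] args) {
    Map<String, String> configuredProps = new HashMap<>();
    configuredProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // key.deserializer and value.deserializer are absent, so both default to ByteArrayDeserializer.
    System.out.println(withByteArrayDefaults(configuredProps));
  }
}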

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
deleted file mode 100644
index 423b68a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka
-
-import java.lang.Thread.UncaughtExceptionHandler
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
-
-import kafka.api._
-import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException}
-import kafka.consumer.ConsumerConfig
-import kafka.message.MessageSet
-import org.apache.samza.SamzaException
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.KafkaUtil
-import org.apache.samza.util.Logging
-
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-
-/**
- * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
- * a way for consumers to retrieve those messages by topic and partition.
- */
-class BrokerProxy(
-  val host: String,
-  val port: Int,
-  val system: String,
-  val clientID: String,
-  val metrics: KafkaSystemConsumerMetrics,
-  val messageSink: MessageSink,
-  val timeout: Int = ConsumerConfig.SocketTimeout,
-  val bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  val fetchSize: StreamFetchSizes = new StreamFetchSizes,
-  val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
-  val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
-  offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
-
-  /**
-   * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
-   */
-  val sleepMSWhileNoTopicPartitions = 100
-
-  /** What's the next offset for a particular partition? **/
-  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
-
-  /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
-  // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
-  // VisualVM was showing the consumer thread spending all its time in the await method rather than returning
-  // immediately, even though the process was proceeding normally.  Hence the extra boolean.  Should be investigated.
-  val firstCallBarrier = new CountDownLatch(1)
-  var firstCall = true
-
-  var simpleConsumer = createSimpleConsumer()
-
-  metrics.registerBrokerProxy(host, port)
-
-  def createSimpleConsumer() = {
-    val hostString = "%s:%d" format (host, port)
-    info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
-
-    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
-    sc
-  }
-
-  def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-    debug("Adding new topic and partition %s to queue for %s" format (tp, host))
-
-    if (nextOffsets.asJava.containsKey(tp)) {
-      toss("Already consuming TopicPartition %s" format tp)
-    }
-
-    val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
-      nextOffset
-        .get
-        .toLong
-    } else {
-      warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
-
-      offsetGetter.getResetOffset(simpleConsumer, tp)
-    }
-
-    debug("Got offset %s for new topic and partition %s." format (offset, tp))
-
-    nextOffsets += tp -> offset
-
-    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
-  }
-
-  def removeTopicPartition(tp: TopicAndPartition) = {
-    if (nextOffsets.asJava.containsKey(tp)) {
-      val offset = nextOffsets.remove(tp)
-      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
-      debug("Removed %s" format tp)
-      offset
-    } else {
-      warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
-      None
-    }
-  }
-
-  val thread = new Thread(new Runnable {
-    def run {
-      var reconnect = false
-
-      try {
-        (new ExponentialSleepStrategy).run(
-          loop => {
-            if (reconnect) {
-              metrics.reconnects.get((host, port)).inc
-              simpleConsumer.close()
-              simpleConsumer = createSimpleConsumer()
-            }
-
-            while (!Thread.currentThread.isInterrupted) {
-              messageSink.refreshDropped
-              if (nextOffsets.size == 0) {
-                debug("No TopicPartitions to fetch. Sleeping.")
-                Thread.sleep(sleepMSWhileNoTopicPartitions)
-              } else {
-                fetchMessages
-
-                // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
-                // In that case, reset the loop delay, so that the next time an error occurs,
-                // we start with a short retry delay.
-                loop.reset
-              }
-            }
-          },
-
-          (exception, loop) => {
-            warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
-            debug("Exception detail:", exception)
-            abdicateAll
-            reconnect = true
-          })
-      } catch {
-        case e: InterruptedException       => info("Got interrupt exception in broker proxy thread.")
-        case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
-        case e: OutOfMemoryError           => throw new SamzaException("Got out of memory error in broker proxy thread.")
-        case e: StackOverflowError         => throw new SamzaException("Got stack overflow error in broker proxy thread.")
-      }
-
-      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
-    }
-  }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
-
-  private def fetchMessages(): Unit = {
-    val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
-
-    if (topicAndPartitionsToFetch.size > 0) {
-      metrics.brokerReads.get((host, port)).inc
-      val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
-      firstCall = false
-      firstCallBarrier.countDown()
-
-      // Split response into errors and non errors, processing the errors first
-      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
-
-      handleErrors(errorResponses, response)
-
-      nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
-    } else {
-      refreshLatencyMetrics
-
-      debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
-
-      metrics.brokerSkippedFetchRequests.get((host, port)).inc
-
-      Thread.sleep(sleepMSWhileNoTopicPartitions)
-    }
-  }
-
-  /**
-   * Releases ownership for a single TopicAndPartition. The
-   * KafkaSystemConsumer will try and find a new broker for the
-   * TopicAndPartition.
-   */
-  def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
-    // Need to be mindful of a tp that was removed by another thread
-    case Some(offset) => messageSink.abdicate(tp, offset)
-    case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
-  }
-
-  /**
-   * Releases all TopicAndPartition ownership for this BrokerProxy thread. The
-   * KafkaSystemConsumer will try and find a new broker for the
-   * TopicAndPartition.
-   */
-  def abdicateAll {
-    info("Abdicating all topic partitions.")
-    val immutableNextOffsetsCopy = nextOffsets.toMap
-    immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
-  }
-
-  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
-    // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
-    case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
-
-    // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
-
-    // Convert FetchResponse into easier-to-work-with Errors
-    val errors = for (
-      (topicAndPartition, responseData) <- errorResponses;
-      error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
-    ) yield new Error(topicAndPartition, error.code(), error.exception())
-
-    val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
-    val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
-
-    // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
-    // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
-    // topic-partitions remains the same.  That way, when we've rebuilt the simple consumer, we can come around and
-    // handle the recoverable errors.
-    remainingErrors.foreach(e => {
-      warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
-      KafkaUtil.maybeThrowException(e.exception) })
-
-    notLeaderOrUnknownTopic.foreach(e => {
-      warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
-      abdicate(e.tp)
-    })
-
-    offsetOutOfRangeErrors.foreach(e => {
-      warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
-
-      try {
-        val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp)
-        // Put the new offset into the map (if the tp still exists).  Will catch it on the next go-around
-        nextOffsets.replace(e.tp, newOffset)
-      } catch {
-        // UnknownTopic or NotLeader are routine events and handled via abdication.  All others, bail.
-        case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp))
-                                                                                             abdicate(e.tp)
-      }
-    })
-  }
-
-  def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
-    val messageSet: MessageSet = data.messages
-    var nextOffset = nextOffsets(tp)
-
-    messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
-    require(messageSet != null)
-    for (message <- messageSet.iterator) {
-      messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
-
-      nextOffset = message.nextOffset
-
-      val bytesSize = message.message.payloadSize + message.message.keySize
-      metrics.reads.get(tp).inc
-      metrics.bytesRead.get(tp).inc(bytesSize)
-      metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
-      metrics.offsets.get(tp).set(nextOffset)
-    }
-
-    nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
-
-    // Update high water mark
-    val hw = data.hw
-    if (hw >= 0) {
-      metrics.highWatermark.get(tp).set(hw)
-      metrics.lag.get(tp).set(hw - nextOffset)
-    } else {
-      debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
-    }
-  }
-  override def toString() = "BrokerProxy for %s:%d" format (host, port)
-
-  def start {
-    if (!thread.isAlive) {
-      info("Starting " + toString)
-      thread.setDaemon(true)
-      thread.setName("Samza BrokerProxy " + thread.getName)
-      thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
-        override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e)
-      })
-      thread.start
-    } else {
-      debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
-    }
-  }
-
-  def stop {
-    info("Shutting down " + toString)
-
-    if (simpleConsumer != null) {
-      info("closing simple consumer...")
-      simpleConsumer.close
-    }
-
-    thread.interrupt
-    thread.join
-  }
-
-  private def refreshLatencyMetrics {
-    nextOffsets.foreach{
-      case (topicAndPartition, offset) => {
-        val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId)
-        trace("latest offset of %s is %s" format (topicAndPartition, latestOffset))
-        if (latestOffset >= 0) {
-          // only update the registered topicAndpartitions
-          if(metrics.highWatermark.containsKey(topicAndPartition)) {
-            metrics.highWatermark.get(topicAndPartition).set(latestOffset)
-          }
-          if(metrics.lag.containsKey(topicAndPartition)) {
-            metrics.lag.get(topicAndPartition).set(latestOffset - offset)
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 01b345a..e61e0ff 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -47,8 +47,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Separate thread that reads messages from kafka and puts them int the BlockingEnvelopeMap
- * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object
+ * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap.
+ * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object.
  * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
  */
 public class KafkaConsumerProxy<K, V> {
@@ -65,7 +65,7 @@ public class KafkaConsumerProxy<K, V> {
   private final String clientId;
   private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
   private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new HashMap<>();
-  // list of all the SSPs we poll from with their next offsets correspondingly.
+  // list of all the SSPs we poll from, with their next offsets correspondingly.
   private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
   // lags behind the high water mark, as reported by the Kafka consumer.
   private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
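
[Editor's note] The javadoc above describes the proxy's threading model: one dedicated thread per consumer polls Kafka and hands records to the BlockingEnvelopeMap that the task side drains. A generic, self-contained sketch of that hand-off pattern (JDK types only; the names below are invented and are not the Samza classes):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Illustration only: a single poller thread feeds a bounded queue, and the
// consuming side blocks on take(), mirroring the proxy -> BlockingEnvelopeMap flow.
public class PollerSketch<T> {
  private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1000);
  private final AtomicBoolean running = new AtomicBoolean(true);

  public Thread start(Supplier<T> pollOnce) {
    Thread poller = new Thread(() -> {
      while (running.get()) {
        T record = pollOnce.get(); // stands in for a consumer poll
        if (record != null) {
          try {
            queue.put(record); // back-pressure: blocks while the queue is full
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }, "poller-sketch");
    poller.setDaemon(true);
    poller.start();
    return poller;
  }

  public T take() throws InterruptedException {
    return queue.take(); // consuming side blocks until a record arrives
  }

  public void stop() {
    running.set(false);
  }
}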

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
deleted file mode 100644
index fd84c4a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.common.TopicAndPartition
-import org.apache.samza.util.Logging
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.samza.Partition
-import org.apache.kafka.common.utils.Utils
-import org.apache.samza.util.Clock
-import kafka.serializer.DefaultDecoder
-import kafka.serializer.Decoder
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
-import kafka.consumer.ConsumerConfig
-import org.apache.samza.util.TopicMetadataStore
-import kafka.api.PartitionMetadata
-import kafka.api.TopicMetadata
-import org.apache.samza.util.ExponentialSleepStrategy
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
-import org.apache.samza.system.SystemAdmin
-
-object KafkaSystemConsumer {
-
-  // Approximate additional shallow heap overhead per message in addition to the raw bytes
-  // received from Kafka  4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
-  // As this overhead is a moving target, and not very large
-  // compared to the message size its being ignore in the computation for now.
-  val MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
-
-  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
-    val topic = systemStreamPartition.getStream
-    val partitionId = systemStreamPartition.getPartition.getPartitionId
-    TopicAndPartition(topic, partitionId)
-  }
-}
-
-/**
- *  Maintain a cache of BrokerProxies, returning the appropriate one for the
- *  requested topic and partition.
- */
-private[kafka] class KafkaSystemConsumer(
-  systemName: String,
-  systemAdmin: SystemAdmin,
-  metrics: KafkaSystemConsumerMetrics,
-  metadataStore: TopicMetadataStore,
-  clientId: String,
-  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
-  bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  fetchSize: StreamFetchSizes = new StreamFetchSizes,
-  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
-  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-
-  /**
-   * Defines a low water mark for how many messages we buffer before we start
-   * executing fetch requests against brokers to get more messages. This value
-   * is divided equally among all registered SystemStreamPartitions. For
-   * example, if fetchThreshold is set to 50000, and there are 50
-   * SystemStreamPartitions registered, then the per-partition threshold is
-   * 1000. As soon as a SystemStreamPartition's buffered message count drops
-   * below 1000, a fetch request will be executed to get more data for it.
-   *
-   * Increasing this parameter will decrease the latency between when a queue
-   * is drained of messages and when new messages are enqueued, but also leads
-   * to an increase in memory usage since more messages will be held in memory.
-   */
-  fetchThreshold: Int = 50000,
-  /**
-   * Defines a low water mark for how many bytes we buffer before we start
-   * executing fetch requests against brokers to get more messages. This
-   * value is divided by 2 because the messages are buffered twice, once in
-   * KafkaConsumer and then in SystemConsumers. This value
-   * is divided equally among all registered SystemStreamPartitions.
-   * However this is a soft limit per partition, as the
-   * bytes are cached at the message boundaries, and the actual usage can be
-   * 1000 bytes + size of max message in the partition for a given stream.
-   * The bytes if the size of the bytebuffer in Message. Hence, the
-   * Object overhead is not taken into consideration. In this codebase
-   * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB,
-   * which is not considerable.
-   *
-   * For example,
-   * if fetchThresholdBytes is set to 100000 bytes, and there are 50
-   * SystemStreamPartitions registered, then the per-partition threshold is
-   * (100000 / 2) / 50 = 1000 bytes.
-   * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
-   * As soon as a SystemStreamPartition's buffered messages bytes drops
-   * below 1000, a fetch request will be executed to get more data for it.
-   *
-   * Increasing this parameter will decrease the latency between when a queue
-   * is drained of messages and when new messages are enqueued, but also leads
-   * to an increase in memory usage since more messages will be held in memory.
-   *
-   * The default value is -1, which means this is not used. When the value
-   * is > 0, then the fetchThreshold which is count based is ignored.
-   */
-  fetchThresholdBytes: Long = -1,
-  /**
-   * if(fetchThresholdBytes > 0) true else false
-   */
-  fetchLimitByBytesEnabled: Boolean = false,
-  offsetGetter: GetOffset = new GetOffset("fail"),
-  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
-    metrics.registry,
-    new Clock {
-      def currentTimeMillis = clock()
-    },
-    classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
-
-  type HostPort = (String, Int)
-  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
-  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
-  var perPartitionFetchThreshold = fetchThreshold
-  var perPartitionFetchThresholdBytes = 0L
-
-  def start() {
-    if (topicPartitionsAndOffsets.size > 0) {
-      perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
-      // messages get double buffered, hence divide by 2
-      if(fetchLimitByBytesEnabled) {
-        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
-      }
-    }
-
-    systemAdmin.start()
-    refreshBrokers
-  }
-
-  override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
-    super.register(systemStreamPartition, offset)
-
-    val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
-    val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
-    // register the older offset in the consumer
-    if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
-      topicPartitionsAndOffsets.replace(topicAndPartition, offset)
-    }
-
-    metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
-  }
-
-  def stop() {
-    systemAdmin.stop()
-    brokerProxies.values.foreach(_.stop)
-  }
-
-  protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
-    info("Creating new broker proxy for host: %s and port: %s" format(host, port))
-    new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
-  }
-
-  protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = {
-    topicMetadata.partitionsMetadata.find(_.partitionId == partition)
-  }
-
-  protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
-    // Whatever we do, we can't say Broker, even though we're
-    // manipulating it here. Broker is a private type and Scala doesn't seem
-    // to care about that as long as you don't explicitly declare its type.
-    val brokerOption = partitionMetadata.flatMap(_.leader)
-
-    brokerOption match {
-      case Some(broker) => Some(broker.host, broker.port)
-      case _ => None
-    }
-  }
-
-  def refreshBrokers {
-    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
-    info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
-    retryBackoff.run(
-      loop => {
-        val topics = tpToRefresh.map(_.topic).toSet
-        val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
-
-        // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
-        // This avoids trying to re-add the same topic partition repeatedly
-        def refresh() = {
-          val head = tpToRefresh.head
-          // refreshBrokers can be called from abdicate and refreshDropped,
-          // both of which are triggered from BrokerProxy threads. To prevent
-          // accidentally creating multiple objects for the same broker, or
-          // accidentally not updating the topicPartitionsAndOffsets variable,
-          // we need to lock.
-          this.synchronized {
-            // Check if we still need this TopicAndPartition inside the
-            // critical section. If we don't, then notAValidEvent it.
-            topicPartitionsAndOffsets.get(head) match {
-              case Some(nextOffset) =>
-                val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition)
-                getLeaderHostPort(partitionMetadata) match {
-                  case Some((host, port)) =>
-                    debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get))
-                    val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
-                    brokerProxy.addTopicPartition(head, Option(nextOffset))
-                    brokerProxy.start
-                    debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
-                    topicPartitionsAndOffsets -= head
-                  case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
-                }
-              case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
-            }
-          }
-          tpToRefresh.tail
-        }
-
-        while (!tpToRefresh.isEmpty) {
-          tpToRefresh = refresh()
-        }
-
-        loop.done
-      },
-
-      (exception, loop) => {
-        warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
-        debug("Exception detail:", exception)
-      })
-  }
-
-  val sink = new MessageSink {
-    var lastDroppedRefresh = clock()
-
-    def refreshDropped() {
-      if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
-        refreshBrokers
-        lastDroppedRefresh = clock()
-      }
-    }
-
-    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
-      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
-    }
-
-    def needsMoreMessages(tp: TopicAndPartition) = {
-      if(fetchLimitByBytesEnabled) {
-        getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
-      } else {
-        getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
-      }
-    }
-
-    def getMessageSize(message: Message): Integer = {
-      message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
-    }
-
-    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
-      trace("Incoming message %s: %s." format (tp, msg))
-
-      val systemStreamPartition = toSystemStreamPartition(tp)
-      val isAtHead = highWatermark == msg.offset
-      val offset = msg.offset.toString
-      val key = if (msg.message.key != null) {
-        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
-      } else {
-        null
-      }
-      val message = if (!msg.message.isNull) {
-        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
-      } else {
-        null
-      }
-
-      if(fetchLimitByBytesEnabled ) {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message))
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      } else {
-        val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message)
-        ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L)
-        put(systemStreamPartition, ime)
-      }
-
-      setIsAtHead(systemStreamPartition, isAtHead)
-    }
-
-    def abdicate(tp: TopicAndPartition, nextOffset: Long) {
-      info("Abdicating for %s" format (tp))
-      topicPartitionsAndOffsets += tp -> nextOffset.toString
-      refreshBrokers
-    }
-
-    private def toSystemStreamPartition(tp: TopicAndPartition) = {
-      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
-    }
-  }
-}
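
[Editor's note] The fetchThreshold / fetchThresholdBytes scaladoc in the deleted file above spells out how the thresholds are split per partition, and the new test added at the bottom of this commit asserts the same split. A minimal worked sketch of that arithmetic (helper names invented for illustration):

// Illustration only: the message threshold is divided evenly across registered
// partitions; the byte threshold is halved first (messages are buffered twice,
// once in the consumer and once in SystemConsumers) and then divided evenly.
public class FetchThresholdSplitSketch {
  static long perPartitionFetchThreshold(long fetchThreshold, int registeredPartitions) {
    return fetchThreshold / registeredPartitions;
  }

  static long perPartitionFetchThresholdBytes(long fetchThresholdBytes, int registeredPartitions) {
    return (fetchThresholdBytes / 2) / registeredPartitions;
  }

  public static void main(String[] args) {
    // The scaladoc's example: 50000 messages over 50 partitions -> 1000 per partition.
    System.out.println(perPartitionFetchThreshold(50000, 50));
    // 100000 bytes over 50 partitions -> (100000 / 2) / 50 = 1000 bytes per partition.
    System.out.println(perPartitionFetchThresholdBytes(100000, 50));
  }
}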

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 51545a0..1aa66dc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -35,6 +35,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]]
 
   /*
+  TODO Fix
    * (String, Int) = (host, port) of BrokerProxy.
    */
 

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index dd7e584..b745628 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -66,14 +66,14 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
   private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
   private final String clientId;
   private final String metricName;
-  private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
+  /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
   private final AtomicBoolean stopped = new AtomicBoolean(false);
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
-  private KafkaConsumerMessageSink messageSink;
+  /* package private */ KafkaConsumerMessageSink messageSink;
   // proxy is doing the actual reading
   private KafkaConsumerProxy proxy;
 
@@ -142,17 +142,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
 
     Map<String, String> injectProps = new HashMap<>();
 
-    // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
-    // default to byte[]
-    if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
-      injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-    if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
-      LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName);
-      injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    }
-
     // extract kafka consumer configs
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);
@@ -203,7 +192,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     }
   }
 
-  private void createConsumerProxy() {
+  void createConsumerProxy() {
     // create a sink for passing the messages between the proxy and the consumer
     messageSink = new KafkaConsumerMessageSink();
 
@@ -219,7 +208,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
    Add the TopicPartitions to the proxy.
    Start the proxy thread.
    */
-  private void startConsumer() {
+  void startConsumer() {
     //set the offset for each TopicPartition
     topicPartitions2Offset.forEach((tp, startingOffsetString) -> {
       long startingOffset = Long.valueOf(startingOffsetString);

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 8544dbf..8d92f4d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -92,8 +92,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     assertNull(readCp)
 
     writeCheckpoint(checkpointTopic, taskName, checkpoint1)
-
     assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName))
+
     // writing a second message and reading it returns a more recent checkpoint
     writeCheckpoint(checkpointTopic, taskName, checkpoint2)
     assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
@@ -194,7 +194,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props)
-    System.out.println("CONFIG = " + config)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index d510076..a3f76e7 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -41,6 +41,7 @@ import org.mockito.{Matchers, Mockito}
 import scala.collection.JavaConverters._
 
 class TestBrokerProxy extends Logging {
+  /*
   val tp2 = new TopicAndPartition("Redbird", 2013)
   var fetchTp1 = true // control whether fetching tp1 messages or not
 
@@ -305,6 +306,7 @@ class TestBrokerProxy extends Logging {
   }
 
   /**
+    * TODO fix
    * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
    * that it owns when a consumer failure occurs.
    */
@@ -431,4 +433,5 @@ class TestBrokerProxy extends Logging {
     bp.stop
     verify(mockSimpleConsumer).close
   }
+  */
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
deleted file mode 100644
index 8656d10..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka
-
-import kafka.api.TopicMetadata
-import kafka.api.PartitionMetadata
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.message.Message
-import kafka.message.MessageAndOffset
-import org.apache.kafka.common.protocol.Errors
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.util.TopicMetadataStore
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemAdmin
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-
-class TestKafkaSystemConsumer {
-  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
-  private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
-  private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
-  private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
-  private val clientId = "TestClientId"
-
-  @Test
-  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 50) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(1000, consumer.perPartitionFetchThreshold)
-  }
-
-  @Test
-  def testBrokerCreationShouldTriggerStart {
-    val systemName = "test-system"
-    val streamName = "test-stream"
-    val metrics = new KafkaSystemConsumerMetrics
-    // Lie and tell the store that the partition metadata is empty. We can't
-    // use partition metadata because it has Broker in its constructor, which
-    // is package private to Kafka.
-    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
-    var hosts = List[String]()
-    var getHostPortCount = 0
-    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
-      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
-        // Generate a unique host every time getHostPort is called.
-        getHostPortCount += 1
-        Some("localhost-%s" format getHostPortCount, 0)
-      }
-
-      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
-        new BrokerProxy(host, port, systemName, "", metrics, sink) {
-          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
-            // Skip this since we normally do verification of offsets, which
-            // tries to connect to Kafka. Rather than mock that, just forget it.
-            nextOffsets.size
-          }
-
-          override def start {
-            hosts :+= host
-          }
-        }
-      }
-    }
-
-    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
-    assertEquals(0, hosts.size)
-    consumer.start
-    assertEquals(List("localhost-1"), hosts)
-    // Should trigger a refresh with a new host.
-    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
-    assertEquals(List("localhost-1", "localhost-2"), hosts)
-  }
-
-  @Test
-  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
-    when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
-
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
-    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
-    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
-
-    consumer.register(ssp0, "0")
-    consumer.register(ssp0, "5")
-    consumer.register(ssp1, "2")
-    consumer.register(ssp1, "3")
-    consumer.register(ssp2, "0")
-
-    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
-    assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
-    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
-  }
-
-  @Test
-  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
-  }
-
-  @Test
-  def testFetchThresholdBytes {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    val msg = Array[Byte](5, 112, 9, 126)
-    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
-    // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead
-    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0),  msgAndOffset, 887354)
-
-    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-
-  @Test
-  def testFetchThresholdBytesDisabled {
-    val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
-      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
-      override def refreshBrokers {
-      }
-    }
-
-    for (i <- 0 until 10) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    consumer.start
-
-    assertEquals(5000, consumer.perPartitionFetchThreshold)
-    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
-    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
-  }
-}
-
-class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
-  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
new file mode 100644
index 0000000..f7f63f3
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java
@@ -0,0 +1,203 @@
+package org.apache.samza.system.kafka;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestNewKafkaSystemConsumer {
+  public final String TEST_SYSTEM = "test-system";
+  public final String TEST_STREAM = "test-stream";
+  public final String TEST_CLIENT_ID = "testClientId";
+  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  public final String FETCH_THRESHOLD_MSGS = "50000";
+  public final String FETCH_THRESHOLD_BYTES = "100000";
+
+  @Before
+  public void setUp() {
+
+  }
+
+  private NewKafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) {
+    final Map<String, String> map = new HashMap<>();
+
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        BOOTSTRAP_SERVER);
+
+    Config config = new MapConfig(map);
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap());
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals());
+
+    MockNewKafkaSystemConsumer newKafkaSystemConsumer =
+        new MockNewKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
+            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
+
+    return newKafkaSystemConsumer;
+  }
+
+  @Test
+  public void testConfigValidations() {
+
+    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.start();
+    // should be no failures
+  }
+
+  @Test
+  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
+    final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final int partitionsNum = 50;
+    for (int i = 0; i < partitionsNum; i++) {
+      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
+    }
+
+    consumer.start();
+
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
+        consumer.perPartitionFetchThresholdBytes);
+  }
+
+  @Test
+  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
+    NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp0, "5");
+    consumer.register(ssp1, "2");
+    consumer.register(ssp1, "3");
+    consumer.register(ssp2, "0");
+
+    assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp2)));
+  }
+
+  @Test
+  public void testFetchThresholdBytes() {
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size
+    int ime11Size = 20;
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+    NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // queue for ssp1 should be less than full now, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // queue for ssp1 should be full now, because we added a message of size 20 on top
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+  }
+
+  @Test
+  public void testFetchThresholdBytesDisabled() {
+    // Pass 0 as fetchThresholdBytes, which disables checking for the limit by size
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
+    int ime11Size = 20; // even with the second message, still below the size limit
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+
+    // limit by number of messages 4/2 = 2 per partition
+    // limit by number of bytes - disabled
+    NewKafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // should be full by size, but not full by number of messages (1 of 2)
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // not full by size or by message count
+    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // not full by size, but should be full by messages
+    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+  }
+
+  // mock kafkaConsumer and SystemConsumer
+  static class MockKafkaConsumer extends KafkaConsumer {
+    public MockKafkaConsumer(Map<String, Object> configs) {
+      super(configs);
+    }
+  }
+
+  static class MockNewKafkaSystemConsumer extends NewKafkaSystemConsumer {
+    public MockNewKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+        KafkaSystemConsumerMetrics metrics, Clock clock) {
+      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    }
+
+    @Override
+    void createConsumerProxy() {
+      this.messageSink = new KafkaConsumerMessageSink();
+    }
+
+    @Override
+    void startConsumer() {
+    }
+  }
+}