Posted to commits@samza.apache.org by na...@apache.org on 2016/03/25 03:22:08 UTC

samza git commit: SAMZA-882 : Detect partition count changes in input streams

Repository: samza
Updated Branches:
  refs/heads/master 04ae6f5ce -> a51f7b2b9


SAMZA-882 : Detect partition count changes in input streams


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a51f7b2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a51f7b2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a51f7b2b

Branch: refs/heads/master
Commit: a51f7b2b994cd2af0bec586a405fc426a2f8cf1b
Parents: 04ae6f5
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Thu Mar 24 19:18:53 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu Mar 24 19:18:53 2016 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  21 +++
 .../samza/system/ExtendedSystemAdmin.java       |  30 ++++
 .../org/apache/samza/config/JobConfig.scala     |  13 +-
 .../samza/coordinator/JobCoordinator.scala      |  63 +++++----
 .../StreamPartitionCountMonitor.scala           | 108 +++++++++++++++
 .../samza/system/StreamMetadataCache.scala      |  31 +++--
 .../main/scala/org/apache/samza/util/Util.scala |  10 +-
 .../samza/coordinator/TestJobCoordinator.scala  |  34 +++--
 .../TestStreamPartitionCountMonitor.scala       | 137 +++++++++++++++++++
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  41 +++++-
 .../samza/job/yarn/TestContainerAllocator.java  |   2 +-
 .../yarn/TestHostAwareContainerAllocator.java   |   2 +-
 .../samza/job/yarn/TestSamzaTaskManager.java    |   2 +-
 13 files changed, 440 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 2745a22..c1c822e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -184,6 +184,27 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="job-coordinator-monitor-partition-change">job.coordinator.<br />monitor-partition-change</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If you are using Kafka for the coordinator stream, this configuration enables the Job Coordinator to
+                        detect partition count differences in Kafka input topics. On detection, it updates a Gauge
+                        metric named <span class="system">system-name</span>-<span class="stream">stream-name</span>-partitionCount,
+                        which reports the difference between the current partition count and the initial partition count.
+                        Note that this feature currently works only for Kafka-based systems.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="job-coordinator-monitor-partition-change-frequency-ms">job.coordinator.<br />monitor-partition-change.frequency.ms</td>
+                    <td class="default">300000</td>
+                    <td class="description">
+                        The frequency, in milliseconds, at which changes to the input streams' partition counts are checked.
+                        This check can run infrequently, since increasing a topic's partition count is not a common event.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="job-config-rewriter-class">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.class</td>
                     <td class="default"></td>
                     <td class="description">
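
For reference, a minimal sketch of how these two properties could be supplied and read back through JobConfig. The MapConfig contents below are illustrative; the getters and defaults come from the JobConfig.scala change further down in this patch.

  import scala.collection.JavaConversions.mapAsJavaMap
  import org.apache.samza.config.MapConfig
  import org.apache.samza.config.JobConfig.Config2Job

  // Enable the monitor and keep the default 5-minute check interval.
  val config = new MapConfig(mapAsJavaMap(Map(
    "job.coordinator.monitor-partition-change" -> "true",
    "job.coordinator.monitor-partition-change.frequency.ms" -> "300000")))

  // Config2Job adds the typed getters; both fall back to the documented defaults
  // (false and 300000 ms) when the keys are absent.
  assert(config.getMonitorPartitionChange)
  assert(config.getMonitorPartitionChangeFrequency == 300000)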

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
new file mode 100644
index 0000000..daa2212
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An interface that extends the more generic SystemAdmin interface with a
+ * partition-count-only metadata query.
+ * TODO: Merge this interface's method into SystemAdmin when we upgrade to JDK 1.8
+ */
+public interface ExtendedSystemAdmin extends SystemAdmin {
+  Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames);
+}
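
A short caller-side sketch (the systemAdmin value is assumed to come from a SystemFactory), mirroring the instanceof check that StreamMetadataCache performs later in this patch:

  import org.apache.samza.system.{ExtendedSystemAdmin, SystemAdmin}

  def fetchMetadata(systemAdmin: SystemAdmin, streamNames: java.util.Set[String]) =
    systemAdmin match {
      // Partition counts only: no offset lookups, so this is cheap enough to poll.
      case extended: ExtendedSystemAdmin => extended.getSystemStreamPartitionCounts(streamNames)
      // Admins that only implement the base interface fall back to full metadata.
      case admin => admin.getSystemStreamMetadata(streamNames)
    }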

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4f3e9a2..b59153f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,10 +19,8 @@
 
 package org.apache.samza.config
 
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.util.Logging
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.util.Logging
 
 object JobConfig {
   // job config constants
@@ -49,6 +47,9 @@ object JobConfig {
  // is not yet supported, and auto-creation of the topics cannot always be easily turned off).
   // So we add a setting that allows for the job to continue even though number of partitions is not 1.
   val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
+  val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
+  val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms"
+  val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 }
@@ -74,6 +75,12 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
+  def getMonitorPartitionChange = getBoolean(JobConfig.MONITOR_PARTITION_CHANGE, false)
+
+  def getMonitorPartitionChangeFrequency = getInt(
+    JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
+    JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID)

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 06a96ad..cd7daa2 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,32 +20,26 @@
 package org.apache.samza.coordinator
 
 
+import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config.StorageConfig
-import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.config.Config
-import org.apache.samza.SamzaException
-import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.{Config, StorageConfig}
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import java.util
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
 import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util.Logging
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.util.Util
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition}
+import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.Partition
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 
 /**
  * Helper companion object that is responsible for wiring up a JobCoordinator
@@ -58,6 +52,7 @@ object JobCoordinator extends Logging {
    */
   @volatile var currentJobCoordinator: JobCoordinator = null
   val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
+  var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
 
   /**
    * @param coordinatorSystemConfig A config object that contains job.name,
@@ -92,6 +87,19 @@ object JobCoordinator extends Logging {
     }).toMap
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    if (config.getMonitorPartitionChange) {
+      val extendedSystemAdmins = systemAdmins.filter{
+        case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+      }
+      val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
+      if (inputStreamsToMonitor.nonEmpty) {
+        streamPartitionCountMonitor = new StreamPartitionCountMonitor(
+          inputStreamsToMonitor,
+          streamMetadataCache,
+          metricsRegistryMap,
+          config.getMonitorPartitionChangeFrequency)
+      }
+    }
 
     val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
     createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
@@ -113,7 +121,7 @@ object JobCoordinator extends Logging {
 
     val server = new HttpServer
     server.addServlet("/*", new JobServlet(jobModelRef))
-    currentJobCoordinator = new JobCoordinator(jobModel, server)
+    currentJobCoordinator = new JobCoordinator(jobModel, server, streamPartitionCountMonitor)
     currentJobCoordinator
   }
 
@@ -294,7 +302,8 @@ class JobCoordinator(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null) extends Logging {
+  val server: HttpServer = null,
+  val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -302,13 +311,21 @@ class JobCoordinator(
     if (server != null) {
       debug("Starting HTTP server.")
       server.start
-      info("Startd HTTP server: %s" format server.getUrl)
+      if (streamPartitionCountMonitor != null) {
+        debug("Starting Stream Partition Count Monitor..")
+        streamPartitionCountMonitor.startMonitor()
+      }
+      info("Started HTTP server: %s" format server.getUrl)
     }
   }
 
   def stop {
     if (server != null) {
       debug("Stopping HTTP server.")
+      if (streamPartitionCountMonitor != null) {
+        debug("Stopping Stream Partition Count Monitor..")
+        streamPartitionCountMonitor.stopMonitor()
+      }
       server.stop
       info("Stopped HTTP server.")
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
new file mode 100644
index 0000000..6aeff57
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
+import org.apache.samza.system.{StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.apache.samza.util.Logging
+
+private[coordinator] class StreamPartitionCountMonitor (
+  val streamsToMonitor: Set[SystemStream],
+  val metadataCache: StreamMetadataCache,
+  val metrics: MetricsRegistryMap,
+  val monitorFrequency: Int = 300000) extends Logging {
+
+  val initialMetadata: Map[SystemStream, SystemStreamMetadata] = metadataCache.getStreamMetadata(streamsToMonitor, true)
+  val gauges = new java.util.HashMap[SystemStream, Gauge[Int]]()
+  private val running: AtomicBoolean = new AtomicBoolean(false)
+  private var thread: Thread = null
+  private val lock = new Object
+
+  private def getMonitorThread(): Thread = {
+    new Thread(new Runnable {
+      override def run(): Unit = {
+        while (running.get()) {
+          try {
+            val currentMetadata: Map[SystemStream, SystemStreamMetadata] =
+              metadataCache.getStreamMetadata(streamsToMonitor, true)
+            initialMetadata.map {
+              case (systemStream, metadata) => {
+                val currentPartitionCount = currentMetadata(systemStream).getSystemStreamPartitionMetadata.keySet().size()
+                val prevPartitionCount = metadata.getSystemStreamPartitionMetadata.keySet().size()
+
+                val gauge = if (gauges.containsKey(systemStream)) {
+                  gauges.get(systemStream)
+                } else {
+                  metrics.newGauge[Int](
+                    "job-coordinator",
+                    String.format("%s-%s-partitionCount", systemStream.getSystem, systemStream.getStream),
+                    0)
+                }
+                gauge.set(currentPartitionCount - prevPartitionCount)
+                gauges.put(systemStream, gauge)
+              }
+            }
+            lock synchronized {
+              lock.wait(monitorFrequency)
+            }
+          } catch {
+            case ie: InterruptedException =>
+              info("Received Interrupted Exception: %s" format ie, ie)
+            case e: Exception =>
+              warn("Received Exception: %s" format e, e)
+          }
+        }
+      }
+    })
+  }
+
+  def startMonitor(): Unit = {
+    if (thread == null || !thread.isAlive) {
+      thread = getMonitorThread()
+      running.set(true)
+      thread.start()
+    }
+  }
+
+  /**
+   * Used in unit tests only.
+   * @return true if the monitor thread is running, false otherwise
+   */
+  def isRunning(): Boolean = {
+    thread != null && thread.isAlive
+  }
+
+  def stopMonitor(): Unit = {
+    try {
+      running.set(false)
+      lock synchronized  {
+        lock.notify()
+      }
+      if (thread != null) {
+        thread.join(monitorFrequency)
+      }
+    } catch {
+      case e: Exception =>
+        warn("Received exception while stopping the partition count monitor: %s" format e, e)
+    }
+  }
+
+}
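
A condensed usage sketch of the monitor, following the lifecycle exercised in TestStreamPartitionCountMonitor below. The metadataCache value and the system/stream names are placeholders; note the class is private[coordinator], so real callers live in this package, as JobCoordinator does.

  import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
  import org.apache.samza.system.SystemStream

  val registry = new MetricsRegistryMap()
  val monitor = new StreamPartitionCountMonitor(
    Set(new SystemStream("kafka", "my-input-topic")), // streams to watch
    metadataCache,                                    // a StreamMetadataCache over those systems
    registry,
    300000)                                           // re-check every 5 minutes

  monitor.startMonitor()
  // Each pass sets the gauge to currentPartitionCount - initialPartitionCount.
  val delta = registry.getGroup("job-coordinator")
    .get("kafka-my-input-topic-partitionCount")
    .asInstanceOf[Gauge[Int]].getValue
  monitor.stopMonitor()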

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 155c3d1..18b47ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -43,13 +43,17 @@ class StreamMetadataCache (
   private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
   private var cache = Map[SystemStream, CacheEntry]()
   private val lock = new Object
-
   /**
    * Returns metadata about each of the given streams (such as first offset, newest
    * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
    * using the given SystemAdmins.
+   *
+   * @param streams Set of SystemStreams for which the metadata is requested
+   * @param partitionsMetadataOnly Flag to indicate that only partition count metadata should be fetched/refreshed
    */
-  def getStreamMetadata(streams: Set[SystemStream]): Map[SystemStream, SystemStreamMetadata] = {
+  def getStreamMetadata(
+    streams: Set[SystemStream],
+    partitionsMetadataOnly: Boolean = false): Map[SystemStream, SystemStreamMetadata] = {
     val time = clock.currentTimeMillis
     val cacheHits = streams.flatMap(stream => getFromCache(stream, time)).toMap
 
@@ -57,21 +61,26 @@ class StreamMetadataCache (
       .groupBy[String](_.getSystem)
       .flatMap {
         case (systemName, systemStreams) =>
-          systemAdmins
+          val systemAdmin = systemAdmins
             .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
-            .getSystemStreamMetadata(systemStreams.map(_.getStream))
-            .map {
-              case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
-            }
+          val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
+            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream))
+          } else {
+            systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream))
+          }
+          streamToMetadata.map {
+            case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
+          }
       }
-      .toMap
 
     val allResults = cacheHits ++ cacheMisses
     val missing = streams.filter(stream => allResults.getOrElse(stream, null) == null)
     if (!missing.isEmpty) {
       throw new SamzaException("Cannot get metadata for unknown streams: " + missing.mkString(", "))
     }
-    cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
+    if (!partitionsMetadataOnly) {
+      cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
+    }
     allResults
   }
 
@@ -83,9 +92,9 @@ class StreamMetadataCache (
     }
   }
 
-  private def addToCache(stream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
+  private def addToCache(systemStream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
     lock synchronized {
-      cache += stream -> CacheEntry(metadata, now)
+      cache += systemStream -> CacheEntry(metadata, now)
     }
   }
 }
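
A brief sketch of the two fetch modes now supported (systemAdmins here is a placeholder Map[String, SystemAdmin]): with partitionsMetadataOnly = true the cache delegates to ExtendedSystemAdmin where available and does not write the result back to the cache.

  import org.apache.samza.system.{StreamMetadataCache, SystemStream}

  val cache = new StreamMetadataCache(systemAdmins)
  val streams = Set(new SystemStream("kafka", "my-input-topic"))

  // Full metadata (offsets included); cache misses are added to the cache.
  val fullMetadata = cache.getStreamMetadata(streams)

  // Partition counts only; misses go through ExtendedSystemAdmin where possible
  // and are not cached.
  val partitionCounts = cache.getStreamMetadata(streams, partitionsMetadataOnly = true)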

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index bd0fe5f..fc3d085 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -208,8 +208,14 @@ object Util extends Logging {
   def buildCoordinatorStreamConfig(config: Config) = {
     val (jobName, jobId) = getJobNameAndId(config)
     // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
-    new MapConfig(config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
-      Map[String, String](JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName))
+    new MapConfig(
+      config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+      Map[String, String](
+        JobConfig.JOB_NAME -> jobName,
+        JobConfig.JOB_ID -> jobId,
+        JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
+        JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange),
+        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency)))
   }
 
   /**
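
For context, a sketch of what the coordinator stream config carries after this change (fullJobConfig is a placeholder for the job's complete Config):

  import org.apache.samza.config.JobConfig
  import org.apache.samza.util.Util

  val coordinatorConfig = Util.buildCoordinatorStreamConfig(fullJobConfig)

  // Alongside job.name, job.id and the coordinator system's properties, the two
  // monitor settings are now forwarded so the JobCoordinator can honor them.
  coordinatorConfig.get(JobConfig.MONITOR_PARTITION_CHANGE)              // e.g. "false"
  coordinatorConfig.get(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS) // e.g. "300000"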

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 9ab1dd5..110c3a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.coordinator
 
+import java.util
+
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.util.Util
 import org.junit.{After, Test}
@@ -30,18 +32,13 @@ import org.apache.samza.config.SystemConfig
 import org.apache.samza.container.{SamzaContainer, TaskName}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.Config
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.Partition
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.config.JobConfig
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemConsumer
 import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
 
 class TestJobCoordinator {
@@ -93,7 +90,9 @@ class TestJobCoordinator {
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
       SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
-      TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
+      TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory",
+      JobConfig.MONITOR_PARTITION_CHANGE -> "true",
+      JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> "100"
       )
 
     // We want the mocksystemconsumer to use the same instance across runs
@@ -114,7 +113,12 @@ class TestJobCoordinator {
     val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
     assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
 
+    // Check the status of Stream Partition Count Monitor
+    assertNotNull(coordinator.streamPartitionCountMonitor)
+    assertTrue(coordinator.streamPartitionCountMonitor.isRunning())
+
     coordinator.stop
+    assertFalse(coordinator.streamPartitionCountMonitor.isRunning())
   }
 
   @Test
@@ -202,7 +206,7 @@ class MockSystemFactory extends SystemFactory {
   def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
 }
 
-class MockSystemAdmin extends SystemAdmin {
+class MockSystemAdmin extends ExtendedSystemAdmin {
   def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
   def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = {
     assertEquals(1, streamNames.size)
@@ -227,4 +231,18 @@ class MockSystemAdmin extends SystemAdmin {
   }
   
   override def offsetComparator(offset1: String, offset2: String) = null
+
+  override def getSystemStreamPartitionCounts(streamNames: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
+    assertEquals(1, streamNames.size())
+    val result = streamNames.map {
+      stream =>
+        val partitionMetadata = Map(
+          new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
+          new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
+          new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
+        )
+        stream -> new SystemStreamMetadata(stream, partitionMetadata)
+    }.toMap
+    result
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
new file mode 100644
index 0000000..f47f818
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator
+
+import org.apache.samza.Partition
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemAdmin, StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Matchers
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+
+class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
+
+  @Test
+  def testStreamPartitionCountMonitor(): Unit = {
+    val mockMetadataCache = mock[StreamMetadataCache]
+    val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+
+    val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+        put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+      }
+    }
+
+    val finalPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        putAll(initialPartitionMetadata)
+        put(new Partition(2), new SystemStreamPartitionMetadata("", "", ""))
+      }
+    }
+    val initialMetadata = Map(
+      inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+    )
+    val finalMetadata = Map(
+      inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, finalPartitionMetadata)
+    )
+
+    when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
+      .thenReturn(initialMetadata)  // Called during StreamPartitionCountMonitor instantiation
+      .thenReturn(initialMetadata)  // Called when monitor thread is started
+      .thenReturn(finalMetadata)  // Called from monitor thread the second time
+      .thenReturn(finalMetadata)
+
+    val partitionCountMonitor = new StreamPartitionCountMonitor(
+      inputSystemStreamSet,
+      mockMetadataCache,
+      new MetricsRegistryMap(),
+      5
+    )
+
+    partitionCountMonitor.startMonitor()
+    Thread.sleep(50)
+    partitionCountMonitor.stopMonitor()
+
+    assertNotNull(partitionCountMonitor.gauges.get(inputSystemStream))
+    assertEquals(1, partitionCountMonitor.gauges.get(inputSystemStream).getValue)
+
+    assertNotNull(partitionCountMonitor.metrics.getGroup("job-coordinator"))
+
+    val metricGroup = partitionCountMonitor.metrics.getGroup("job-coordinator")
+    assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
+    assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+  }
+
+  @Test
+  def testStartStopBehavior(): Unit = {
+    val mockMetadataCache = new MockStreamMetadataCache
+    val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+    val monitor = new StreamPartitionCountMonitor(
+      inputSystemStreamSet,
+      mockMetadataCache,
+      new MetricsRegistryMap(),
+      50
+    )
+    monitor.stopMonitor()
+    monitor.startMonitor()
+    assertTrue(monitor.isRunning())
+    monitor.startMonitor()
+    assertTrue(monitor.isRunning())
+    monitor.stopMonitor()
+    assertFalse(monitor.isRunning())
+    monitor.startMonitor()
+    assertTrue(monitor.isRunning())
+    monitor.stopMonitor()
+    assertFalse(monitor.isRunning())
+    monitor.stopMonitor()
+    assertFalse(monitor.isRunning())
+  }
+
+  class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
+    /**
+     * Returns a fixed two-partition metadata map for the "test-system"/"test-stream"
+     * stream, regardless of the requested streams. Used by the start/stop test above.
+     */
+
+    override def getStreamMetadata(streams: Set[SystemStream], partitionsMetadataOnly: Boolean): Map[SystemStream, SystemStreamMetadata] = {
+      val inputSystemStream = new SystemStream("test-system", "test-stream")
+      val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+        {
+          put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+          put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+        }
+      }
+
+      val initialMetadata = Map(
+        inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+      )
+      initialMetadata
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 9dc436a..23aa58d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -19,17 +19,18 @@
 
 package org.apache.samza.system.kafka
 
+import java.util
+
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition}
 import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging }
 import kafka.api._
 import kafka.consumer.SimpleConsumer
 import kafka.common.{ TopicExistsException, TopicAndPartition }
 import java.util.{ Properties, UUID }
+import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import kafka.consumer.ConsumerConfig
@@ -134,10 +135,42 @@ class KafkaSystemAdmin(
    * Replication factor for the Changelog topic in kafka
    * Kafka properties to be used during the Changelog topic creation
    */
-  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends SystemAdmin with Logging {
+  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
+  def getSystemStreamPartitionCounts(streams: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
+    getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+  }
+
+  def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy): util.Map[String, SystemStreamMetadata] = {
+    debug("Fetching system stream partition count for: %s" format streams)
+    retryBackoff.run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.toSet,
+          systemName,
+          getTopicMetadata)
+        val result = metadata.map {
+          case (topic, topicMetadata) => {
+            val partitionsMap = topicMetadata.partitionsMetadata.map {
+              pm =>
+                new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
+            }.toMap[Partition, SystemStreamPartitionMetadata]
+            (topic -> new SystemStreamMetadata(topic, partitionsMap))
+          }
+        }
+        loop.done
+        JavaConversions.mapAsJavaMap(result)
+      },
+
+      (exception, loop) => {
+        warn("Unable to fetch partition counts for streams %s due to %s. Retrying." format (streams, exception))
+        debug("Exception detail:", exception)
+      }
+    ).getOrElse(throw new SamzaException("Failed to get partition counts for streams"))
+  }
+
   /**
    * Returns the offset for the message after the specified offset for each
    * SystemStreamPartition that was passed in.

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 2b1bdab..b253f98 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -90,7 +90,7 @@ public class TestContainerAllocator {
       containers.put(i, container);
     }
     JobModel jobModel = new JobModel(config, containers);
-    return new JobCoordinator(jobModel, server);
+    return new JobCoordinator(jobModel, server, null);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 0c7a09f..93e430b 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -99,7 +99,7 @@ public class TestHostAwareContainerAllocator {
       containers.put(i, container);
     }
     JobModel jobModel = new JobModel(getConfig(), containers);
-    return new JobCoordinator(jobModel, server);
+    return new JobCoordinator(jobModel, server, null);
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index 9da1edf..ff0fcde 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -109,7 +109,7 @@ public class TestSamzaTaskManager {
     when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
 
     JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
-    return new JobCoordinator(jobModel, server);
+    return new JobCoordinator(jobModel, server, null);
   }
 
   @Before