You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/06 18:09:56 UTC

samza git commit: SAMZA-943 - Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior

Repository: samza
Updated Branches:
  refs/heads/master 63ccf5eb1 -> d4936b899


SAMZA-943 - Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d4936b89
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d4936b89
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d4936b89

Branch: refs/heads/master
Commit: d4936b8993fd15aad0050c0465449fdc16a9992f
Parents: 63ccf5e
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri May 6 10:27:47 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri May 6 10:27:47 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  14 +-
 .../StreamPartitionCountMonitor.java            | 201 +++++++++++++++++++
 .../samza/coordinator/JobCoordinator.scala      |  17 +-
 .../StreamPartitionCountMonitor.scala           | 108 ----------
 .../TestStreamPartitionCountMonitor.scala       |  90 +++++++--
 5 files changed, 286 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8d77486..fad7b55 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -19,13 +19,13 @@
 --> 
 <import-control pkg="org.apache.samza">
 	
-	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-	
-	<!-- common library dependencies -->
-	<allow pkg="java" />
-	<allow pkg="javax.management" />
-	<allow pkg="org.slf4j" />
-	<allow pkg="org.junit" />
+    <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+    <!-- common library dependencies -->
+    <allow pkg="java" />
+    <allow pkg="javax.management" />
+    <allow pkg="org.slf4j" />
+    <allow pkg="org.junit" />
     <allow pkg="org.codehaus" />
     <allow pkg="org.mockito" />
     <allow pkg="org.apache.log4j" />

http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
new file mode 100644
index 0000000..8652465
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+
+/**
+ * Periodically monitors the partition count for each system stream and emits a metric
+ * for each system stream indicating the delta partition count since the monitor was created.
+ */
+public class StreamPartitionCountMonitor {
+  private static final Logger log = LoggerFactory.getLogger(StreamPartitionCountMonitor.class);
+
+  private enum State { INIT, RUNNING, STOPPED }
+
+  private final Set<SystemStream> streamsToMonitor;
+  private final StreamMetadataCache metadataCache;
+  private final MetricsRegistryMap metrics;
+  private final int monitorPeriodMs;
+  private final Map<SystemStream, Gauge<Integer>> gauges;
+  private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
+
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  private volatile State state = State.INIT;
+
+
+  /**
+   * Gets the metadata for all the specified system streams from the provided metadata cache.
+   * Handles scala-java conversions: the Java set is passed as an immutable Scala set, the result is wrapped as a Java map.
+   *
+   * @param streamsToMonitor  the set of system streams for which the metadata is needed.
+   * @param metadataCache     the metadata cache which will be used to fetch metadata.
+   * @return                  a map from each system stream to its metadata.
+   */
+  private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> streamsToMonitor,
+      StreamMetadataCache metadataCache) {
+    return JavaConversions // NOTE(review): the meaning of the 'true' flag is not visible here -- confirm in StreamMetadataCache
+        .mapAsJavaMap(metadataCache.getStreamMetadata(JavaConversions.asScalaSet(streamsToMonitor).<SystemStream>toSet(), true));
+  }
+
+  /**
+   * Creates a monitor (not yet started): snapshots the current metadata of the given streams as the baseline and
+   * registers one partition count delta gauge, initially 0, per returned stream in the "job-coordinator" group.
+   *
+   * @param streamsToMonitor  a set of SystemStreams to monitor.
+   * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
+   * @param metrics           the metrics registry to which the metrics should be added.
+   * @param monitorPeriodMs   the period at which the monitor will run in milliseconds.
+   */
+  public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
+      MetricsRegistryMap metrics, int monitorPeriodMs) {
+    this.streamsToMonitor = streamsToMonitor;
+    this.metadataCache = metadataCache;
+    this.metrics = metrics;
+    this.monitorPeriodMs = monitorPeriodMs;
+    this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
+
+    // Pre-populate the gauges so that every monitored stream reports a delta of 0 before the first update runs.
+    Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
+    for (SystemStream systemStream : initialMetadata.keySet()) {
+      Gauge<Integer> gauge = metrics.newGauge("job-coordinator",
+          String.format("%s-%s-partitionCount", systemStream.getSystem(), systemStream.getStream()), 0);
+      mutableGauges.put(systemStream, gauge);
+    }
+    gauges = Collections.unmodifiableMap(mutableGauges);
+  }
+
+  /**
+   * Fetches the current partition count for each system stream from the cache, compares the current count to the
+   * original count and updates the metric for that system stream with the delta. A stream that is missing from
+   * the refreshed metadata is skipped with a warning so that the remaining streams are still updated.
+   */
+  void updatePartitionCountMetric() {
+    try {
+      Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
+      for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
+        SystemStream systemStream = metadataEntry.getKey();
+        SystemStreamMetadata current = currentMetadata.get(systemStream);
+        if (current == null) {
+          log.warn("No current metadata for {}; skipping partition count update for it.", systemStream);
+          continue;
+        }
+        int prevPartitionCount = metadataEntry.getValue().getSystemStreamPartitionMetadata().keySet().size();
+        gauges.get(systemStream).set(current.getSystemStreamPartitionMetadata().keySet().size() - prevPartitionCount);
+      }
+    } catch (Exception e) { // swallow on purpose: an exception escaping a fixed-rate task suppresses all later runs
+      log.error("Exception while updating partition count metric.", e);
+    }
+  }
+
+  /**
+   * For testing. Returns the unmodifiable map from each monitored system stream to its partition count delta gauge.
+   */
+  Map<SystemStream, Gauge<Integer>> getGauges() {
+    return gauges;
+  }
+
+  /**
+   * Starts the monitor.
+   */
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              updatePartitionCountMetric();
+            }
+          }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("StreamPartitionCountMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor without waiting for the scheduler to terminate. Once it stops, it cannot be restarted.
+   */
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+      // Unconditional: a monitor that was never started (INIT) also ends up STOPPED.
+      state = State.STOPPED;
+    }
+  }
+
+  /**
+   * For testing. Returns true only while the state is RUNNING; reads the volatile state without taking the lock.
+   */
+  boolean isRunning() {
+    return state == State.RUNNING;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+
+  private static class ThreadFactoryImpl implements ThreadFactory  { // gives the scheduler's threads recognizable names
+    private static final String PREFIX = "Samza-" + StreamPartitionCountMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(); // static, so the numbering is shared by all monitors
+
+    public Thread newThread(Runnable runnable) { // thread name is PREFIX followed by the next counter value
+      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 384b2e7..03f48db 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -52,7 +52,6 @@ object JobCoordinator extends Logging {
    */
   @volatile var currentJobCoordinator: JobCoordinator = null
   val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
-  var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
 
   /**
    * @param coordinatorSystemConfig A config object that contains job.name,
@@ -87,21 +86,22 @@ object JobCoordinator extends Logging {
     }).toMap
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
     if (config.getMonitorPartitionChange) {
       val extendedSystemAdmins = systemAdmins.filter{
-        case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
-      }
+                                                      case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+                                                    }
       val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
       if (inputStreamsToMonitor.nonEmpty) {
         streamPartitionCountMonitor = new StreamPartitionCountMonitor(
-          inputStreamsToMonitor,
+          setAsJavaSet(inputStreamsToMonitor),
           streamMetadataCache,
           metricsRegistryMap,
           config.getMonitorPartitionChangeFrequency)
       }
     }
 
-    val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
+    val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor)
     createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
 
     jobCoordinator
@@ -115,7 +115,8 @@ object JobCoordinator extends Logging {
   def getJobCoordinator(config: Config,
                         changelogManager: ChangelogPartitionManager,
                         localityManager: LocalityManager,
-                        streamMetadataCache: StreamMetadataCache) = {
+                        streamMetadataCache: StreamMetadataCache,
+                        streamPartitionCountMonitor: StreamPartitionCountMonitor) = {
     val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
     jobModelRef.set(jobModel)
 
@@ -318,7 +319,7 @@ class JobCoordinator(
       server.start
       if (streamPartitionCountMonitor != null) {
         debug("Starting Stream Partition Count Monitor..")
-        streamPartitionCountMonitor.startMonitor()
+        streamPartitionCountMonitor.start()
       }
       info("Started HTTP server: %s" format server.getUrl)
     }
@@ -329,7 +330,7 @@ class JobCoordinator(
       debug("Stopping HTTP server.")
       if (streamPartitionCountMonitor != null) {
         debug("Stopping Stream Partition Count Monitor..")
-        streamPartitionCountMonitor.stopMonitor()
+        streamPartitionCountMonitor.stop()
       }
       server.stop
       info("Stopped HTTP server.")

http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
deleted file mode 100644
index 6aeff57..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.coordinator
-
-import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
-import org.apache.samza.system.{StreamMetadataCache, SystemStream, SystemStreamMetadata}
-import org.apache.samza.util.Logging
-
-private[coordinator] class StreamPartitionCountMonitor (
-  val streamsToMonitor: Set[SystemStream],
-  val metadataCache: StreamMetadataCache,
-  val metrics: MetricsRegistryMap,
-  val monitorFrequency: Int = 300000) extends Logging {
-
-  val initialMetadata: Map[SystemStream, SystemStreamMetadata] = metadataCache.getStreamMetadata(streamsToMonitor, true)
-  val gauges = new java.util.HashMap[SystemStream, Gauge[Int]]()
-  private val running: AtomicBoolean = new AtomicBoolean(false)
-  private var thread: Thread = null
-  private val lock = new Object
-
-  private def getMonitorThread(): Thread = {
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        while (running.get()) {
-          try {
-            var currentMetadata: Map[SystemStream, SystemStreamMetadata] = Map[SystemStream, SystemStreamMetadata]()
-            currentMetadata = metadataCache.getStreamMetadata(streamsToMonitor, true)
-            initialMetadata.map {
-              case (systemStream, metadata) => {
-                val currentPartitionCount = currentMetadata(systemStream).getSystemStreamPartitionMetadata.keySet().size()
-                val prevPartitionCount = metadata.getSystemStreamPartitionMetadata.keySet().size()
-
-                val gauge = if (gauges.containsKey(systemStream)) {
-                  gauges.get(systemStream)
-                } else {
-                  metrics.newGauge[Int](
-                    "job-coordinator",
-                    String.format("%s-%s-partitionCount", systemStream.getSystem, systemStream.getStream),
-                    0)
-                }
-                gauge.set(currentPartitionCount - prevPartitionCount)
-                gauges.put(systemStream, gauge)
-              }
-            }
-            lock synchronized {
-              lock.wait(monitorFrequency)
-            }
-          } catch {
-            case ie: InterruptedException =>
-              info("Received Interrupted Exception: %s" format ie, ie)
-            case e: Exception =>
-              warn("Received Exception: %s" format e, e)
-          }
-        }
-      }
-    })
-  }
-
-  def startMonitor(): Unit = {
-    if (thread == null || !thread.isAlive) {
-      thread = getMonitorThread()
-      running.set(true)
-      thread.start()
-    }
-  }
-
-  /**
-   * Used in unit tests only
-   * @return Returns true if the monitor thread is running and false, otherwise
-   */
-  def isRunning(): Boolean = {
-    thread != null && thread.isAlive
-  }
-
-  def stopMonitor(): Unit = {
-    try {
-      running.set(false)
-      lock synchronized  {
-        lock.notify()
-      }
-      if (thread != null) {
-        thread.join(monitorFrequency)
-      }
-    } catch {
-      case e: Exception =>
-        println("[STOP MONITOR] Received Exception: %s" format e)
-        e.printStackTrace()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index f47f818..99ee0bd 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -19,10 +19,12 @@
 
 package org.apache.samza.coordinator
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import org.apache.samza.Partition
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemAdmin, StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata}
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Matchers
@@ -31,6 +33,9 @@ import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
 
+import scala.collection.JavaConversions
+
+
 class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
 
   @Test
@@ -61,27 +66,25 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
 
     when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
       .thenReturn(initialMetadata)  // Called during StreamPartitionCountMonitor instantiation
-      .thenReturn(initialMetadata)  // Called when monitor thread is started
       .thenReturn(finalMetadata)  // Called from monitor thread the second time
-      .thenReturn(finalMetadata)
+
+    val metrics = new MetricsRegistryMap()
 
     val partitionCountMonitor = new StreamPartitionCountMonitor(
-      inputSystemStreamSet,
+      JavaConversions.setAsJavaSet(inputSystemStreamSet),
       mockMetadataCache,
-      new MetricsRegistryMap(),
+      metrics,
       5
     )
 
-    partitionCountMonitor.startMonitor()
-    Thread.sleep(50)
-    partitionCountMonitor.stopMonitor()
+    partitionCountMonitor.updatePartitionCountMetric()
 
-    assertNotNull(partitionCountMonitor.gauges.get(inputSystemStream))
-    assertEquals(1, partitionCountMonitor.gauges.get(inputSystemStream).getValue)
+    assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
+    assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
 
-    assertNotNull(partitionCountMonitor.metrics.getGroup("job-coordinator"))
+    assertNotNull(metrics.getGroup("job-coordinator"))
 
-    val metricGroup = partitionCountMonitor.metrics.getGroup("job-coordinator")
+    val metricGroup = metrics.getGroup("job-coordinator")
     assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
     assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
   }
@@ -92,26 +95,71 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val inputSystemStream = new SystemStream("test-system", "test-stream")
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
     val monitor = new StreamPartitionCountMonitor(
-      inputSystemStreamSet,
+      JavaConversions.setAsJavaSet(inputSystemStreamSet),
       mockMetadataCache,
       new MetricsRegistryMap(),
       50
     )
-    monitor.stopMonitor()
-    monitor.startMonitor()
+
+    assertFalse(monitor.isRunning())
+
+    // Normal start
+    monitor.start()
     assertTrue(monitor.isRunning())
-    monitor.startMonitor()
+
+    // Start should be idempotent
+    monitor.start()
     assertTrue(monitor.isRunning())
-    monitor.stopMonitor()
+
+    // Normal stop
+    monitor.stop()
+    assertTrue(monitor.awaitTermination(5, TimeUnit.SECONDS));
     assertFalse(monitor.isRunning())
-    monitor.startMonitor()
-    assertTrue(monitor.isRunning())
-    monitor.stopMonitor()
+
+    // Cannot restart a stopped instance
+    try
+    {
+      monitor.start()
+      fail("IllegalStateException should have been thrown")
+    } catch {
+      case e: IllegalStateException => assertTrue(true)
+      case _: Throwable => fail("IllegalStateException should have been thrown")
+    }
     assertFalse(monitor.isRunning())
-    monitor.stopMonitor()
+
+    // Stop should be idempotent
+    monitor.stop()
     assertFalse(monitor.isRunning())
   }
 
+  @Test
+  def testScheduler(): Unit = {
+    val mockMetadataCache = new MockStreamMetadataCache
+    val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+    val sampleCount = new CountDownLatch(2); // Verify 2 invocations
+
+    val monitor = new StreamPartitionCountMonitor(
+      JavaConversions.setAsJavaSet(inputSystemStreamSet),
+      mockMetadataCache,
+      new MetricsRegistryMap(),
+      50
+    ) {
+      override def updatePartitionCountMetric(): Unit = {
+        sampleCount.countDown()
+      }
+    }
+
+    monitor.start()
+    try {
+      if (!sampleCount.await(5, TimeUnit.SECONDS)) {
+        fail("Did not see all metric updates. Remaining count: " + sampleCount.getCount)
+      }
+    } finally {
+      monitor.stop()
+    }
+  }
+
   class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
     /**
      * Returns metadata about each of the given streams (such as first offset, newest