You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/06 18:09:56 UTC
samza git commit: SAMZA-943 - Occasional test failure:
TestStreamPartitionCountMonitor.testStartStopBehavior
Repository: samza
Updated Branches:
refs/heads/master 63ccf5eb1 -> d4936b899
SAMZA-943 - Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d4936b89
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d4936b89
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d4936b89
Branch: refs/heads/master
Commit: d4936b8993fd15aad0050c0465449fdc16a9992f
Parents: 63ccf5e
Author: Jacob Maes <ja...@gmail.com>
Authored: Fri May 6 10:27:47 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri May 6 10:27:47 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 14 +-
.../StreamPartitionCountMonitor.java | 201 +++++++++++++++++++
.../samza/coordinator/JobCoordinator.scala | 17 +-
.../StreamPartitionCountMonitor.scala | 108 ----------
.../TestStreamPartitionCountMonitor.scala | 90 +++++++--
5 files changed, 286 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8d77486..fad7b55 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -19,13 +19,13 @@
-->
<import-control pkg="org.apache.samza">
- <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
-
- <!-- common library dependencies -->
- <allow pkg="java" />
- <allow pkg="javax.management" />
- <allow pkg="org.slf4j" />
- <allow pkg="org.junit" />
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
<allow pkg="org.codehaus" />
<allow pkg="org.mockito" />
<allow pkg="org.apache.log4j" />
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
new file mode 100644
index 0000000..8652465
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+
+/**
+ * Periodically polls the partition count of each monitored system stream and publishes, through
+ * one gauge per stream, the difference between the current partition count and the count that was
+ * observed when this monitor was constructed.
+ * <p>
+ * Lifecycle: a new monitor is in the INIT state, {@link #start()} moves it to RUNNING and
+ * {@link #stop()} moves it to STOPPED. A stopped monitor cannot be restarted. The transitions made
+ * by {@link #start()} and {@link #stop()} are serialized on an internal lock.
+ */
+public class StreamPartitionCountMonitor {
+  private static final Logger log = LoggerFactory.getLogger(StreamPartitionCountMonitor.class);
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+
+  private enum State { INIT, RUNNING, STOPPED }
+
+  private final Set<SystemStream> streamsToMonitor;
+  private final StreamMetadataCache metadataCache;
+  private final MetricsRegistryMap metrics;
+  private final int monitorPeriodMs;
+  // Unmodifiable; holds exactly one gauge per key of initialMetadata.
+  private final Map<SystemStream, Gauge<Integer>> gauges;
+  // Snapshot taken in the constructor; the baseline against which deltas are computed.
+  private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
+
+  // Serializes the state transitions performed by start() and stop().
+  private final Object lock = new Object();
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  // Written only while holding lock; volatile so that isRunning() can read it without locking.
+  private volatile State state = State.INIT;
+
+
+  /**
+   * Gets the metadata for all the specified system streams from the provided metadata cache.
+   * Handles scala-java conversions.
+   *
+   * @param streamsToMonitor the set of system streams for which the metadata is needed.
+   * @param metadataCache the metadata cache which will be used to fetch metadata.
+   * @return a map from each system stream to its metadata.
+   */
+  private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> streamsToMonitor,
+      StreamMetadataCache metadataCache) {
+    return JavaConversions
+        .mapAsJavaMap(metadataCache.getStreamMetadata(JavaConversions.asScalaSet(streamsToMonitor).<SystemStream>toSet(), true));
+  }
+
+  /**
+   * Creates a monitor in the INIT state. The constructor immediately fetches the metadata of
+   * {@code streamsToMonitor} from {@code metadataCache} to use as the baseline, and registers one
+   * gauge (initial value 0) per returned stream in the "job-coordinator" metrics group.
+   *
+   * @param streamsToMonitor a set of SystemStreams to monitor.
+   * @param metadataCache the metadata cache which will be used to fetch metadata for partition counts.
+   * @param metrics the metrics registry to which the metrics should be added.
+   * @param monitorPeriodMs the period at which the monitor will run in milliseconds.
+   */
+  public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
+      MetricsRegistryMap metrics, int monitorPeriodMs) {
+    this.streamsToMonitor = streamsToMonitor;
+    this.metadataCache = metadataCache;
+    this.metrics = metrics;
+    this.monitorPeriodMs = monitorPeriodMs;
+    this.initialMetadata = getMetadata(streamsToMonitor, metadataCache);
+
+    // Pre-populate the gauges so that the periodic update never has to create metrics.
+    Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
+    for (SystemStream systemStream : initialMetadata.keySet()) {
+      Gauge<Integer> gauge = metrics.newGauge("job-coordinator",
+          String.format("%s-%s-partitionCount", systemStream.getSystem(), systemStream.getStream()), 0);
+      mutableGauges.put(systemStream, gauge);
+    }
+    gauges = Collections.unmodifiableMap(mutableGauges);
+  }
+
+  /**
+   * Fetches the current partition count for each system stream from the cache, compares the current count to the
+   * original count and updates the metric for that system stream with the delta.
+   * <p>
+   * A stream for which the cache returns no current metadata is skipped with a warning, so that it
+   * does not prevent the remaining streams from being updated. Any exception is logged and not
+   * propagated.
+   */
+  void updatePartitionCountMetric() {
+    try {
+      Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
+
+      for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
+        SystemStream systemStream = metadataEntry.getKey();
+        SystemStreamMetadata current = currentMetadata.get(systemStream);
+        if (current == null) {
+          log.warn("No current metadata found for {}. Leaving its partition count metric unchanged.", systemStream);
+          continue;
+        }
+
+        int currentPartitionCount = current.getSystemStreamPartitionMetadata().size();
+        int initialPartitionCount = metadataEntry.getValue().getSystemStreamPartitionMetadata().size();
+
+        Gauge<Integer> gauge = gauges.get(systemStream);
+        gauge.set(currentPartitionCount - initialPartitionCount);
+      }
+    } catch (Exception e) {
+      // Deliberately broad: an exception escaping a task scheduled with scheduleAtFixedRate
+      // suppresses all subsequent executions, which would silently end the monitoring.
+      log.error("Exception while updating partition count metric.", e);
+    }
+  }
+
+  /**
+   * For testing. Returns the unmodifiable map from each monitored system stream to its gauge.
+   */
+  Map<SystemStream, Gauge<Integer>> getGauges() {
+    return gauges;
+  }
+
+  /**
+   * Starts the monitor. The first update runs one period after this call and then repeats every
+   * period. Calling this method on a monitor that is already running has no effect.
+   *
+   * @throws IllegalStateException if the monitor has already been stopped.
+   */
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              updatePartitionCountMetric();
+            }
+          }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("StreamPartitionCountMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor. Once it stops, it cannot be restarted. This method does not wait for the
+   * scheduler thread to terminate, and it may be called more than once.
+   */
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      state = State.STOPPED;
+    }
+  }
+
+  /**
+   * For testing. Returns true only while the monitor is in the RUNNING state.
+   */
+  boolean isRunning() {
+    return state == State.RUNNING;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   *
+   * @param timeout the maximum time to wait.
+   * @param unit the unit of {@code timeout}.
+   * @throws InterruptedException if the calling thread is interrupted while waiting.
+   */
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+
+  /**
+   * Creates the scheduler threads, naming them "Samza-StreamPartitionCountMonitor-N" where N is a
+   * counter shared by all monitor instances.
+   */
+  private static class ThreadFactoryImpl implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + StreamPartitionCountMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 384b2e7..03f48db 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -52,7 +52,6 @@ object JobCoordinator extends Logging {
*/
@volatile var currentJobCoordinator: JobCoordinator = null
val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
- var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
/**
* @param coordinatorSystemConfig A config object that contains job.name,
@@ -87,21 +86,22 @@ object JobCoordinator extends Logging {
}).toMap
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+ var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
if (config.getMonitorPartitionChange) {
val extendedSystemAdmins = systemAdmins.filter{
- case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
- }
+ case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+ }
val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
if (inputStreamsToMonitor.nonEmpty) {
streamPartitionCountMonitor = new StreamPartitionCountMonitor(
- inputStreamsToMonitor,
+ setAsJavaSet(inputStreamsToMonitor),
streamMetadataCache,
metricsRegistryMap,
config.getMonitorPartitionChangeFrequency)
}
}
- val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
+ val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor)
createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
jobCoordinator
@@ -115,7 +115,8 @@ object JobCoordinator extends Logging {
def getJobCoordinator(config: Config,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager,
- streamMetadataCache: StreamMetadataCache) = {
+ streamMetadataCache: StreamMetadataCache,
+ streamPartitionCountMonitor: StreamPartitionCountMonitor) = {
val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
jobModelRef.set(jobModel)
@@ -318,7 +319,7 @@ class JobCoordinator(
server.start
if (streamPartitionCountMonitor != null) {
debug("Starting Stream Partition Count Monitor..")
- streamPartitionCountMonitor.startMonitor()
+ streamPartitionCountMonitor.start()
}
info("Started HTTP server: %s" format server.getUrl)
}
@@ -329,7 +330,7 @@ class JobCoordinator(
debug("Stopping HTTP server.")
if (streamPartitionCountMonitor != null) {
debug("Stopping Stream Partition Count Monitor..")
- streamPartitionCountMonitor.stopMonitor()
+ streamPartitionCountMonitor.stop()
}
server.stop
info("Stopped HTTP server.")
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
deleted file mode 100644
index 6aeff57..0000000
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.coordinator
-
-import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
-import org.apache.samza.system.{StreamMetadataCache, SystemStream, SystemStreamMetadata}
-import org.apache.samza.util.Logging
-
-private[coordinator] class StreamPartitionCountMonitor (
- val streamsToMonitor: Set[SystemStream],
- val metadataCache: StreamMetadataCache,
- val metrics: MetricsRegistryMap,
- val monitorFrequency: Int = 300000) extends Logging {
-
- val initialMetadata: Map[SystemStream, SystemStreamMetadata] = metadataCache.getStreamMetadata(streamsToMonitor, true)
- val gauges = new java.util.HashMap[SystemStream, Gauge[Int]]()
- private val running: AtomicBoolean = new AtomicBoolean(false)
- private var thread: Thread = null
- private val lock = new Object
-
- private def getMonitorThread(): Thread = {
- new Thread(new Runnable {
- override def run(): Unit = {
- while (running.get()) {
- try {
- var currentMetadata: Map[SystemStream, SystemStreamMetadata] = Map[SystemStream, SystemStreamMetadata]()
- currentMetadata = metadataCache.getStreamMetadata(streamsToMonitor, true)
- initialMetadata.map {
- case (systemStream, metadata) => {
- val currentPartitionCount = currentMetadata(systemStream).getSystemStreamPartitionMetadata.keySet().size()
- val prevPartitionCount = metadata.getSystemStreamPartitionMetadata.keySet().size()
-
- val gauge = if (gauges.containsKey(systemStream)) {
- gauges.get(systemStream)
- } else {
- metrics.newGauge[Int](
- "job-coordinator",
- String.format("%s-%s-partitionCount", systemStream.getSystem, systemStream.getStream),
- 0)
- }
- gauge.set(currentPartitionCount - prevPartitionCount)
- gauges.put(systemStream, gauge)
- }
- }
- lock synchronized {
- lock.wait(monitorFrequency)
- }
- } catch {
- case ie: InterruptedException =>
- info("Received Interrupted Exception: %s" format ie, ie)
- case e: Exception =>
- warn("Received Exception: %s" format e, e)
- }
- }
- }
- })
- }
-
- def startMonitor(): Unit = {
- if (thread == null || !thread.isAlive) {
- thread = getMonitorThread()
- running.set(true)
- thread.start()
- }
- }
-
- /**
- * Used in unit tests only
- * @return Returns true if the monitor thread is running and false, otherwise
- */
- def isRunning(): Boolean = {
- thread != null && thread.isAlive
- }
-
- def stopMonitor(): Unit = {
- try {
- running.set(false)
- lock synchronized {
- lock.notify()
- }
- if (thread != null) {
- thread.join(monitorFrequency)
- }
- } catch {
- case e: Exception =>
- println("[STOP MONITOR] Received Exception: %s" format e)
- e.printStackTrace()
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
index f47f818..99ee0bd 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -19,10 +19,12 @@
package org.apache.samza.coordinator
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
import org.apache.samza.Partition
import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemAdmin, StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata}
import org.junit.Assert._
import org.junit.Test
import org.mockito.Matchers
@@ -31,6 +33,9 @@ import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mock.MockitoSugar
+import scala.collection.JavaConversions
+
+
class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@Test
@@ -61,27 +66,25 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
.thenReturn(initialMetadata) // Called during StreamPartitionCountMonitor instantiation
- .thenReturn(initialMetadata) // Called when monitor thread is started
.thenReturn(finalMetadata) // Called from monitor thread the second time
- .thenReturn(finalMetadata)
+
+ val metrics = new MetricsRegistryMap()
val partitionCountMonitor = new StreamPartitionCountMonitor(
- inputSystemStreamSet,
+ JavaConversions.setAsJavaSet(inputSystemStreamSet),
mockMetadataCache,
- new MetricsRegistryMap(),
+ metrics,
5
)
- partitionCountMonitor.startMonitor()
- Thread.sleep(50)
- partitionCountMonitor.stopMonitor()
+ partitionCountMonitor.updatePartitionCountMetric()
- assertNotNull(partitionCountMonitor.gauges.get(inputSystemStream))
- assertEquals(1, partitionCountMonitor.gauges.get(inputSystemStream).getValue)
+ assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream))
+ assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue)
- assertNotNull(partitionCountMonitor.metrics.getGroup("job-coordinator"))
+ assertNotNull(metrics.getGroup("job-coordinator"))
- val metricGroup = partitionCountMonitor.metrics.getGroup("job-coordinator")
+ val metricGroup = metrics.getGroup("job-coordinator")
assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
}
@@ -92,26 +95,71 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
val inputSystemStream = new SystemStream("test-system", "test-stream")
val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
val monitor = new StreamPartitionCountMonitor(
- inputSystemStreamSet,
+ JavaConversions.setAsJavaSet(inputSystemStreamSet),
mockMetadataCache,
new MetricsRegistryMap(),
50
)
- monitor.stopMonitor()
- monitor.startMonitor()
+
+ assertFalse(monitor.isRunning())
+
+ // Normal start
+ monitor.start()
assertTrue(monitor.isRunning())
- monitor.startMonitor()
+
+ // Start should be idempotent
+ monitor.start()
assertTrue(monitor.isRunning())
- monitor.stopMonitor()
+
+ // Normal stop
+ monitor.stop()
+ assertTrue(monitor.awaitTermination(5, TimeUnit.SECONDS));
assertFalse(monitor.isRunning())
- monitor.startMonitor()
- assertTrue(monitor.isRunning())
- monitor.stopMonitor()
+
+ // Cannot restart a stopped instance
+ try
+ {
+ monitor.start()
+ fail("IllegalStateException should have been thrown")
+ } catch {
+ case e: IllegalStateException => assertTrue(true)
+ case _: Throwable => fail("IllegalStateException should have been thrown")
+ }
assertFalse(monitor.isRunning())
- monitor.stopMonitor()
+
+ // Stop should be idempotent
+ monitor.stop()
assertFalse(monitor.isRunning())
}
+ @Test
+ def testScheduler(): Unit = {
+ val mockMetadataCache = new MockStreamMetadataCache
+ val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+ val sampleCount = new CountDownLatch(2); // Verify 2 invocations
+
+ val monitor = new StreamPartitionCountMonitor(
+ JavaConversions.setAsJavaSet(inputSystemStreamSet),
+ mockMetadataCache,
+ new MetricsRegistryMap(),
+ 50
+ ) {
+ override def updatePartitionCountMetric(): Unit = {
+ sampleCount.countDown()
+ }
+ }
+
+ monitor.start()
+ try {
+ if (!sampleCount.await(5, TimeUnit.SECONDS)) {
+ fail("Did not see all metric updates. Remaining count: " + sampleCount.getCount)
+ }
+ } finally {
+ monitor.stop()
+ }
+ }
+
class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
/**
* Returns metadata about each of the given streams (such as first offset, newest