You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/03/25 03:22:08 UTC
samza git commit: SAMZA-882 : Detect partition count changes in input
streams
Repository: samza
Updated Branches:
refs/heads/master 04ae6f5ce -> a51f7b2b9
SAMZA-882 : Detect partition count changes in input streams
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a51f7b2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a51f7b2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a51f7b2b
Branch: refs/heads/master
Commit: a51f7b2b994cd2af0bec586a405fc426a2f8cf1b
Parents: 04ae6f5
Author: Navina Ramesh <nr...@linkedin.com>
Authored: Thu Mar 24 19:18:53 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu Mar 24 19:18:53 2016 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 21 +++
.../samza/system/ExtendedSystemAdmin.java | 30 ++++
.../org/apache/samza/config/JobConfig.scala | 13 +-
.../samza/coordinator/JobCoordinator.scala | 63 +++++----
.../StreamPartitionCountMonitor.scala | 108 +++++++++++++++
.../samza/system/StreamMetadataCache.scala | 31 +++--
.../main/scala/org/apache/samza/util/Util.scala | 10 +-
.../samza/coordinator/TestJobCoordinator.scala | 34 +++--
.../TestStreamPartitionCountMonitor.scala | 137 +++++++++++++++++++
.../samza/system/kafka/KafkaSystemAdmin.scala | 41 +++++-
.../samza/job/yarn/TestContainerAllocator.java | 2 +-
.../yarn/TestHostAwareContainerAllocator.java | 2 +-
.../samza/job/yarn/TestSamzaTaskManager.java | 2 +-
13 files changed, 440 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 2745a22..c1c822e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -184,6 +184,27 @@
</tr>
<tr>
+ <td class="property" id="job-coordinator-monitor-partition-change">job.coordinator.<br />monitor-partition-change</td>
+ <td class="default">false</td>
+ <td class="description">
+ When set to true, the Job Coordinator periodically checks the partition count of the job's input streams
+ and, for each monitored stream, updates a Gauge metric named <span class="system">system-name</span>-<span class="stream">stream-name</span>-partitionCount
+ (in the job-coordinator metrics group). The gauge's value is the difference between the current partition count and
+ the partition count observed when the Job Coordinator started. Please note that currently this
+ feature only works for input streams of Kafka-based systems.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="job-coordinator-monitor-partition-change-frequency-ms">job.coordinator.<br />monitor-partition-change.frequency.ms</td>
+ <td class="default">300000</td>
+ <td class="description">
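+ The interval, in milliseconds, at which the Job Coordinator checks the input streams for partition count changes.
+ Only used when job.coordinator.monitor-partition-change is true. Because partition count changes are rare, this interval can safely be set high (i.e. the check can run infrequently).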
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="job-config-rewriter-class">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.class</td>
<td class="default"></td>
<td class="description">
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
new file mode 100644
index 0000000..daa2212
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Extends the more generic {@link SystemAdmin} interface with a metadata query intended for callers
+ * that only need to know which partitions each stream has.
+ *
+ * <p>TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8
+ */
+public interface ExtendedSystemAdmin extends SystemAdmin {
+ /**
+ * Fetches partition metadata for the given streams of this system.
+ *
+ * <p>Despite its name, this method returns {@link SystemStreamMetadata} objects rather than integer
+ * counts. StreamPartitionCountMonitor, for example, derives the count from the size of each stream's
+ * partition metadata map. NOTE(review): the offset fields of the returned metadata may not be
+ * populated by implementations; confirm before relying on them.
+ *
+ * @param streamNames names of the streams (within this system) to query
+ * @return map from stream name to that stream's partition metadata
+ */
+ Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4f3e9a2..b59153f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -19,10 +19,8 @@
package org.apache.samza.config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.util.Logging
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.util.Logging
object JobConfig {
// job config constants
@@ -49,6 +47,9 @@ object JobConfig {
// is not yet supported, and auto-creation of the topics cannot be always easily tuned off).
// So we add a setting that allows for the job to continue even though number of partitions is not 1.
val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
+ val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
+ val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms"
+ val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
implicit def Config2Job(config: Config) = new JobConfig(config)
}
@@ -74,6 +75,12 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
}
+ def getMonitorPartitionChange = getBoolean(JobConfig.MONITOR_PARTITION_CHANGE, false)
+
+ def getMonitorPartitionChangeFrequency = getInt(
+ JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
+ JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
+
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID)
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 06a96ad..cd7daa2 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,32 +20,26 @@
package org.apache.samza.coordinator
+import java.util
import java.util.concurrent.atomic.AtomicReference
-import org.apache.samza.config.StorageConfig
-import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.config.Config
-import org.apache.samza.SamzaException
-import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.{Config, StorageConfig}
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
-import java.util
+import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
import org.apache.samza.container.{LocalityManager, TaskName}
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util.Logging
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.util.Util
+import org.apache.samza.storage.ChangelogPartitionManager
+import org.apache.samza.system.{ExtendedSystemAdmin, StreamMetadataCache, SystemFactory, SystemStreamPartition}
+import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.{Partition, SamzaException}
+
import scala.collection.JavaConversions._
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.Partition
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.coordinator.server.JobServlet
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
/**
* Helper companion object that is responsible for wiring up a JobCoordinator
@@ -58,6 +52,7 @@ object JobCoordinator extends Logging {
*/
@volatile var currentJobCoordinator: JobCoordinator = null
val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
+ var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
/**
* @param coordinatorSystemConfig A config object that contains job.name,
@@ -92,6 +87,19 @@ object JobCoordinator extends Logging {
}).toMap
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+ if (config.getMonitorPartitionChange) {
+ val extendedSystemAdmins = systemAdmins.filter{
+ case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
+ }
+ val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
+ if (inputStreamsToMonitor.nonEmpty) {
+ streamPartitionCountMonitor = new StreamPartitionCountMonitor(
+ inputStreamsToMonitor,
+ streamMetadataCache,
+ metricsRegistryMap,
+ config.getMonitorPartitionChangeFrequency)
+ }
+ }
val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
@@ -113,7 +121,7 @@ object JobCoordinator extends Logging {
val server = new HttpServer
server.addServlet("/*", new JobServlet(jobModelRef))
- currentJobCoordinator = new JobCoordinator(jobModel, server)
+ currentJobCoordinator = new JobCoordinator(jobModel, server, streamPartitionCountMonitor)
currentJobCoordinator
}
@@ -294,7 +302,8 @@ class JobCoordinator(
/**
* HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
*/
- val server: HttpServer = null) extends Logging {
+ val server: HttpServer = null,
+ val streamPartitionCountMonitor: StreamPartitionCountMonitor = null) extends Logging {
debug("Got job model: %s." format jobModel)
@@ -302,13 +311,21 @@ class JobCoordinator(
if (server != null) {
debug("Starting HTTP server.")
server.start
- info("Startd HTTP server: %s" format server.getUrl)
+ if (streamPartitionCountMonitor != null) {
+ debug("Starting Stream Partition Count Monitor..")
+ streamPartitionCountMonitor.startMonitor()
+ }
+ info("Started HTTP server: %s" format server.getUrl)
}
}
def stop {
if (server != null) {
debug("Stopping HTTP server.")
+ if (streamPartitionCountMonitor != null) {
+ debug("Stopping Stream Partition Count Monitor..")
+ streamPartitionCountMonitor.stopMonitor()
+ }
server.stop
info("Stopped HTTP server.")
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
new file mode 100644
index 0000000..6aeff57
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
+import org.apache.samza.system.{StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.apache.samza.util.Logging
+
+/**
+ * Periodically checks the partition count of a fixed set of input streams. For each stream, it reports
+ * the difference between the current partition count and the count observed when this monitor was
+ * constructed.
+ *
+ * The difference is published as a Gauge[Int] in the "job-coordinator" metrics group, named
+ * "<system>-<stream>-partitionCount". Gauges are registered lazily on the first check of each stream.
+ *
+ * @param streamsToMonitor input streams whose partition counts are checked
+ * @param metadataCache    source of stream metadata; always queried with partitionsMetadataOnly = true
+ * @param metrics          registry in which the per-stream gauges are created
+ * @param monitorFrequency interval between two checks, in milliseconds
+ */
+private[coordinator] class StreamPartitionCountMonitor (
+  val streamsToMonitor: Set[SystemStream],
+  val metadataCache: StreamMetadataCache,
+  val metrics: MetricsRegistryMap,
+  val monitorFrequency: Int = 300000) extends Logging {
+
+  // Baseline captured once, at construction; every later check is compared against it.
+  val initialMetadata: Map[SystemStream, SystemStreamMetadata] = metadataCache.getStreamMetadata(streamsToMonitor, true)
+  // Per-stream gauges, populated by the monitor thread on the first check of each stream.
+  val gauges = new java.util.HashMap[SystemStream, Gauge[Int]]()
+  private val running: AtomicBoolean = new AtomicBoolean(false)
+  private var thread: Thread = null
+  // Used to sleep between checks and to let stopMonitor() wake the thread early.
+  private val lock = new Object
+
+  /**
+   * Fetches the current partition metadata once and sets each stream's gauge to
+   * (current partition count - initial partition count).
+   */
+  private def updatePartitionCountGauges(): Unit = {
+    val currentMetadata = metadataCache.getStreamMetadata(streamsToMonitor, true)
+    initialMetadata.foreach {
+      case (systemStream, metadata) =>
+        val currentPartitionCount = currentMetadata(systemStream).getSystemStreamPartitionMetadata.keySet().size()
+        val prevPartitionCount = metadata.getSystemStreamPartitionMetadata.keySet().size()
+
+        val gauge = if (gauges.containsKey(systemStream)) {
+          gauges.get(systemStream)
+        } else {
+          val newGauge = metrics.newGauge[Int](
+            "job-coordinator",
+            String.format("%s-%s-partitionCount", systemStream.getSystem, systemStream.getStream),
+            0)
+          gauges.put(systemStream, newGauge)
+          newGauge
+        }
+        gauge.set(currentPartitionCount - prevPartitionCount)
+    }
+  }
+
+  /**
+   * Creates (but does not start) the thread that runs checks every monitorFrequency ms until
+   * `running` is cleared or the thread is interrupted.
+   */
+  private def getMonitorThread(): Thread = {
+    new Thread(new Runnable {
+      override def run(): Unit = {
+        var interrupted = false
+        while (running.get() && !interrupted) {
+          try {
+            updatePartitionCountGauges()
+            lock synchronized {
+              // Re-check the flag while holding the lock. If stopMonitor() ran while the metadata
+              // was being fetched, its notify() has already happened, and waiting now would block
+              // for a full monitorFrequency (the lost-wakeup problem).
+              if (running.get()) {
+                lock.wait(monitorFrequency)
+              }
+            }
+          } catch {
+            case ie: InterruptedException =>
+              info("Received Interrupted Exception: %s" format ie, ie)
+              // Treat interruption as a request to exit. Restore the interrupt status
+              // (wait() clears it when throwing) instead of swallowing it.
+              interrupted = true
+              Thread.currentThread().interrupt()
+            case e: Exception =>
+              // Best effort: a failed check is logged and retried on the next iteration.
+              warn("Received Exception: %s" format e, e)
+          }
+        }
+      }
+    }, "StreamPartitionCountMonitor")
+  }
+
+  /**
+   * Starts the monitor thread, unless a previously started one is still alive.
+   */
+  def startMonitor(): Unit = {
+    if (thread == null || !thread.isAlive) {
+      thread = getMonitorThread()
+      running.set(true)
+      thread.start()
+    }
+  }
+
+  /**
+   * Used in unit tests only
+   * @return Returns true if the monitor thread is running and false, otherwise
+   */
+  def isRunning(): Boolean = {
+    thread != null && thread.isAlive
+  }
+
+  /**
+   * Signals the monitor thread to stop, wakes it if it is sleeping between checks, and waits up to
+   * monitorFrequency ms for it to exit. Safe to call when the monitor was never started or is
+   * already stopped.
+   */
+  def stopMonitor(): Unit = {
+    try {
+      running.set(false)
+      lock synchronized {
+        lock.notify()
+      }
+      if (thread != null) {
+        thread.join(monitorFrequency)
+      }
+    } catch {
+      case ie: InterruptedException =>
+        warn("[STOP MONITOR] Received Exception: %s" format ie, ie)
+        // Do not swallow the caller's interrupt.
+        Thread.currentThread().interrupt()
+      case e: Exception =>
+        warn("[STOP MONITOR] Received Exception: %s" format e, e)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 155c3d1..18b47ec 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -43,13 +43,17 @@ class StreamMetadataCache (
private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
private var cache = Map[SystemStream, CacheEntry]()
private val lock = new Object
-
/**
* Returns metadata about each of the given streams (such as first offset, newest
* offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
* using the given SystemAdmins.
+ *
+ * @param streams Set of SystemStreams for which the metadata is requested
+ * @param partitionsMetadataOnly Flag to indicate that only partition count metadata should be fetched/refreshed
*/
- def getStreamMetadata(streams: Set[SystemStream]): Map[SystemStream, SystemStreamMetadata] = {
+ def getStreamMetadata(
+ streams: Set[SystemStream],
+ partitionsMetadataOnly: Boolean = false): Map[SystemStream, SystemStreamMetadata] = {
val time = clock.currentTimeMillis
val cacheHits = streams.flatMap(stream => getFromCache(stream, time)).toMap
@@ -57,21 +61,26 @@ class StreamMetadataCache (
.groupBy[String](_.getSystem)
.flatMap {
case (systemName, systemStreams) =>
- systemAdmins
+ val systemAdmin = systemAdmins
.getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
- .getSystemStreamMetadata(systemStreams.map(_.getStream))
- .map {
- case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
- }
+ val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
+ systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream))
+ } else {
+ systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream))
+ }
+ streamToMetadata.map {
+ case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
+ }
}
- .toMap
val allResults = cacheHits ++ cacheMisses
val missing = streams.filter(stream => allResults.getOrElse(stream, null) == null)
if (!missing.isEmpty) {
throw new SamzaException("Cannot get metadata for unknown streams: " + missing.mkString(", "))
}
- cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
+ if (!partitionsMetadataOnly) {
+ cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
+ }
allResults
}
@@ -83,9 +92,9 @@ class StreamMetadataCache (
}
}
- private def addToCache(stream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
+ private def addToCache(systemStream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
lock synchronized {
- cache += stream -> CacheEntry(metadata, now)
+ cache += systemStream -> CacheEntry(metadata, now)
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index bd0fe5f..fc3d085 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -208,8 +208,14 @@ object Util extends Logging {
def buildCoordinatorStreamConfig(config: Config) = {
val (jobName, jobId) = getJobNameAndId(config)
// Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
- new MapConfig(config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
- Map[String, String](JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName))
+ new MapConfig(
+ config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+ Map[String, String](
+ JobConfig.JOB_NAME -> jobName,
+ JobConfig.JOB_ID -> jobId,
+ JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
+ JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange),
+ JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency)))
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 9ab1dd5..110c3a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,6 +19,8 @@
package org.apache.samza.coordinator
+import java.util
+
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.util.Util
import org.junit.{After, Test}
@@ -30,18 +32,13 @@ import org.apache.samza.config.SystemConfig
import org.apache.samza.container.{SamzaContainer, TaskName}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.Config
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.Partition
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.TaskModel
import org.apache.samza.config.JobConfig
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemConsumer
import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
class TestJobCoordinator {
@@ -93,7 +90,9 @@ class TestJobCoordinator {
TaskConfig.INPUT_STREAMS -> "test.stream1",
SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
- TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
+ TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory",
+ JobConfig.MONITOR_PARTITION_CHANGE -> "true",
+ JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> "100"
)
// We want the mocksystemconsumer to use the same instance across runs
@@ -114,7 +113,12 @@ class TestJobCoordinator {
val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
+ // Check the status of Stream Partition Count Monitor
+ assertNotNull(coordinator.streamPartitionCountMonitor)
+ assertTrue(coordinator.streamPartitionCountMonitor.isRunning())
+
coordinator.stop
+ assertFalse(coordinator.streamPartitionCountMonitor.isRunning())
}
@Test
@@ -202,7 +206,7 @@ class MockSystemFactory extends SystemFactory {
def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
}
-class MockSystemAdmin extends SystemAdmin {
+class MockSystemAdmin extends ExtendedSystemAdmin {
def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
def getSystemStreamMetadata(streamNames: java.util.Set[String]): java.util.Map[String, SystemStreamMetadata] = {
assertEquals(1, streamNames.size)
@@ -227,4 +231,18 @@ class MockSystemAdmin extends SystemAdmin {
}
override def offsetComparator(offset1: String, offset2: String) = null
+
+ override def getSystemStreamPartitionCounts(streamNames: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
+ assertEquals(1, streamNames.size())
+ val result = streamNames.map {
+ stream =>
+ val partitionMetadata = Map(
+ new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
+ new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
+ new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
+ )
+ stream -> new SystemStreamMetadata(stream, partitionMetadata)
+ }.toMap
+ result
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
new file mode 100644
index 0000000..f47f818
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator
+
+import org.apache.samza.Partition
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{SystemAdmin, StreamMetadataCache, SystemStream, SystemStreamMetadata}
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Matchers
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+
+/**
+ * Unit tests for StreamPartitionCountMonitor. They cover the gauge value reported after a partition
+ * count change and the start/stop lifecycle of the monitor thread.
+ */
+class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
+
+ // The monitored stream grows from 2 to 3 partitions; once the monitor has observed the change,
+ // its gauge (and the registered metric) should report a difference of +1.
+ @Test
+ def testStreamPartitionCountMonitor(): Unit = {
+ val mockMetadataCache = mock[StreamMetadataCache]
+ val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+
+ // Two partitions: 0 and 1.
+ val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+ put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+ }
+ }
+
+ // Same partitions plus partition 2.
+ val finalPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ putAll(initialPartitionMetadata)
+ put(new Partition(2), new SystemStreamPartitionMetadata("", "", ""))
+ }
+ }
+ val initialMetadata = Map(
+ inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+ )
+ val finalMetadata = Map(
+ inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, finalPartitionMetadata)
+ )
+
+ // Mockito returns each stubbed value once, in order, then repeats the last one for further calls.
+ when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true)))
+ .thenReturn(initialMetadata) // Called during StreamPartitionCountMonitor instantiation
+ .thenReturn(initialMetadata) // Called when monitor thread is started
+ .thenReturn(finalMetadata) // Called from monitor thread the second time
+ .thenReturn(finalMetadata) // Any later checks from the monitor thread
+
+ val partitionCountMonitor = new StreamPartitionCountMonitor(
+ inputSystemStreamSet,
+ mockMetadataCache,
+ new MetricsRegistryMap(),
+ 5
+ )
+
+ // NOTE(review): timing-based. This relies on the monitor (5 ms interval) completing at least two
+ // checks within the 50 ms sleep, so that it sees finalMetadata; it could be flaky on a slow host.
+ partitionCountMonitor.startMonitor()
+ Thread.sleep(50)
+ partitionCountMonitor.stopMonitor()
+
+ assertNotNull(partitionCountMonitor.gauges.get(inputSystemStream))
+ assertEquals(1, partitionCountMonitor.gauges.get(inputSystemStream).getValue)
+
+ assertNotNull(partitionCountMonitor.metrics.getGroup("job-coordinator"))
+
+ // The same gauge must also be reachable through the metrics registry under its published name.
+ val metricGroup = partitionCountMonitor.metrics.getGroup("job-coordinator")
+ assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]])
+ assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue)
+ }
+
+ // Exercises the lifecycle: stop before start, a second start while running,
+ // restart after stop, and a second stop while already stopped.
+ @Test
+ def testStartStopBehavior(): Unit = {
+ val mockMetadataCache = new MockStreamMetadataCache
+ val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+ val monitor = new StreamPartitionCountMonitor(
+ inputSystemStreamSet,
+ mockMetadataCache,
+ new MetricsRegistryMap(),
+ 50
+ )
+ monitor.stopMonitor()
+ monitor.startMonitor()
+ assertTrue(monitor.isRunning())
+ monitor.startMonitor()
+ assertTrue(monitor.isRunning())
+ monitor.stopMonitor()
+ assertFalse(monitor.isRunning())
+ monitor.startMonitor()
+ assertTrue(monitor.isRunning())
+ monitor.stopMonitor()
+ assertFalse(monitor.isRunning())
+ monitor.stopMonitor()
+ assertFalse(monitor.isRunning())
+ }
+
+ // Stand-in metadata cache with no backing SystemAdmins.
+ class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
+ /**
+ * Ignores both arguments and always returns the same metadata: test-system/test-stream
+ * with partitions 0 and 1. The partition count therefore never changes, so a monitor
+ * backed by this cache reports a difference of 0.
+ */
+
+ override def getStreamMetadata(streams: Set[SystemStream], partitionsMetadataOnly: Boolean): Map[SystemStream, SystemStreamMetadata] = {
+ val inputSystemStream = new SystemStream("test-system", "test-stream")
+ val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ put(new Partition(0), new SystemStreamPartitionMetadata("", "", ""))
+ put(new Partition(1), new SystemStreamPartitionMetadata("", "", ""))
+ }
+ }
+
+ val initialMetadata = Map(
+ inputSystemStream -> new SystemStreamMetadata(inputSystemStream.getStream, initialPartitionMetadata)
+ )
+ initialMetadata
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 9dc436a..23aa58d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -19,17 +19,18 @@
package org.apache.samza.system.kafka
+import java.util
+
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
import org.apache.samza.SamzaException
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging }
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.common.{ TopicExistsException, TopicAndPartition }
import java.util.{ Properties, UUID }
+import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import kafka.consumer.ConsumerConfig
@@ -134,10 +135,42 @@ class KafkaSystemAdmin(
* Replication factor for the Changelog topic in kafka
* Kafka properties to be used during the Changelog topic creation
*/
- topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends SystemAdmin with Logging {
+ topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging {
import KafkaSystemAdmin._
+  /**
+   * Looks up partition-only metadata for the given topics.
+   *
+   * Delegates to the overload that takes an explicit retry strategy, supplying an
+   * ExponentialSleepStrategy whose initial delay is 500 ms.
+   *
+   * @param streams names of the topics to look up
+   * @return map from topic name to its SystemStreamMetadata
+   */
+  def getSystemStreamPartitionCounts(streams: util.Set[String]): util.Map[String, SystemStreamMetadata] = {
+    val defaultBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)
+    getSystemStreamPartitionCounts(streams, defaultBackoff)
+  }
+
+  /**
+   * Fetches the partition layout of the given topics, retrying failed attempts via
+   * the supplied backoff strategy.
+   *
+   * Only partition ids are meaningful in the result: every SystemStreamPartitionMetadata
+   * is built with empty-string offsets, so callers should use it only to count or
+   * enumerate partitions.
+   *
+   * @param streams      names of the topics to look up
+   * @param retryBackoff strategy driving the retry loop around the metadata fetch
+   * @return map from topic name to a SystemStreamMetadata listing that topic's partitions
+   * @throws SamzaException if the retry loop ends without producing a result
+   */
+  def getSystemStreamPartitionCounts(streams: util.Set[String], retryBackoff: ExponentialSleepStrategy): util.Map[String, SystemStreamMetadata] = {
+    debug("Fetching system stream partition count for: %s" format streams)
+    retryBackoff.run(
+      loop => {
+        val metadata = TopicMetadataCache.getTopicMetadata(
+          streams.toSet,
+          systemName,
+          getTopicMetadata)
+        val result = metadata.map {
+          case (topic, topicMetadata) => {
+            // A topic-level error (e.g. leader not available) can come back with an empty
+            // or partial partition list. Throwing here sends it to the retry handler below
+            // instead of reporting a bogus partition count to callers.
+            kafka.common.ErrorMapping.maybeThrowException(topicMetadata.errorCode)
+            val partitionsMap = topicMetadata.partitionsMetadata.map {
+              pm => new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
+            }.toMap[Partition, SystemStreamPartitionMetadata]
+            topic -> new SystemStreamMetadata(topic, partitionsMap)
+          }
+        }
+        loop.done
+        JavaConversions.mapAsJavaMap(result)
+      },
+
+      (exception, loop) => {
+        warn("Unable to fetch partition counts for streams %s due to %s. Retrying." format (streams, exception))
+        debug("Exception detail:", exception)
+      }
+    ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+  }
+
/**
* Returns the offset for the message after the specified offset for each
* SystemStreamPartition that was passed in.
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
index 2b1bdab..b253f98 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java
@@ -90,7 +90,7 @@ public class TestContainerAllocator {
containers.put(i, container);
}
JobModel jobModel = new JobModel(config, containers);
- return new JobCoordinator(jobModel, server);
+ return new JobCoordinator(jobModel, server, null);
}
@Before
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
index 0c7a09f..93e430b 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java
@@ -99,7 +99,7 @@ public class TestHostAwareContainerAllocator {
containers.put(i, container);
}
JobModel jobModel = new JobModel(getConfig(), containers);
- return new JobCoordinator(jobModel, server);
+ return new JobCoordinator(jobModel, server, null);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a51f7b2b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
index 9da1edf..ff0fcde 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java
@@ -109,7 +109,7 @@ public class TestSamzaTaskManager {
when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
- return new JobCoordinator(jobModel, server);
+ return new JobCoordinator(jobModel, server, null);
}
@Before