You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:50 UTC
[3/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 3b6685e..e4b14f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -20,6 +20,9 @@
package org.apache.samza.config
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.Logging
object JobConfig {
// job config constants
@@ -35,15 +38,45 @@ object JobConfig {
val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
val JOB_NAME = "job.name" // streaming.job_name
val JOB_ID = "job.id" // streaming.job_id
-
+ val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+ val JOB_CONTAINER_COUNT = "job.container.count"
+ val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
+ val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
implicit def Config2Job(config: Config) = new JobConfig(config)
}
-class JobConfig(config: Config) extends ScalaMapConfig(config) {
+class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getName = getOption(JobConfig.JOB_NAME)
+ def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
+ // If no coordinator system is configured, try and guess it if there's just one system configured.
+ val systemNames = config.getSystemNames.toSet
+ if (systemNames.size == 1) {
+ val systemName = systemNames.iterator.next
+ info("No coordinator system defined, so defaulting to %s" format systemName)
+ systemName
+ } else {
+ throw new ConfigException("Missing job.coordinator.system configuration.")
+ }
+ })
+
+ def getContainerCount = {
+ getOption(JobConfig.JOB_CONTAINER_COUNT) match {
+ case Some(count) => count.toInt
+ case _ =>
+ // To maintain backwards compatibility, honor yarn.container.count for now.
+ // TODO get rid of this in a future release.
+ getOption("yarn.container.count") match {
+ case Some(count) =>
+ warn("Configuration 'yarn.container.count' is deprecated. Please use %s." format JobConfig.JOB_CONTAINER_COUNT)
+ count.toInt
+ case _ => 1
+ }
+ }
+ }
+
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID)
@@ -53,4 +86,30 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) {
def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
+
+ val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
+ def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
+ case Some(rplFactor) => rplFactor
+ case _ =>
+ // TODO get rid of checkpoint configs in a future release
+ getOption("task.checkpoint.replication.factor") match {
+ case Some(rplFactor) =>
+ warn("Configuration 'task.checkpoint.replication.factor' is deprecated. Please use %s." format JobConfig.JOB_REPLICATION_FACTOR)
+ rplFactor
+ case _ => "3"
+ }
+ }
+
+ def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
+ case Some(segBytes) => segBytes
+ case _ =>
+ // TODO get rid of checkpoint configs in a future release
+ getOption("task.checkpoint.segment.bytes") match {
+ case Some(segBytes) =>
+ warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. Please use %s." format JobConfig.JOB_SEGMENT_BYTES)
+ segBytes
+ case _ => "26214400"
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1a2dd44..e94a473 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -21,9 +21,9 @@ package org.apache.samza.config
object ShellCommandConfig {
/**
- * This environment variable is used to store a JSON serialized map of all configuration.
+ * This environment variable is used to store a JSON serialized map of all coordinator system configs.
*/
- val ENV_CONFIG = "SAMZA_CONFIG"
+ val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
/**
* The ID for a container. This is an integer number between 0 and
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index adef09e..e172589 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -21,8 +21,6 @@ package org.apache.samza.config
import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
import org.apache.samza.system.SystemStream
object StreamConfig {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index cd06c06..0b3a235 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,8 +19,8 @@
package org.apache.samza.config
-import org.apache.samza.util.Util
import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
object TaskConfig {
// task config constants
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 720fbdc..56819e0 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,7 @@ package org.apache.samza.container
import java.io.File
import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
+import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
import org.apache.samza.config.Config
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -30,6 +30,7 @@ import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
import org.apache.samza.metrics.MetricsRegistryMap
@@ -328,17 +329,16 @@ object SamzaContainer extends Logging {
info("Got metrics reporters: %s" format reporters.keys)
- val checkpointManager = config.getCheckpointManagerFactory match {
- case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
- Util
- .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
- .getCheckpointManager(config, samzaContainerMetrics.registry)
- case _ => null
- }
+ val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
+ val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
+ val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
info("Got checkpoint manager: %s" format checkpointManager)
- val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
+ val combinedOffsets: Map[SystemStreamPartition, String] =
+ containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+
+ val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
info("Got offset manager: %s" format offsetManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index c14f2f6..5b43b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -19,61 +19,74 @@
package org.apache.samza.coordinator
+
import org.apache.samza.config.Config
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
import org.apache.samza.SamzaException
import org.apache.samza.container.grouper.task.GroupByContainerCount
-import org.apache.samza.util.Util
-import org.apache.samza.checkpoint.CheckpointManagerFactory
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import java.util
import org.apache.samza.container.TaskName
+import org.apache.samza.storage.ChangelogPartitionManager
import org.apache.samza.util.Logging
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.Partition
-import org.apache.samza.job.model.TaskModel
import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import java.net.URL
import org.apache.samza.system.SystemFactory
import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.config.ConfigRewriter
+/**
+ * Helper companion object that is responsible for wiring up a JobCoordinator
+ * given a Config object.
+ */
object JobCoordinator extends Logging {
+
/**
- * Build a JobCoordinator using a Samza job's configuration.
+ * @param coordinatorSystemConfig A config object that contains job.name,
+ * job.id, and all system.<job-coordinator-system-name>.*
+ * configuration. The method will use this config to read all configuration
+ * from the coordinator stream, and instantiate a JobCoordinator.
*/
- def apply(config: Config, containerCount: Int) = {
- val jobModel = buildJobModel(config, containerCount)
- val server = new HttpServer
- server.addServlet("/*", new JobServlet(jobModel))
- new JobCoordinator(jobModel, server)
+ def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobCoordinator = {
+ val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+ val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+ val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+ info("Registering coordinator system stream.")
+ coordinatorSystemConsumer.register
+ debug("Starting coordinator system stream.")
+ coordinatorSystemConsumer.start
+ debug("Bootstrapping coordinator system stream.")
+ coordinatorSystemConsumer.bootstrap
+ debug("Stopping coordinator system stream.")
+ coordinatorSystemConsumer.stop
+ val config = coordinatorSystemConsumer.getConfig
+ info("Got config: %s" format config)
+ val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+ val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+ getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
}
+ def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
+
/**
- * Gets a CheckpointManager from the configuration.
+ * Build a JobCoordinator using a Samza job's configuration.
*/
- def getCheckpointManager(config: Config) = {
- config.getCheckpointManagerFactory match {
- case Some(checkpointFactoryClassName) =>
- Util
- .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
- .getCheckpointManager(config, new MetricsRegistryMap)
- case _ =>
- if (!config.getStoreNames.isEmpty) {
- throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " +
- "Unable to start job as there would be no place to store changelog partition mapping.")
- }
- null
- }
+ def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+ val containerCount = config.getContainerCount
+ val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+ val server = new HttpServer
+ server.addServlet("/*", new JobServlet(jobModelGenerator))
+ new JobCoordinator(jobModelGenerator(), server, checkpointManager)
}
/**
@@ -115,74 +128,135 @@ object JobCoordinator extends Logging {
}
/**
- * Build a full Samza job model using the job configuration.
+ * Re-writes configuration using a ConfigRewriter, if one is defined. If
+ * there is no ConfigRewriter defined for the job, then this method is a
+ * no-op.
+ *
+ * @param config The config to re-write.
*/
- def buildJobModel(config: Config, containerCount: Int) = {
- // TODO containerCount should go away when we generalize the job coordinator,
+ def rewriteConfig(config: Config): Config = {
+ def rewrite(c: Config, rewriterName: String): Config = {
+ val klass = config
+ .getConfigRewriterClass(rewriterName)
+ .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+ val rewriter = Util.getObj[ConfigRewriter](klass)
+ info("Re-writing config with " + rewriter)
+ rewriter.rewrite(rewriterName, c)
+ }
+
+ config.getConfigRewriters match {
+ case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+ case _ => config
+ }
+ }
+
+ /**
+ * The method initializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
+ * which catch up with the latest content from the coordinator stream.
+ */
+ private def initializeJobModel(config: Config,
+ containerCount: Int,
+ checkpointManager: CheckpointManager,
+ changelogManager: ChangelogPartitionManager): () => JobModel = {
+ // TODO containerCount should go away when we generalize the job coordinator,
// and have a non-yarn-specific way of specifying container count.
- val checkpointManager = getCheckpointManager(config)
+
+ // Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getInputStreamPartitions(config)
val grouper = getSystemStreamPartitionGrouper(config)
- val previousChangelogeMapping = if (checkpointManager != null) {
- checkpointManager.start
- checkpointManager.readChangeLogPartitionMapping
- } else {
- new util.HashMap[TaskName, java.lang.Integer]()
+ info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+ val groups = grouper.group(allSystemStreamPartitions)
+
+ // Initialize the ChangelogPartitionManager and the CheckpointManager
+ val previousChangelogeMapping = if (changelogManager != null)
+ {
+ changelogManager.start
+ changelogManager.readChangeLogPartitionMapping
}
- var maxChangelogPartitionId = previousChangelogeMapping
- .values
- .map(_.toInt)
- .toList
- .sorted
- .lastOption
- .getOrElse(-1)
-
- // Assign all SystemStreamPartitions to TaskNames.
- val taskModels = {
- val groups = grouper.group(allSystemStreamPartitions)
- info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
- groups
- .map {
- case (taskName, systemStreamPartitions) =>
- val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
- case Some(changelogPartitionId) => new Partition(changelogPartitionId)
- case _ =>
- // If we've never seen this TaskName before, then assign it a
- // new changelog.
- maxChangelogPartitionId += 1
- info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
- new Partition(maxChangelogPartitionId)
- }
- new TaskModel(taskName, systemStreamPartitions, changelogPartition)
- }
- .toSet
+ else
+ {
+ new util.HashMap[TaskName, java.lang.Integer]()
}
+ checkpointManager.start
+ groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
+
+ // Generate the jobModel
+ def jobModelGenerator(): JobModel = refreshJobModel(config,
+ allSystemStreamPartitions,
+ checkpointManager,
+ groups,
+ previousChangelogeMapping,
+ containerCount)
+
+ val jobModel = jobModelGenerator()
- // Save the changelog mapping back to the checkpoint manager.
- if (checkpointManager != null) {
- // newChangelogMapping is the merging of all current task:changelog
+ // Save the changelog mapping back to the ChangelogPartitionManager
+ if (changelogManager != null)
+ {
+ // newChangelogMapping is the merging of all current task:changelog
// assignments with whatever we had before (previousChangelogeMapping).
- // We must persist legacy changelog assignments so that
- // maxChangelogPartitionId always has the absolute max, not the current
- // max (in case the task with the highest changelog partition mapping
+ // We must persist legacy changelog assignments so that
+ // maxChangelogPartitionId always has the absolute max, not the current
+ // max (in case the task with the highest changelog partition mapping
// disappears.
- val newChangelogMapping = taskModels.map(taskModel => {
- taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
- }).toMap ++ previousChangelogeMapping
+ val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
+ taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+ }}.toMap ++ previousChangelogeMapping
info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
- checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
- checkpointManager.stop
+ changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
}
+ // Return a jobModelGenerator lambda that can be used to refresh the job model
+ jobModelGenerator
+ }
- // Here is where we should put in a pluggable option for the
- // SSPTaskNameGrouper for locality, load-balancing, etc.
- val containerGrouper = new GroupByContainerCount(containerCount)
- val containerModels = containerGrouper
- .group(taskModels)
- .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
- .toMap
+ /**
+ * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
+ * builds a new JobModel.
+ * This method needs to be thread safe because it is called for every HTTP request from a container, and
+ * every call uses the same instance of coordinator stream producer and coordinator stream consumer.
+ */
+ private def refreshJobModel(config: Config,
+ allSystemStreamPartitions: util.Set[SystemStreamPartition],
+ checkpointManager: CheckpointManager,
+ groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
+ previousChangelogeMapping: util.Map[TaskName, Integer],
+ containerCount: Int): JobModel = {
+ this.synchronized
+ {
+ // If no mappings are present (first time the job is running) we return -1, which allows 0 to be the first changelog
+ // partition mapping.
+ var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+ // Assign all SystemStreamPartitions to TaskNames.
+ val taskModels =
+ {
+ groups.map
+ { case (taskName, systemStreamPartitions) =>
+ val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
+ // Find the system partitions which don't have a checkpoint and set null for the values for offsets
+ val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+ val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match
+ {
+ case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+ case _ =>
+ // If we've never seen this TaskName before, then assign it a
+ // new changelog.
+ maxChangelogPartitionId += 1
+ info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
+ new Partition(maxChangelogPartitionId)
+ }
+ new TaskModel(taskName, offsetMap, changelogPartition)
+ }.toSet
+ }
+
+ // Here is where we should put in a pluggable option for the
+ // SSPTaskNameGrouper for locality, load-balancing, etc.
+ val containerGrouper = new GroupByContainerCount(containerCount)
+ val containerModels = containerGrouper.group(taskModels).map
+ { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
- new JobModel(config, containerModels)
+ new JobModel(config, containerModels)
+ }
}
}
@@ -207,19 +281,31 @@ class JobCoordinator(
/**
* HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
*/
- val server: HttpServer) extends Logging {
+ val server: HttpServer = null,
+
+ /**
+ * Handle to checkpoint manager that's used to refresh the JobModel
+ */
+ val checkpointManager: CheckpointManager) extends Logging {
debug("Got job model: %s." format jobModel)
def start {
- debug("Starting HTTP server.")
- server.start
- info("Startd HTTP server: %s" format server.getUrl)
+ if (server != null) {
+ debug("Starting HTTP server.")
+ server.start
+ info("Startd HTTP server: %s" format server.getUrl)
+ }
}
def stop {
- debug("Stopping HTTP server.")
- server.stop
- info("Stopped HTTP server.")
+ if (server != null) {
+ debug("Stopping HTTP server.")
+ server.stop
+ info("Stopped HTTP server.")
+ debug("Stopping checkpoint manager.")
+ checkpointManager.stop()
+ info("Stopped checkpoint manager.")
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 10986a4..dfe3a45 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -20,8 +20,6 @@
package org.apache.samza.coordinator.server;
import java.net.InetAddress
-import java.net.URI
-import java.net.UnknownHostException
import javax.servlet.Servlet
import org.apache.samza.SamzaException
import org.eclipse.jetty.server.Connector
@@ -32,6 +30,7 @@ import org.eclipse.jetty.servlet.ServletHolder
import java.net.URL
import org.apache.samza.util.Logging
+
/**
* <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
* with the addServlet() method. The server is configured to automatically
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 635c353..a3baddb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,12 +19,13 @@
package org.apache.samza.coordinator.server
+
import org.apache.samza.job.model.JobModel
import org.apache.samza.util.Logging
/**
* A servlet that dumps the job model for a Samza job.
*/
-class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
- protected def getObjectToWrite() = jobModel
-}
+class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
+ protected def getObjectToWrite() = jobModelGenerator()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
new file mode 100644
index 0000000..9283812
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.{Config, SystemConfig}
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemFactory, SystemStream}
+import org.apache.samza.util.Util
+
+/**
+ * A helper class that does wiring for CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. This factory should only be used in
+ * situations where the underlying SystemConsumer/SystemProducer does not
+ * exist.
+ */
+class CoordinatorStreamSystemFactory {
+ def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
+ new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
+ }
+
+ def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
+ new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 0b720ec..1c178a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,16 +20,27 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.factories.PropertiesConfigFactory
import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.Util
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
import scala.collection.JavaConversions._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+
+object JobRunner {
+ val SOURCE = "job-runner"
-object JobRunner extends Logging {
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
@@ -43,23 +54,43 @@ object JobRunner extends Logging {
* on a config URI. The configFactory is instantiated, fed the configPath,
* and returns a Config, which is used to execute the job.
*/
-class JobRunner(config: Config) extends Logging with Runnable {
-
- def run() {
- val conf = rewriteConfig(config)
-
- val jobFactoryClass = conf.getStreamJobFactoryClass match {
+class JobRunner(config: Config) extends Logging {
+ def run() = {
+ debug("config: %s" format (config))
+ val jobFactoryClass = config.getStreamJobFactoryClass match {
case Some(factoryClass) => factoryClass
case _ => throw new SamzaException("no job factory class defined")
}
-
val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
-
info("job factory: %s" format (jobFactoryClass))
- debug("config: %s" format (conf))
+ val factory = new CoordinatorStreamSystemFactory
+ val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+ val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+ // Create the coordinator stream if it doesn't exist
+ info("Creating coordinator stream");
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+
+ info("Storing config in coordinator stream.")
+ coordinatorSystemProducer.register(JobRunner.SOURCE)
+ coordinatorSystemProducer.start
+ coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+ info("Loading old config from coordinator stream.")
+ coordinatorSystemConsumer.register
+ coordinatorSystemConsumer.start
+ coordinatorSystemConsumer.bootstrap
+ coordinatorSystemConsumer.stop
+ val oldConfig = coordinatorSystemConsumer.getConfig();
+ info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
+ (oldConfig.keySet -- config.keySet).foreach(key => {
+ coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+ })
+ coordinatorSystemProducer.stop
// Create the actual job, and submit it.
- val job = jobFactory.getJob(conf).submit
+ val job = jobFactory.getJob(config).submit
info("waiting for job to start")
@@ -76,22 +107,6 @@ class JobRunner(config: Config) extends Logging with Runnable {
}
info("exiting")
- }
-
- // Apply any and all config re-writer classes that the user has specified
- def rewriteConfig(config: Config): Config = {
- def rewrite(c: Config, rewriterName: String): Config = {
- val klass = config
- .getConfigRewriterClass(rewriterName)
- .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
- val rewriter = Util.getObj[ConfigRewriter](klass)
- info("Re-writing config file with " + rewriter)
- rewriter.rewrite(rewriterName, c)
- }
-
- config.getConfigRewriters match {
- case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
- case None => config
- }
+ job
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fd9719a..4fac154 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -30,7 +30,7 @@ import org.apache.samza.util.{Logging, Util}
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val containerModel = coordinator.jobModel.getContainers.get(0)
val commandBuilder = {
@@ -48,11 +48,12 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
}
// JobCoordinator is stopped by ProcessJob when it exits
coordinator.start
+ val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
commandBuilder
- .setConfig(config)
- .setId(0)
- .setUrl(coordinator.server.getUrl)
+ .setConfig(coordinatorSystemConfig)
+ .setId(0)
+ .setUrl(coordinator.server.getUrl)
new ProcessJob(commandBuilder, coordinator)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 530255e..60ee36f 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
package org.apache.samza.job.local
+
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.util.Logging
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
@@ -26,7 +28,6 @@ import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.TaskConfig._
import org.apache.samza.container.SamzaContainer
import org.apache.samza.job.{ StreamJob, StreamJobFactory }
-import org.apache.samza.util.Util
import org.apache.samza.config.JobConfig._
import org.apache.samza.coordinator.JobCoordinator
@@ -36,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator
class ThreadJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val containerModel = coordinator.jobModel.getContainers.get(0)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 744eec0..4b658c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -18,19 +18,33 @@
*/
package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.codehaus.jackson.`type`.TypeReference
import org.codehaus.jackson.map.ObjectMapper
-import java.nio.ByteBuffer
import org.apache.samza.config.Config
-class JsonSerde extends Serde[Object] {
- val objectMapper = new ObjectMapper
+class JsonSerde[T] extends Serde[T] {
+ val mapper = SamzaObjectMapper.getObjectMapper()
+
+ def toBytes(obj: T): Array[Byte] = {
+ try {
+ mapper.writeValueAsString(obj).getBytes("UTF-8")
+ }
+ catch {
+ case e: Exception => throw new SamzaException(e);
+ }
+ }
- def toBytes(obj: Object) = objectMapper
- .writeValueAsString(obj)
- .getBytes("UTF-8")
+ def fromBytes(bytes: Array[Byte]): T = {
+ try {
+ mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})}
+ catch {
+ case e: Exception => throw new SamzaException(e);
+ }
+ }
- def fromBytes(bytes: Array[Byte]) = objectMapper
- .readValue(bytes, classOf[Object])
}
class JsonSerdeFactory extends SerdeFactory[Object] {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index ec1d749..097f410 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -131,7 +131,11 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
enterPosition
}
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new SamzaException("Method not implemented")
+ def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+ throw new UnsupportedOperationException("Method not implemented.")
+ }
+
+ def createCoordinatorStream(streamName: String) {
+ throw new UnsupportedOperationException("Method not implemented.")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1a67586..8a83566 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,14 +19,20 @@
package org.apache.samza.util
-import java.net.URL
-import java.io.BufferedReader
+import java.net.{HttpURLConnection, URL}
+import java.io.{InputStream, BufferedReader, File, InputStreamReader}
import java.lang.management.ManagementFactory
-import java.io.File
-import org.apache.samza.system.SystemStream
+import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
-import org.apache.samza.job.model.JobModel
-import java.io.InputStreamReader
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
object Util extends Logging {
val random = new Random
@@ -114,14 +120,121 @@ object Util extends Logging {
* @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
* @return String payload of the body of the HTTP response.
*/
- def read(url: URL, timeout: Int = 30000): String = {
- val conn = url.openConnection();
+ def read(url: URL, timeout: Int = 60000): String = {
+ // Reassigned on every failed attempt, because a connection cannot be reused once a response was read.
+ var httpConn = getHttpConnection(url, timeout)
+ val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+ retryBackoff.run(loop => {
+ // Read the status once per attempt so the logged code is the one that was tested.
+ val responseCode = httpConn.getResponseCode
+ if (responseCode == 200) {
+ loop.done
+ } else {
+ warn("Error: " + responseCode)
+ // getErrorStream returns null when the server sent no error body. Passing null to
+ // readStream would throw a NullPointerException, which the handler below treats as
+ // fatal and which would therefore abort the retry loop.
+ val errorContent = Option(httpConn.getErrorStream).map(stream => readStream(stream)).getOrElse("")
+ warn("Error reading stream, failed with response %s" format errorContent)
+ httpConn = getHttpConnection(url, timeout)
+ }
+ },
+ (exception, loop) => {
+ exception match {
+ case e: Exception =>
+ // Exceptions are not retried: stop the loop, log, and propagate to the caller.
+ loop.done
+ error("Unable to connect to Job coordinator server, received exception", e)
+ throw e
+ }
+ })
+
+ // The loop can end without a successful response, so check the status before reading the body.
+ if (httpConn.getResponseCode != 200) {
+ throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+ }
+ readStream(httpConn.getInputStream)
+ }
+
+ private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+ val conn = url.openConnection()
conn.setConnectTimeout(timeout)
conn.setReadTimeout(timeout)
- val br = new BufferedReader(new InputStreamReader(conn.getInputStream));
+ conn.asInstanceOf[HttpURLConnection]
+ }
+ private def readStream(stream: InputStream): String = {
+ val br = new BufferedReader(new InputStreamReader(stream));
var line: String = null;
val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
br.close
+ stream.close
body
}
+
+
+ /**
+ * Generates the coordinator stream name for a job from its job name and
+ * job id. The stream name has the format
+ * __samza_coordinator_<JOBNAME>_<JOBID>, where every underscore in the job
+ * name and in the job id is replaced with a hyphen.
+ *
+ * @param jobName The name of the job.
+ * @param jobId The id of the job.
+ * @return The coordinator stream name for the job.
+ */
+ def getCoordinatorStreamName(jobName: String, jobId: String) = {
+ val sanitizedJobName = jobName.replaceAll("_", "-")
+ val sanitizedJobId = jobId.replaceAll("_", "-")
+ "__samza_coordinator_%s_%s".format(sanitizedJobName, sanitizedJobId)
+ }
+
+ /**
+ * Looks up a job's name and id in the given config. The job name is
+ * mandatory; the job id falls back to "1" when it is not configured.
+ *
+ * @param config The config to read job.name and job.id from.
+ * @return A tuple of (jobName, jobId)
+ * @throws ConfigException If the config does not define job.name.
+ */
+ def getJobNameAndId(config: Config) = {
+ val jobName = config.getName.getOrElse(throw new ConfigException("Missing required config: job.name"))
+ val jobId = config.getJobId.getOrElse("1")
+ (jobName, jobId)
+ }
+
+ /**
+ * Builds a reduced config from a job's full config. The result holds the
+ * subset of the config under the coordinator system's prefix, plus the job
+ * name, the job id and the coordinator system name.
+ *
+ * @param config The job's full config.
+ * @return A MapConfig with only the entries described above.
+ */
+ def buildCoordinatorStreamConfig(config: Config) = {
+ val (jobName, jobId) = getJobNameAndId(config)
+ // This is what's required to start the JobCoordinator: the coordinator system's own config ...
+ val coordinatorSystemConfig = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false)
+ // ... and the entries that identify the job and its coordinator system.
+ val jobIdentityConfig = Map[String, String](
+ JobConfig.JOB_NAME -> jobName,
+ JobConfig.JOB_ID -> jobId,
+ JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName)
+ new MapConfig(coordinatorSystemConfig ++ jobIdentityConfig)
+ }
+
+ /**
+ * Resolves the coordinator SystemStream and instantiates the SystemFactory
+ * of the coordinator system from the configuration.
+ *
+ * @param config The job config; it must define job.name and a system factory
+ * for the coordinator system.
+ * @return A tuple of (coordinator SystemStream, SystemFactory of the coordinator system)
+ * @throws SamzaException If no system factory is configured for the coordinator system.
+ */
+ def getCoordinatorSystemStreamAndFactory(config: Config) = {
+ val coordinatorSystemName = config.getCoordinatorSystemName
+ val (jobName, jobId) = getJobNameAndId(config)
+ val coordinatorSystemStream = new SystemStream(coordinatorSystemName, getCoordinatorStreamName(jobName, jobId))
+ val systemFactory = config.getSystemFactory(coordinatorSystemName) match {
+ case Some(factoryClassName) => getObj[SystemFactory](factoryClassName)
+ case None =>
+ throw new SamzaException(("Missing configuration: " + SystemConfig.SYSTEM_FACTORY).format(coordinatorSystemName))
+ }
+ (coordinatorSystemStream, systemFactory)
+ }
+
+ /**
+ * Renders a SystemStreamPartition as the string
+ * "system.stream.partitionId".
+ *
+ * @param ssp System stream partition
+ * @return The string representation of the SSP
+ */
+ def sspToString(ssp: SystemStreamPartition): String = {
+ val partitionId = String.valueOf(ssp.getPartition.getPartitionId)
+ Seq(ssp.getSystem, ssp.getStream, partitionId).mkString(".")
+ }
+
+ /**
+ * Converts the string form of an SSP ('system.stream.partition') back to a
+ * SystemStreamPartition. The system is the text before the first '.', the
+ * partition id is the integer after the last '.', and the stream is
+ * everything in between (so the stream name may itself contain '.').
+ *
+ * @param ssp The string form of the SSP
+ * @return An SSP typed object
+ * @throws IllegalArgumentException If the string has fewer than two '.'
+ * separators, or (as NumberFormatException) if the partition is not an integer.
+ */
+ def stringToSsp(ssp: String): SystemStreamPartition = {
+ val idx = ssp.indexOf('.')
+ val lastIdx = ssp.lastIndexOf('.')
+ // idx == lastIdx means there is only one separator. Without this check the stream
+ // substring below would be asked for a negative-length range and fail with a
+ // StringIndexOutOfBoundsException instead of a descriptive error.
+ if (idx < 0 || idx == lastIdx) {
+ throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition'")
+ }
+ val system = ssp.substring(0, idx)
+ val stream = ssp.substring(idx + 1, lastIdx)
+ val partitionId = Integer.parseInt(ssp.substring(lastIdx + 1))
+ new SystemStreamPartition(new SystemStream(system, stream), new Partition(partitionId))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
new file mode 100644
index 0000000..59782fe
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.apache.samza.util.Util;
+
+/**
+ * Helper for creating mock CoordinatorStreamConsumer and
+ * CoordinatorStreamConsumer. The CoordinatorStreamConsumer is meant to just
+ * forward all configs to JobCoordinator, which is useful for mocking in unit
+ * tests.
+ */
+public class MockCoordinatorStreamSystemFactory implements SystemFactory {
+
+ private static SystemConsumer mockConsumer = null;
+ private static boolean useCachedConsumer = false;
+ public static void enableMockConsumerCache() {
+ mockConsumer = null;
+ useCachedConsumer = true;
+ }
+
+ public static void disableMockConsumerCache() {
+ useCachedConsumer = false;
+ mockConsumer = null;
+ }
+
+ /**
+ * Returns a consumer that sends all configs to the coordinator stream.
+ * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
+ * The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
+ * ch:source:taskname -> changelogPartition for changelog
+ * Everything else is processed as normal config
+ */
+ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+
+ if(useCachedConsumer && mockConsumer != null) {
+ return mockConsumer;
+ }
+
+ String jobName = config.get("job.name");
+ String jobId = config.get("job.id");
+ if (jobName == null) {
+ throw new ConfigException("Must define job.name.");
+ }
+ if (jobId == null) {
+ jobId = "1";
+ }
+ String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
+ mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+ return mockConsumer;
+ }
+
+ /**
+ * Returns a no-op producer.
+ */
+ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+ // A do-nothing producer.
+ return new SystemProducer() {
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void register(String source) {
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ }
+
+ public void flush(String source) {
+ }
+ };
+ }
+
+ /**
+ * Returns a single partition admin that pretends to create a coordinator
+ * stream.
+ */
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ return new MockSystemAdmin();
+ }
+
+ public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+ public void createCoordinatorStream(String streamName) {
+ // Do nothing.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
new file mode 100644
index 0000000..00a2d59
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A mock SystemConsumer that pretends to be a coordinator stream. The mock will
+ * take all configs given to it, and put them into the coordinator stream's
+ * SystemStreamPartition. This is useful in cases where config needs to be
+ * quickly passed from a unit test into the JobCoordinator.
+ *
+ * Config entries are translated by key prefix:
+ * cp:source:taskname -> ssp:offset becomes a SetCheckpoint message,
+ * ch:source:taskname -> changelogPartition becomes a SetChangelogMapping message,
+ * and every other entry becomes a SetConfig message.
+ */
+public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
+ private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
+ public final static String CHANGELOGPREFIX = "ch:";
+ public final static String CHECKPOINTPREFIX = "cp:";
+ public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
+ // volatile: blockPool() sets this flag and poll() reads it. Since poll() then blocks on the
+ // latch until someone else releases it, the writer and the reader are presumably different
+ // threads, and the write must be visible to the polling thread.
+ public volatile boolean blockpollFlag = false;
+
+ private final SystemStreamPartition systemStreamPartition;
+ private final Config config;
+
+ public MockCoordinatorStreamWrappedConsumer(SystemStreamPartition systemStreamPartition, Config config) {
+ super();
+ this.config = config;
+ this.systemStreamPartition = systemStreamPartition;
+ }
+
+ /**
+ * Converts the config passed to the constructor into coordinator stream messages.
+ */
+ public void start() {
+ convertConfigToCoordinatorMessage(config);
+ }
+
+ /**
+ * Converts an additional config into coordinator stream messages, using the
+ * same translation as start().
+ */
+ public void addMoreMessages(Config config) {
+ convertConfigToCoordinatorMessage(config);
+ }
+
+ /**
+ * Serializes a value to JSON with the shared mapper and encodes it as UTF-8.
+ */
+ private static byte[] toJsonBytes(Object value) throws Exception {
+ return MAPPER.writeValueAsString(value).getBytes("UTF-8");
+ }
+
+ /**
+ * Translates every config entry into a coordinator stream message, puts it
+ * on the coordinator SystemStreamPartition, and then marks the partition as
+ * being at head. Any failure is rethrown wrapped in a SamzaException.
+ */
+ private void convertConfigToCoordinatorMessage(Config config) {
+ try {
+ for (Map.Entry<String, String> configPair : config.entrySet()) {
+ byte[] keyBytes = null;
+ byte[] messageBytes = null;
+ if (configPair.getKey().startsWith(CHECKPOINTPREFIX)) {
+ // Key is cp:source:taskname, value is ssp:offset.
+ String[] checkpointInfo = configPair.getKey().split(":");
+ String[] sspOffsetPair = configPair.getValue().split(":");
+ HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
+ checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
+ Checkpoint cp = new Checkpoint(checkpointMap);
+ CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+ keyBytes = toJsonBytes(setCheckpoint.getKeyArray());
+ messageBytes = toJsonBytes(setCheckpoint.getMessageMap());
+ } else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+ // Key is ch:source:taskname, value is the changelog partition number.
+ String[] changelogInfo = configPair.getKey().split(":");
+ String changeLogPartition = configPair.getValue();
+ CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+ keyBytes = toJsonBytes(changelogMapping.getKeyArray());
+ messageBytes = toJsonBytes(changelogMapping.getMessageMap());
+ } else {
+ SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
+ keyBytes = toJsonBytes(setConfig.getKeyArray());
+ messageBytes = toJsonBytes(setConfig.getMessageMap());
+ }
+ // The ssp here is the coordinator ssp (which is always fixed) and not the task ssp.
+ put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "", keyBytes, messageBytes));
+ }
+ setIsAtHead(systemStreamPartition, true);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Polls like BlockingEnvelopeMap, but first waits on the blockConsumerPoll
+ * latch when blocking was requested through blockPool().
+ */
+ @Override
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+ Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+ throws InterruptedException {
+
+ if (blockpollFlag) {
+ blockConsumerPoll.await();
+ }
+
+ return super.poll(systemStreamPartitions, timeout);
+ }
+
+ /**
+ * Makes subsequent poll() calls wait on the returned latch until it is
+ * counted down.
+ *
+ * @return The latch that poll() waits on.
+ */
+ public CountDownLatch blockPool()
+ {
+ blockpollFlag = true;
+ return blockConsumerPoll;
+ }
+
+
+ public void stop() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
new file mode 100644
index 0000000..15181bb
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.junit.Test;
+
+public class TestCoordinatorStreamMessage {
+ @Test
+ public void testCoordinatorStreamMessage() {
+ CoordinatorStreamMessage message = new CoordinatorStreamMessage("source");
+ assertEquals("source", message.getSource());
+ assertEquals(CoordinatorStreamMessage.VERSION, message.getVersion());
+ assertNotNull(message.getUsername());
+ assertTrue(message.getTimestamp() > 0);
+ assertTrue(!message.isDelete());
+ CoordinatorStreamMessage secondMessage = new CoordinatorStreamMessage(message.getKeyArray(), message.getMessageMap());
+ assertEquals(secondMessage, message);
+ }
+
+ @Test
+ public void testCoordinatorStreamMessageIsDelete() {
+ CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[] {}, null);
+ assertTrue(message.isDelete());
+ assertNull(message.getMessageMap());
+ }
+
+ @Test
+ public void testSetConfig() {
+ SetConfig setConfig = new SetConfig("source", "key", "value");
+ assertEquals(SetConfig.TYPE, setConfig.getType());
+ assertEquals("key", setConfig.getKey());
+ assertEquals("value", setConfig.getConfigValue());
+ assertFalse(setConfig.isDelete());
+ assertEquals(CoordinatorStreamMessage.VERSION, setConfig.getVersion());
+ }
+
+ @Test
+ public void testDelete() {
+ Delete delete = new Delete("source2", "key", "delete-type");
+ assertEquals("delete-type", delete.getType());
+ assertEquals("key", delete.getKey());
+ assertNull(delete.getMessageMap());
+ assertTrue(delete.isDelete());
+ assertEquals(CoordinatorStreamMessage.VERSION, delete.getVersion());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..5e193f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemConsumer {
+ @Test
+ public void testCoordinatorStreamSystemConsumer() {
+ Map<String, String> expectedConfig = new HashMap<String, String>();
+ expectedConfig.put("job.id", "1234");
+ SystemStream systemStream = new SystemStream("system", "stream");
+ MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+ CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+ assertFalse(systemConsumer.isRegistered());
+ consumer.register();
+ assertTrue(systemConsumer.isRegistered());
+ assertFalse(systemConsumer.isStarted());
+ consumer.start();
+ assertTrue(systemConsumer.isStarted());
+ try {
+ consumer.getConfig();
+ fail("Should have failed when retrieving config before bootstrapping.");
+ } catch (SamzaException e) {
+ // Expected.
+ }
+ consumer.bootstrap();
+ assertEquals(expectedConfig, consumer.getConfig());
+ assertFalse(systemConsumer.isStopped());
+ consumer.stop();
+ assertTrue(systemConsumer.isStopped());
+ }
+
+ private static class MockSystemConsumer implements SystemConsumer {
+ private boolean started = false;
+ private boolean stopped = false;
+ private boolean registered = false;
+ private final SystemStreamPartition expectedSystemStreamPartition;
+ private int pollCount = 0;
+
+ public MockSystemConsumer(SystemStreamPartition expectedSystemStreamPartition) {
+ this.expectedSystemStreamPartition = expectedSystemStreamPartition;
+ }
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ registered = true;
+ assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+ assertEquals(1, systemStreamPartitions.size());
+ SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
+ assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+
+ if (pollCount++ == 0) {
+ List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+ SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+ Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
+ map.put(systemStreamPartition, list);
+ }
+
+ return map;
+ }
+
+ private byte[] serialize(Object obj) {
+ try {
+ return SamzaObjectMapper.getObjectMapper().writeValueAsString(obj).getBytes("UTF-8");
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..728fa53
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemProducer {
+ @Test
+ public void testCoordinatorStreamSystemProducer() {
+ String source = "source";
+ SystemStream systemStream = new SystemStream("system", "stream");
+ MockSystemProducer systemProducer = new MockSystemProducer(source);
+ MockSystemAdmin systemAdmin = new MockSystemAdmin();
+ CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
+ SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+ Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
+ assertFalse(systemProducer.isRegistered());
+ producer.register(source);
+ assertTrue(systemProducer.isRegistered());
+ assertFalse(systemProducer.isStarted());
+ producer.start();
+ assertTrue(systemProducer.isStarted());
+ producer.send(setConfig1);
+ producer.send(setConfig2);
+ producer.send(delete);
+ assertFalse(systemProducer.isStopped());
+ producer.stop();
+ assertTrue(systemProducer.isStopped());
+ List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+ assertEquals(3, envelopes.size()); // Check the count before indexing so a short list fails with a clear assertion.
+ OutgoingMessageEnvelope envelope0 = envelopes.get(0);
+ OutgoingMessageEnvelope envelope1 = envelopes.get(1);
+ OutgoingMessageEnvelope envelope2 = envelopes.get(2);
+ TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
+ };
+ TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
+ };
+ assertEquals(new CoordinatorStreamMessage(setConfig1), new CoordinatorStreamMessage(deserialize((byte[]) envelope0.getKey(), keyRef), deserialize((byte[]) envelope0.getMessage(), msgRef)));
+ assertEquals(new CoordinatorStreamMessage(setConfig2), new CoordinatorStreamMessage(deserialize((byte[]) envelope1.getKey(), keyRef), deserialize((byte[]) envelope1.getMessage(), msgRef)));
+ assertEquals(new CoordinatorStreamMessage(delete), new CoordinatorStreamMessage(deserialize((byte[]) envelope2.getKey(), keyRef), deserialize((byte[]) envelope2.getMessage(), msgRef)));
+ }
+
+ private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+ try {
+ if (bytes != null) {
+ String valueStr = new String((byte[]) bytes, "UTF-8");
+ return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
+ }
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+
+ return null;
+ }
+
+ private static class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+ public void createCoordinatorStream(String streamName) {
+ // Do nothing.
+ }
+ }
+
+ private static class MockSystemProducer implements SystemProducer {
+ private final String expectedSource;
+ private final List<OutgoingMessageEnvelope> envelopes;
+ private boolean started = false;
+ private boolean stopped = false;
+ private boolean registered = false;
+ private boolean flushed = false;
+
+ public MockSystemProducer(String expectedSource) {
+ this.expectedSource = expectedSource;
+ this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+ }
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public void register(String source) {
+ assertEquals(expectedSource, source);
+ registered = true;
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ envelopes.add(envelope);
+ }
+
+ public void flush(String source) {
+ assertEquals(expectedSource, source);
+ flushed = true;
+ }
+
+ public List<OutgoingMessageEnvelope> getEnvelopes() {
+ return envelopes;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public boolean isFlushed() {
+ return flushed;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 76bc681..72b134c 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -40,12 +40,12 @@ public class TestSamzaObjectMapper {
public void testJsonTaskModel() throws Exception {
ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
Map<String, String> configMap = new HashMap<String, String>();
+ Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
configMap.put("a", "b");
Config config = new MapConfig(configMap);
- Set<SystemStreamPartition> inputSystemStreamPartitions = new HashSet<SystemStreamPartition>();
- inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
TaskName taskName = new TaskName("test");
- TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, new Partition(2));
+ sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
+ TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
tasks.put(taskName, taskModel);
ContainerModel containerModel = new ContainerModel(1, tasks);
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 9348c7d..41eb82e 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,3 +22,4 @@
job.factory.class=org.apache.samza.job.MockJobFactory
job.name=test-job
foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index af800df..0ba932c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -31,8 +31,9 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mock.MockitoSugar
-
import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
object TestCheckpointTool {
var checkpointManager: CheckpointManager = null
@@ -40,8 +41,8 @@ object TestCheckpointTool {
var systemProducer: SystemProducer = null
var systemAdmin: SystemAdmin = null
- class MockCheckpointManagerFactory extends CheckpointManagerFactory {
- override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
+ class MockCheckpointManagerFactory {
+ def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
}
class MockSystemFactory extends SystemFactory {
@@ -62,15 +63,17 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
@Before
def setup {
config = new MapConfig(Map(
+ JobConfig.JOB_NAME -> "test",
+ JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+ SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
))
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
))
-
TestCheckpointTool.checkpointManager = mock[CheckpointManager]
TestCheckpointTool.systemAdmin = mock[SystemAdmin]
when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
@@ -79,12 +82,12 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
-
}
@Test
def testReadLatestCheckpoint {
- new CheckpointTool(config, null).run
+ val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+ checkpointTool.run
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
@@ -95,7 +98,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
- new CheckpointTool(config, toOverwrite).run
+ val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+ checkpointTool.run
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
verify(TestCheckpointTool.checkpointManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a281e79..8d54c46 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -179,22 +179,6 @@ class TestOffsetManager {
}
}
- @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
- def testShouldFailWhenMissingDefault {
- val taskName = new TaskName("c")
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
- offsetManager.register(taskName, Set(systemStreamPartition))
-
- intercept[SamzaException] {
- offsetManager.start
- }
- }
-
@Test
def testDefaultSystemShouldFailWhenFailIsSpecified {
val systemStream = new SystemStream("test-system", "test-stream")
@@ -255,24 +239,26 @@ class TestOffsetManager {
assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
}
+
private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
-
- new CheckpointManager {
+ new CheckpointManager(null, null, null) {
var isStarted = false
var isStopped = false
var registered = Set[TaskName]()
var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
- def start { isStarted = true }
- def register(taskName: TaskName) { registered += taskName }
- def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
- def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
- def stop { isStopped = true }
-
- override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+ override def start { isStarted = true }
+ override def register(taskName: TaskName) { registered += taskName }
+ override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+ override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+ override def stop { isStopped = true }
- override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
+ // Only for testing purposes - not present in actual checkpoint manager
+ def getOffets: util.Map[SystemStreamPartition, String] =
+ {
+ checkpoint.getOffsets()
+ }
}
}
@@ -284,8 +270,12 @@ class TestOffsetManager {
def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
Map[String, SystemStreamMetadata]()
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- new SamzaException("Method not implemented")
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+ throw new UnsupportedOperationException("Method not implemented.") // Throw (not just construct) so an unexpected call fails the test.
+ }
+
+ override def createCoordinatorStream(streamName: String) {
+ throw new UnsupportedOperationException("Method not implemented.") // Throw (not just construct) so an unexpected call fails the test.
}
}
}