You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:36:02 UTC

[46/50] [abbrv] samza git commit: SAMZA-843 - Slow start of Samza jobs with large number of containers

SAMZA-843 - Slow start of Samza jobs with large number of containers


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/868cff7b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/868cff7b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/868cff7b

Branch: refs/heads/samza-sql
Commit: 868cff7b6591b8dd4b413649ca4a92ca30dfd731
Parents: 0bacbbe
Author: Navina <na...@gmail.com>
Authored: Wed Jan 6 14:15:26 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Jan 6 14:15:26 2016 -0800

----------------------------------------------------------------------
 .../autoscaling/deployer/ConfigManager.java     |  5 +-
 .../apache/samza/container/SamzaContainer.scala | 13 ++-
 .../samza/coordinator/JobCoordinator.scala      | 91 ++++++++++----------
 .../samza/coordinator/server/JobServlet.scala   | 13 ++-
 .../main/scala/org/apache/samza/util/Util.scala | 15 +++-
 .../samza/container/TestSamzaContainer.scala    | 49 ++++++++++-
 .../samza/coordinator/TestJobCoordinator.scala  | 17 +++-
 7 files changed, 143 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index 87346bc..e3839ca 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -68,6 +68,7 @@ public class ConfigManager {
   private SystemStreamPartitionIterator coordinatorStreamIterator;
   private static final Logger log = LoggerFactory.getLogger(ConfigManager.class);
   private final long defaultPollingInterval = 100;
+  private final int defaultReadJobModelDelayMs = 100;
   private final long interval;
   private String coordinatorServerURL = null;
   private final String jobName;
@@ -324,7 +325,7 @@ public class ConfigManager {
    */
   public int getCurrentNumTasks() {
     int currentNumTasks = 0;
-    for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values()) {
+    for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
       currentNumTasks += containerModel.getTasks().size();
     }
     return currentNumTasks;
@@ -337,7 +338,7 @@ public class ConfigManager {
    * @return current number of containers in the job
    */
   public int getCurrentNumContainers() {
-    return SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values().size();
+    return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ddce148..e3d0b6c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -52,8 +52,7 @@ import org.apache.samza.system.chooser.MessageChooserFactory
 import org.apache.samza.system.chooser.RoundRobinChooserFactory
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util}
 import scala.collection.JavaConversions._
 import java.net.{UnknownHostException, InetAddress, URL}
 import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
@@ -62,6 +61,8 @@ import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
 
 object SamzaContainer extends Logging {
+  val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
+
   def main(args: Array[String]) {
     safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1)))
   }
@@ -102,11 +103,15 @@ object SamzaContainer extends Logging {
    * assignments, and returns objects to be used for SamzaContainer's
    * constructor.
    */
-  def readJobModel(url: String) = {
+  def readJobModel(url: String, initialDelayMs: Int = scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS)) = {
     info("Fetching configuration from: %s" format url)
     SamzaObjectMapper
       .getObjectMapper
-      .readValue(Util.read(new URL(url)), classOf[JobModel])
+      .readValue(
+        Util.read(
+          url = new URL(url),
+          retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)),
+        classOf[JobModel])
   }
 
   def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = {

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 112ec1c..06a96ad 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.coordinator
 
 
+import java.util.concurrent.atomic.AtomicReference
+
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.config.Config
@@ -55,6 +57,7 @@ object JobCoordinator extends Logging {
    * a volatile value to store the current instantiated <code>JobCoordinator</code>
    */
   @volatile var currentJobCoordinator: JobCoordinator = null
+  val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
 
   /**
    * @param coordinatorSystemConfig A config object that contains job.name,
@@ -105,10 +108,12 @@ object JobCoordinator extends Logging {
                         changelogManager: ChangelogPartitionManager,
                         localityManager: LocalityManager,
                         streamMetadataCache: StreamMetadataCache) = {
-    val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
+    val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
+    jobModelRef.set(jobModel)
+
     val server = new HttpServer
-    server.addServlet("/*", new JobServlet(jobModelGenerator))
-    currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server)
+    server.addServlet("/*", new JobServlet(jobModelRef))
+    currentJobCoordinator = new JobCoordinator(jobModel, server)
     currentJobCoordinator
   }
 
@@ -141,15 +146,13 @@ object JobCoordinator extends Logging {
   }
 
   /**
-   * The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
-   * which catchup with the latest content from the coordinator stream.
+   * The method initializes the jobModel and returns it to the caller.
+   * Note: refreshJobModel can be used as a lambda for JobModel generation in the future.
    */
   private def initializeJobModel(config: Config,
                                  changelogManager: ChangelogPartitionManager,
                                  localityManager: LocalityManager,
-                                 streamMetadataCache: StreamMetadataCache): () => JobModel = {
-
-
+                                 streamMetadataCache: StreamMetadataCache): JobModel = {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
@@ -195,56 +198,54 @@ object JobCoordinator extends Logging {
       info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
       changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
     }
-    // Return a jobModelGenerator lambda that can be used to refresh the job model
-    jobModelGenerator
+
+    jobModel
   }
 
   /**
    * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
    * builds a new JobModel.
-   * This method needs to be thread safe, the reason being, for every HTTP request from a container, this method is called
-   * and underlying it uses the same instance of coordinator stream producer and coordinator stream consumer.
+   * Note: This method no longer needs to be thread safe because an HTTP request from a container no longer triggers a job model
+   * refresh. Hence, there is no need for synchronization as before.
    */
   private def refreshJobModel(config: Config,
                               allSystemStreamPartitions: util.Set[SystemStreamPartition],
                               groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
                               previousChangelogMapping: util.Map[TaskName, Integer],
                               localityManager: LocalityManager): JobModel = {
-    this.synchronized
+
+    // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
+    // mapping.
+    var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+    // Assign all SystemStreamPartitions to TaskNames.
+    val taskModels =
     {
-      // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
-      // mapping.
-      var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
-
-      // Assign all SystemStreamPartitions to TaskNames.
-      val taskModels =
-      {
-        groups.map
-                { case (taskName, systemStreamPartitions) =>
-                  val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
-                  {
-                    case Some(changelogPartitionId) => new Partition(changelogPartitionId)
-                    case _ =>
-                      // If we've never seen this TaskName before, then assign it a
-                      // new changelog.
-                      maxChangelogPartitionId += 1
-                      info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
-                      new Partition(maxChangelogPartitionId)
-                  }
-                  new TaskModel(taskName, systemStreamPartitions, changelogPartition)
-                }.toSet
-      }
-
-      // Here is where we should put in a pluggable option for the
-      // SSPTaskNameGrouper for locality, load-balancing, etc.
-
-      val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
-      val containerGrouper = containerGrouperFactory.build(config)
-      val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
-              { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
-
-      new JobModel(config, containerModels, localityManager)
+      groups.map
+              { case (taskName, systemStreamPartitions) =>
+                val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
+                {
+                  case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+                  case _ =>
+                    // If we've never seen this TaskName before, then assign it a
+                    // new changelog.
+                    maxChangelogPartitionId += 1
+                    info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
+                    new Partition(maxChangelogPartitionId)
+                }
+                new TaskModel(taskName, systemStreamPartitions, changelogPartition)
+              }.toSet
     }
+
+    // Here is where we should put in a pluggable option for the
+    // SSPTaskNameGrouper for locality, load-balancing, etc.
+
+    val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
+    val containerGrouper = containerGrouperFactory.build(config)
+    val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
+            { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+
+    new JobModel(config, containerModels, localityManager)
   }
 
   private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, streamMetadataCache: StreamMetadataCache) {

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index a3baddb..5750c2d 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -20,12 +20,21 @@
 package org.apache.samza.coordinator.server
 
 
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.samza.SamzaException
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.util.Logging
 
 /**
  * A servlet that dumps the job model for a Samza job.
  */
-class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
-  protected def getObjectToWrite() = jobModelGenerator()
+class JobServlet(jobModelRef: AtomicReference[JobModel]) extends ServletBase with Logging {
+  protected def getObjectToWrite() = {
+    val jobModel = jobModelRef.get()
+    if (jobModel == null) { // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
+      throw new SamzaException("Job Model is not defined in the JobCoordinator. This indicates that the Samza job is unstable. Exiting...")
+    }
+    jobModel
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 58fbb8f..bd0fe5f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -119,15 +119,22 @@ object Util extends Logging {
   }
 
   /**
+   * Overloads the read method defined below so that Java classes can call it with the default retry strategy
+   */
+  def read(url: URL, timeout: Int): String = {
+    read(url, timeout, new ExponentialSleepStrategy)
+  }
+
+  /**
    * Reads a URL and returns its body as a string. Does no error handling.
    *
    * @param url HTTP URL to read from.
    * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
+   * @param retryBackoff Instance of ExponentialSleepStrategy that encapsulates how long to sleep between retries of the operation
    * @return String payload of the body of the HTTP response.
    */
-  def read(url: URL, timeout: Int = 60000): String = {
+  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
     var httpConn = getHttpConnection(url, timeout)
-    val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
     retryBackoff.run(loop => {
       if(httpConn.getResponseCode != 200)
       {
@@ -143,6 +150,10 @@ object Util extends Logging {
     },
     (exception, loop) => {
       exception match {
+        case ioe: IOException => {
+          warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
+          httpConn = getHttpConnection(url, timeout)
+        }
         case e: Exception =>
           loop.done
           error("Unable to connect to Job coordinator server, received exception", e)

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 365ff0a..9df12d2 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,7 +19,9 @@
 
 package org.apache.samza.container
 
+import java.net.SocketTimeoutException
 import java.util
+import java.util.concurrent.atomic.AtomicReference
 import org.apache.samza.storage.TaskStorageManager
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -30,8 +32,7 @@ import org.apache.samza.Partition
 import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
 import org.apache.samza.coordinator.JobCoordinator
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.server.{ServletBase, HttpServer, JobServlet}
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
@@ -76,7 +77,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
     val coordinator = new JobCoordinator(jobModel, server)
-    coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
+    JobCoordinator.jobModelRef.set(jobModelGenerator())
+    coordinator.server.addServlet("/*", new JobServlet(JobCoordinator.jobModelRef))
     try {
       coordinator.start
       assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -86,6 +88,33 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
+  def testReadJobModelWithTimeouts {
+    val config = new MapConfig(Map("a" -> "b"))
+    val offsets = new util.HashMap[SystemStreamPartition, String]()
+    offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
+    val tasks = Map(
+      new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+      new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
+    val containers = Map(
+      Integer.valueOf(0) -> new ContainerModel(0, tasks),
+      Integer.valueOf(1) -> new ContainerModel(1, tasks))
+    val jobModel = new JobModel(config, containers)
+    def jobModelGenerator(): JobModel = jobModel
+    val server = new HttpServer
+    val coordinator = new JobCoordinator(jobModel, server)
+    JobCoordinator.jobModelRef.set(jobModelGenerator())
+    val mockJobServlet = new MockJobServlet(2, JobCoordinator.jobModelRef)
+    coordinator.server.addServlet("/*", mockJobServlet)
+    try {
+      coordinator.start
+      assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
+    } finally {
+      coordinator.stop
+    }
+    assertEquals(2, mockJobServlet.exceptionCount)
+  }
+
+  @Test
   def testChangelogPartitions {
     val config = new MapConfig(Map("a" -> "b"))
     val offsets = new util.HashMap[SystemStreamPartition, String]()
@@ -272,3 +301,17 @@ class MockCheckpointManager extends CheckpointManager {
 
   override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { }
 }
+
+class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
+  var exceptionCount = 0
+
+  override protected def getObjectToWrite() = {
+    if (exceptionCount < exceptionLimit) {
+      exceptionCount += 1
+      throw new java.io.IOException("Throwing exception")
+    } else {
+      val jobModel = jobModelRef.get()
+      jobModel
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 80cccf3..9ab1dd5 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.coordinator
 
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.util.Util
 import org.junit.{After, Test}
 import org.junit.Assert._
 import scala.collection.JavaConversions._
@@ -98,10 +100,21 @@ class TestJobCoordinator {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
     val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+    val expectedJobModel = new JobModel(new MapConfig(config), containers)
+
+    // Verify that the atomicReference is initialized
+    assertNotNull(JobCoordinator.jobModelRef.get())
+    assertEquals(expectedJobModel, JobCoordinator.jobModelRef.get())
+
     coordinator.start
-    val jobModel = new JobModel(new MapConfig(config), containers)
     assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
-    assertEquals(jobModel, coordinator.jobModel)
+    assertEquals(expectedJobModel, coordinator.jobModel)
+
+    // Verify that the JobServlet is serving the correct jobModel
+    val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
+    assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
+
+    coordinator.stop
   }
 
   @Test