You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:36:02 UTC
[46/50] [abbrv] samza git commit: SAMZA-843 - Slow start of Samza jobs with large number of containers
SAMZA-843 - Slow start of Samza jobs with large number of containers
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/868cff7b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/868cff7b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/868cff7b
Branch: refs/heads/samza-sql
Commit: 868cff7b6591b8dd4b413649ca4a92ca30dfd731
Parents: 0bacbbe
Author: Navina <na...@gmail.com>
Authored: Wed Jan 6 14:15:26 2016 -0800
Committer: Navina <na...@gmail.com>
Committed: Wed Jan 6 14:15:26 2016 -0800
----------------------------------------------------------------------
.../autoscaling/deployer/ConfigManager.java | 5 +-
.../apache/samza/container/SamzaContainer.scala | 13 ++-
.../samza/coordinator/JobCoordinator.scala | 91 ++++++++++----------
.../samza/coordinator/server/JobServlet.scala | 13 ++-
.../main/scala/org/apache/samza/util/Util.scala | 15 +++-
.../samza/container/TestSamzaContainer.scala | 49 ++++++++++-
.../samza/coordinator/TestJobCoordinator.scala | 17 +++-
7 files changed, 143 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index 87346bc..e3839ca 100644
--- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -68,6 +68,7 @@ public class ConfigManager {
private SystemStreamPartitionIterator coordinatorStreamIterator;
private static final Logger log = LoggerFactory.getLogger(ConfigManager.class);
private final long defaultPollingInterval = 100;
+ private final int defaultReadJobModelDelayMs = 100;
private final long interval;
private String coordinatorServerURL = null;
private final String jobName;
@@ -324,7 +325,7 @@ public class ConfigManager {
*/
public int getCurrentNumTasks() {
int currentNumTasks = 0;
- for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values()) {
+ for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) {
currentNumTasks += containerModel.getTasks().size();
}
return currentNumTasks;
@@ -337,7 +338,7 @@ public class ConfigManager {
* @return current number of containers in the job
*/
public int getCurrentNumContainers() {
- return SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values().size();
+ return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ddce148..e3d0b6c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -52,8 +52,7 @@ import org.apache.samza.system.chooser.MessageChooserFactory
import org.apache.samza.system.chooser.RoundRobinChooserFactory
import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util}
import scala.collection.JavaConversions._
import java.net.{UnknownHostException, InetAddress, URL}
import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
@@ -62,6 +61,8 @@ import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler
object SamzaContainer extends Logging {
+ val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
+
def main(args: Array[String]) {
safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1)))
}
@@ -102,11 +103,15 @@ object SamzaContainer extends Logging {
* assignments, and returns objects to be used for SamzaContainer's
* constructor.
*/
- def readJobModel(url: String) = {
+ def readJobModel(url: String, initialDelayMs: Int = scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS)) = {
info("Fetching configuration from: %s" format url)
SamzaObjectMapper
.getObjectMapper
- .readValue(Util.read(new URL(url)), classOf[JobModel])
+ .readValue(
+ Util.read(
+ url = new URL(url),
+ retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)),
+ classOf[JobModel])
}
def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = {
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 112ec1c..06a96ad 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -20,6 +20,8 @@
package org.apache.samza.coordinator
+import java.util.concurrent.atomic.AtomicReference
+
import org.apache.samza.config.StorageConfig
import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.config.Config
@@ -55,6 +57,7 @@ object JobCoordinator extends Logging {
* a volatile value to store the current instantiated <code>JobCoordinator</code>
*/
@volatile var currentJobCoordinator: JobCoordinator = null
+ val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]()
/**
* @param coordinatorSystemConfig A config object that contains job.name,
@@ -105,10 +108,12 @@ object JobCoordinator extends Logging {
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager,
streamMetadataCache: StreamMetadataCache) = {
- val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
+ val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache)
+ jobModelRef.set(jobModel)
+
val server = new HttpServer
- server.addServlet("/*", new JobServlet(jobModelGenerator))
- currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server)
+ server.addServlet("/*", new JobServlet(jobModelRef))
+ currentJobCoordinator = new JobCoordinator(jobModel, server)
currentJobCoordinator
}
@@ -141,15 +146,13 @@ object JobCoordinator extends Logging {
}
/**
- * The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
- * which catchup with the latest content from the coordinator stream.
+ * The method initializes the jobModel and returns it to the caller.
+ * Note: refreshJobModel can be used as a lambda for JobModel generation in the future.
*/
private def initializeJobModel(config: Config,
changelogManager: ChangelogPartitionManager,
localityManager: LocalityManager,
- streamMetadataCache: StreamMetadataCache): () => JobModel = {
-
-
+ streamMetadataCache: StreamMetadataCache): JobModel = {
// Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache)
val grouper = getSystemStreamPartitionGrouper(config)
@@ -195,56 +198,54 @@ object JobCoordinator extends Logging {
info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
}
- // Return a jobModelGenerator lambda that can be used to refresh the job model
- jobModelGenerator
+
+ jobModel
}
/**
* Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
* builds a new JobModel.
- * This method needs to be thread safe, the reason being, for every HTTP request from a container, this method is called
- * and underlying it uses the same instance of coordinator stream producer and coordinator stream consumer.
+ * Note: This method no longer needs to be thread safe because an HTTP request from a container no longer triggers a
+ * JobModel refresh. Hence, there is no need for synchronization as before.
*/
private def refreshJobModel(config: Config,
allSystemStreamPartitions: util.Set[SystemStreamPartition],
groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
previousChangelogMapping: util.Map[TaskName, Integer],
localityManager: LocalityManager): JobModel = {
- this.synchronized
+
+ // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
+ // mapping.
+ var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+ // Assign all SystemStreamPartitions to TaskNames.
+ val taskModels =
{
- // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
- // mapping.
- var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
-
- // Assign all SystemStreamPartitions to TaskNames.
- val taskModels =
- {
- groups.map
- { case (taskName, systemStreamPartitions) =>
- val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
- {
- case Some(changelogPartitionId) => new Partition(changelogPartitionId)
- case _ =>
- // If we've never seen this TaskName before, then assign it a
- // new changelog.
- maxChangelogPartitionId += 1
- info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
- new Partition(maxChangelogPartitionId)
- }
- new TaskModel(taskName, systemStreamPartitions, changelogPartition)
- }.toSet
- }
-
- // Here is where we should put in a pluggable option for the
- // SSPTaskNameGrouper for locality, load-balancing, etc.
-
- val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
- val containerGrouper = containerGrouperFactory.build(config)
- val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
- { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
-
- new JobModel(config, containerModels, localityManager)
+ groups.map
+ { case (taskName, systemStreamPartitions) =>
+ val changelogPartition = Option(previousChangelogMapping.get(taskName)) match
+ {
+ case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+ case _ =>
+ // If we've never seen this TaskName before, then assign it a
+ // new changelog.
+ maxChangelogPartitionId += 1
+ info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
+ new Partition(maxChangelogPartitionId)
+ }
+ new TaskModel(taskName, systemStreamPartitions, changelogPartition)
+ }.toSet
}
+
+ // Here is where we should put in a pluggable option for the
+ // SSPTaskNameGrouper for locality, load-balancing, etc.
+
+ val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
+ val containerGrouper = containerGrouperFactory.build(config)
+ val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map
+ { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+
+ new JobModel(config, containerModels, localityManager)
}
private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, streamMetadataCache: StreamMetadataCache) {
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index a3baddb..5750c2d 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -20,12 +20,21 @@
package org.apache.samza.coordinator.server
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.samza.SamzaException
import org.apache.samza.job.model.JobModel
import org.apache.samza.util.Logging
/**
* A servlet that dumps the job model for a Samza job.
*/
-class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
- protected def getObjectToWrite() = jobModelGenerator()
+class JobServlet(jobModelRef: AtomicReference[JobModel]) extends ServletBase with Logging {
+ protected def getObjectToWrite() = {
+ val jobModel = jobModelRef.get()
+ if (jobModel == null) { // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated
+ throw new SamzaException("Job Model is not defined in the JobCoordinator. This indicates that the Samza job is unstable. Exiting...")
+ }
+ jobModel
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 58fbb8f..bd0fe5f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -119,15 +119,22 @@ object Util extends Logging {
}
/**
+ * Overload of the read method defined below, so that Java classes can call it and still get the default retry backoff
+ */
+ def read(url: URL, timeout: Int): String = {
+ read(url, timeout, new ExponentialSleepStrategy)
+ }
+
+ /**
* Reads a URL and returns its body as a string. Does no error handling.
*
* @param url HTTP URL to read from.
* @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
+ * @param retryBackoff Instance of ExponentialSleepStrategy that encapsulates how long to sleep between retries of the operation
* @return String payload of the body of the HTTP response.
*/
- def read(url: URL, timeout: Int = 60000): String = {
+ def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
var httpConn = getHttpConnection(url, timeout)
- val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
retryBackoff.run(loop => {
if(httpConn.getResponseCode != 200)
{
@@ -143,6 +150,10 @@ object Util extends Logging {
},
(exception, loop) => {
exception match {
+ case ioe: IOException => {
+ warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
+ httpConn = getHttpConnection(url, timeout)
+ }
case e: Exception =>
loop.done
error("Unable to connect to Job coordinator server, received exception", e)
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 365ff0a..9df12d2 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,7 +19,9 @@
package org.apache.samza.container
+import java.net.SocketTimeoutException
import java.util
+import java.util.concurrent.atomic.AtomicReference
import org.apache.samza.storage.TaskStorageManager
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -30,8 +32,7 @@ import org.apache.samza.Partition
import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
import org.apache.samza.coordinator.JobCoordinator
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.server.{ServletBase, HttpServer, JobServlet}
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskModel
@@ -76,7 +77,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
def jobModelGenerator(): JobModel = jobModel
val server = new HttpServer
val coordinator = new JobCoordinator(jobModel, server)
- coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
+ JobCoordinator.jobModelRef.set(jobModelGenerator())
+ coordinator.server.addServlet("/*", new JobServlet(JobCoordinator.jobModelRef))
try {
coordinator.start
assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -86,6 +88,33 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
}
@Test
+ def testReadJobModelWithTimeouts {
+ val config = new MapConfig(Map("a" -> "b"))
+ val offsets = new util.HashMap[SystemStreamPartition, String]()
+ offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
+ val tasks = Map(
+ new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
+ new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
+ val containers = Map(
+ Integer.valueOf(0) -> new ContainerModel(0, tasks),
+ Integer.valueOf(1) -> new ContainerModel(1, tasks))
+ val jobModel = new JobModel(config, containers)
+ def jobModelGenerator(): JobModel = jobModel
+ val server = new HttpServer
+ val coordinator = new JobCoordinator(jobModel, server)
+ JobCoordinator.jobModelRef.set(jobModelGenerator())
+ val mockJobServlet = new MockJobServlet(2, JobCoordinator.jobModelRef)
+ coordinator.server.addServlet("/*", mockJobServlet)
+ try {
+ coordinator.start
+ assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
+ } finally {
+ coordinator.stop
+ }
+ assertEquals(2, mockJobServlet.exceptionCount)
+ }
+
+ @Test
def testChangelogPartitions {
val config = new MapConfig(Map("a" -> "b"))
val offsets = new util.HashMap[SystemStreamPartition, String]()
@@ -272,3 +301,17 @@ class MockCheckpointManager extends CheckpointManager {
override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { }
}
+
+class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) {
+ var exceptionCount = 0
+
+ override protected def getObjectToWrite() = {
+ if (exceptionCount < exceptionLimit) {
+ exceptionCount += 1
+ throw new java.io.IOException("Throwing exception")
+ } else {
+ val jobModel = jobModelRef.get()
+ jobModel
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index 80cccf3..9ab1dd5 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,6 +19,8 @@
package org.apache.samza.coordinator
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.util.Util
import org.junit.{After, Test}
import org.junit.Assert._
import scala.collection.JavaConversions._
@@ -98,10 +100,21 @@ class TestJobCoordinator {
MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+ val expectedJobModel = new JobModel(new MapConfig(config), containers)
+
+ // Verify that the atomicReference is initialized
+ assertNotNull(JobCoordinator.jobModelRef.get())
+ assertEquals(expectedJobModel, JobCoordinator.jobModelRef.get())
+
coordinator.start
- val jobModel = new JobModel(new MapConfig(config), containers)
assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
- assertEquals(jobModel, coordinator.jobModel)
+ assertEquals(expectedJobModel, coordinator.jobModel)
+
+ // Verify that the JobServlet is serving the correct jobModel
+ val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
+ assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
+
+ coordinator.stop
}
@Test