You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/04/15 08:19:38 UTC

samza git commit: SAMZA-833 - ProcessJob mishandling containers

Repository: samza
Updated Branches:
  refs/heads/master ad23e69b7 -> 0169912c6


SAMZA-833 - ProcessJob mishandling containers


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0169912c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0169912c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0169912c

Branch: refs/heads/master
Commit: 0169912c65249ea18e1da856838866ee622006a9
Parents: ad23e69
Author: Tao Feng <fe...@gmail.com>
Authored: Thu Apr 14 23:12:18 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu Apr 14 23:12:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/job/local/ProcessJobFactory.scala      | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0169912c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 17c2e5b..81ef59a 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job.local
 
 import java.io.File
 
+import org.apache.samza.SamzaException
 import org.apache.samza.config.{JobConfig, Config}
 import org.apache.samza.config.TaskConfig._
 import org.apache.samza.coordinator.JobCoordinator
@@ -32,7 +33,13 @@ import org.apache.samza.util.{Logging, Util}
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def   getJob(config: Config): StreamJob = {
+  def  getJob(config: Config): StreamJob = {
+    val containerCount = JobConfig.Config2Job(config).getContainerCount
+
+    if (containerCount > 1) {
+      throw new SamzaException("Container count larger than 1 is not supported for ProcessJobFactory")
+    }
+    
     val coordinator = JobCoordinator(config)
     val containerModel = coordinator.jobModel.getContainers.get(0)