You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/04/15 08:19:38 UTC
samza git commit: SAMZA-833 - ProcessJob mishandling containers
Repository: samza
Updated Branches:
refs/heads/master ad23e69b7 -> 0169912c6
SAMZA-833 - ProcessJob mishandling containers
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0169912c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0169912c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0169912c
Branch: refs/heads/master
Commit: 0169912c65249ea18e1da856838866ee622006a9
Parents: ad23e69
Author: Tao Feng <fe...@gmail.com>
Authored: Thu Apr 14 23:12:18 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu Apr 14 23:12:18 2016 -0700
----------------------------------------------------------------------
.../org/apache/samza/job/local/ProcessJobFactory.scala | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/0169912c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 17c2e5b..81ef59a 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job.local
import java.io.File
+import org.apache.samza.SamzaException
import org.apache.samza.config.{JobConfig, Config}
import org.apache.samza.config.TaskConfig._
import org.apache.samza.coordinator.JobCoordinator
@@ -32,7 +33,13 @@ import org.apache.samza.util.{Logging, Util}
* Creates a stand alone ProcessJob with the specified config.
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
- def getJob(config: Config): StreamJob = {
+ def getJob(config: Config): StreamJob = {
+ val containerCount = JobConfig.Config2Job(config).getContainerCount
+
+ if (containerCount > 1) {
+ throw new SamzaException("Container count larger than 1 is not supported for ProcessJobFactory")
+ }
+
val coordinator = JobCoordinator(config)
val containerModel = coordinator.jobModel.getContainers.get(0)