You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:43 UTC

[27/50] [abbrv] samza git commit: SAMZA-788 - coordinator stream configuration should not guess the system names

SAMZA-788 - coordinator stream configuration should not guess the system names


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/092e3811
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/092e3811
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/092e3811

Branch: refs/heads/samza-sql
Commit: 092e381131cb7de9e9ffc0807e997418daa3498a
Parents: 62254d0
Author: Navina <na...@gmail.com>
Authored: Thu Nov 19 14:21:16 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Thu Nov 19 14:21:16 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/JobConfig.scala | 13 ++-----------
 .../stream/TestCoordinatorStreamWriter.java            |  1 +
 .../src/test/resources/test-migration-fail.properties  |  1 +
 samza-core/src/test/resources/test.properties          |  3 ++-
 4 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6d73bb9..85a1ca4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -50,17 +50,8 @@ object JobConfig {
 class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getName = getOption(JobConfig.JOB_NAME)
 
-  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
-    // If no coordinator system is configured, try and guess it if there's just one system configured.
-    val systemNames = config.getSystemNames.toSet
-    if (systemNames.size == 1) {
-      val systemName = systemNames.iterator.next
-      info("No coordinator system defined, so defaulting to %s" format systemName)
-      systemName
-    } else {
-      throw new ConfigException("Missing job.coordinator.system configuration.")
-    }
-  })
+  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
+      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
 
   def getContainerCount = {
     getOption(JobConfig.JOB_CONTAINER_COUNT) match {

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index f9c6304..f83487d 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -51,6 +51,7 @@ public class TestCoordinatorStreamWriter {
     Map<String, String> configMap = new HashMap<>();
     configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
     configMap.put("job.name", "coordinator-stream-writer-test");
+    configMap.put("job.coordinator.system", "coordinatorStreamWriter");
     Config config = new MapConfig(configMap);
     coordinatorStreamWriter = new CoordinatorStreamWriter(config);
     boolean exceptionHappened = false;

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test-migration-fail.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties
index b0657de..3b6c1f6 100644
--- a/samza-core/src/test/resources/test-migration-fail.properties
+++ b/samza-core/src/test/resources/test-migration-fail.properties
@@ -24,3 +24,4 @@ job.name=test-job
 foo=bar
 systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
+job.coordinator.system=coordinator

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 41eb82e..be16c86 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,4 +22,5 @@
 job.factory.class=org.apache.samza.job.MockJobFactory
 job.name=test-job
 foo=bar
-systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+job.coordinator.system=coordinator
\ No newline at end of file