You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:43 UTC
[27/50] [abbrv] samza git commit: SAMZA-788 - coordinator stream
configuration should not guess the system names
SAMZA-788 - coordinator stream configuration should not guess the system names
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/092e3811
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/092e3811
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/092e3811
Branch: refs/heads/samza-sql
Commit: 092e381131cb7de9e9ffc0807e997418daa3498a
Parents: 62254d0
Author: Navina <na...@gmail.com>
Authored: Thu Nov 19 14:21:16 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Thu Nov 19 14:21:16 2015 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/samza/config/JobConfig.scala | 13 ++-----------
.../stream/TestCoordinatorStreamWriter.java | 1 +
.../src/test/resources/test-migration-fail.properties | 1 +
samza-core/src/test/resources/test.properties | 3 ++-
4 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6d73bb9..85a1ca4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -50,17 +50,8 @@ object JobConfig {
class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getName = getOption(JobConfig.JOB_NAME)
- def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
- // If no coordinator system is configured, try and guess it if there's just one system configured.
- val systemNames = config.getSystemNames.toSet
- if (systemNames.size == 1) {
- val systemName = systemNames.iterator.next
- info("No coordinator system defined, so defaulting to %s" format systemName)
- systemName
- } else {
- throw new ConfigException("Missing job.coordinator.system configuration.")
- }
- })
+ def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
+ throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
def getContainerCount = {
getOption(JobConfig.JOB_CONTAINER_COUNT) match {
http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index f9c6304..f83487d 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -51,6 +51,7 @@ public class TestCoordinatorStreamWriter {
Map<String, String> configMap = new HashMap<>();
configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
configMap.put("job.name", "coordinator-stream-writer-test");
+ configMap.put("job.coordinator.system", "coordinatorStreamWriter");
Config config = new MapConfig(configMap);
coordinatorStreamWriter = new CoordinatorStreamWriter(config);
boolean exceptionHappened = false;
http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test-migration-fail.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties
index b0657de..3b6c1f6 100644
--- a/samza-core/src/test/resources/test-migration-fail.properties
+++ b/samza-core/src/test/resources/test-migration-fail.properties
@@ -24,3 +24,4 @@ job.name=test-job
foo=bar
systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
+job.coordinator.system=coordinator
http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 41eb82e..be16c86 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,4 +22,5 @@
job.factory.class=org.apache.samza.job.MockJobFactory
job.name=test-job
foo=bar
-systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+job.coordinator.system=coordinator
\ No newline at end of file