You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/28 18:16:21 UTC

samza git commit: Fixes to integrate execution env and api

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 8c1f56d6d -> 7340afa6b


Fixes to integrate execution env and api


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7340afa6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7340afa6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7340afa6

Branch: refs/heads/samza-fluent-api-v1
Commit: 7340afa6b5fd3e55b4c72a3010806feae11f5996
Parents: 8c1f56d
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Feb 28 10:15:18 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Feb 28 10:15:18 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/system/ExecutionEnvironment.java |  2 +-
 .../apache/samza/operators/StreamGraphImpl.java   |  4 ++--
 .../samza/processorgraph/ExecutionPlanner.java    | 18 +++++++++++-------
 .../scala/org/apache/samza/job/JobRunner.scala    |  4 ++--
 4 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index ef46626..a45c004 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -32,7 +32,7 @@ import org.apache.samza.config.Config;
 public interface ExecutionEnvironment extends StreamProvider {
 
   String ENVIRONMENT_CONFIG = "job.execution.environment.class";
-  String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
+  String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.AbstractExecutionEnvironment";
 
   /**
    * Static method to load the local standalone environment

http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 353f455..3e28380 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -208,8 +208,8 @@ public class StreamGraphImpl implements StreamGraph {
    */
   public MessageStreamImpl getInputStream(SystemStream sstream) {
     for(MessageStream entry: this.inStreams.values()) {
-      if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
-          ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+      if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
+          ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
         return (MessageStreamImpl) entry;
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index 055f87c..56df480 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -58,11 +58,13 @@ public class ExecutionPlanner {
     // create physical processors based on stream graph
     ProcessorGraph processorGraph = splitStages(streamGraph);
 
-    // figure out the partition for internal streams
-    Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
+    if(!processorGraph.getInternalStreams().isEmpty()) {
+      // figure out the partition for internal streams
+      Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
 
-    // create the streams
-    createStreams(streams, sysAdmins);
+      // create the streams
+      createStreams(streams, sysAdmins);
+    }
 
     return processorGraph;
   }
@@ -75,8 +77,8 @@ public class ExecutionPlanner {
     ProcessorGraph processorGraph = new ProcessorGraph(config);
 
     // TODO: remote the casting once we have the correct types in StreamGraph
-    Set<StreamSpec> sourceStreams = (Set) streamGraph.getInStreams().keySet();
-    Set<StreamSpec> sinkStreams = (Set) streamGraph.getOutStreams().keySet();
+    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
@@ -146,7 +148,9 @@ public class ExecutionPlanner {
       SystemAdmin systemAdmin = sysAdmins.get(systemName);
       Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
       metadata.forEach((stream, data) -> {
-          streamToEdge.get(stream).setPartitions(data.getSystemStreamPartitionMetadata().size());
+          int partitions = data.getSystemStreamPartitionMetadata().size();
+          streamToEdge.get(stream).setPartitions(partitions);
+          log.info("Partition count is {} for stream {}", partitions, stream);
         });
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 61bfafb..1d81b33 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -72,9 +72,9 @@ object JobRunner extends Logging {
     // start execution env if it's defined
     val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
     if (!envClass.isEmpty) {
-      val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
+      val env: ExecutionEnvironment = ExecutionEnvironment.fromConfig(config)
       val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
-      env.run(graphBuilder, config)
+      env.run(graphBuilder, rewriteConfig(config))
     } else {
       new JobRunner(rewriteConfig(config)).run()
     }