You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/28 18:16:21 UTC
samza git commit: Fixes to integrate execution env and api
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 8c1f56d6d -> 7340afa6b
Fixes to integrate execution env and api
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7340afa6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7340afa6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7340afa6
Branch: refs/heads/samza-fluent-api-v1
Commit: 7340afa6b5fd3e55b4c72a3010806feae11f5996
Parents: 8c1f56d
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Feb 28 10:15:18 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Feb 28 10:15:18 2017 -0800
----------------------------------------------------------------------
.../apache/samza/system/ExecutionEnvironment.java | 2 +-
.../apache/samza/operators/StreamGraphImpl.java | 4 ++--
.../samza/processorgraph/ExecutionPlanner.java | 18 +++++++++++-------
.../scala/org/apache/samza/job/JobRunner.scala | 4 ++--
4 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index ef46626..a45c004 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -32,7 +32,7 @@ import org.apache.samza.config.Config;
public interface ExecutionEnvironment extends StreamProvider {
String ENVIRONMENT_CONFIG = "job.execution.environment.class";
- String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
+ String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.AbstractExecutionEnvironment";
/**
* Static method to load the local standalone environment
http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 353f455..3e28380 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -208,8 +208,8 @@ public class StreamGraphImpl implements StreamGraph {
*/
public MessageStreamImpl getInputStream(SystemStream sstream) {
for(MessageStream entry: this.inStreams.values()) {
- if (((InputStreamImpl) entry).getSpec().getSystemName() == sstream.getSystem() &&
- ((InputStreamImpl) entry).getSpec().getPhysicalName() == sstream.getStream()) {
+ if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
+ ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
return (MessageStreamImpl) entry;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
index 055f87c..56df480 100644
--- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -58,11 +58,13 @@ public class ExecutionPlanner {
// create physical processors based on stream graph
ProcessorGraph processorGraph = splitStages(streamGraph);
- // figure out the partition for internal streams
- Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
+ if(!processorGraph.getInternalStreams().isEmpty()) {
+ // figure out the partition for internal streams
+ Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins);
- // create the streams
- createStreams(streams, sysAdmins);
+ // create the streams
+ createStreams(streams, sysAdmins);
+ }
return processorGraph;
}
@@ -75,8 +77,8 @@ public class ExecutionPlanner {
ProcessorGraph processorGraph = new ProcessorGraph(config);
// TODO: remove the casting once we have the correct types in StreamGraph
- Set<StreamSpec> sourceStreams = (Set) streamGraph.getInStreams().keySet();
- Set<StreamSpec> sinkStreams = (Set) streamGraph.getOutStreams().keySet();
+ Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
+ Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
intStreams.retainAll(sinkStreams);
sourceStreams.removeAll(intStreams);
@@ -146,7 +148,9 @@ public class ExecutionPlanner {
SystemAdmin systemAdmin = sysAdmins.get(systemName);
Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
metadata.forEach((stream, data) -> {
- streamToEdge.get(stream).setPartitions(data.getSystemStreamPartitionMetadata().size());
+ int partitions = data.getSystemStreamPartitionMetadata().size();
+ streamToEdge.get(stream).setPartitions(partitions);
+ log.info("Partition count is {} for stream {}", partitions, stream);
});
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/7340afa6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 61bfafb..1d81b33 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -72,9 +72,9 @@ object JobRunner extends Logging {
// start execution env if it's defined
val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
if (!envClass.isEmpty) {
- val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
+ val env: ExecutionEnvironment = ExecutionEnvironment.fromConfig(config)
val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
- env.run(graphBuilder, config)
+ env.run(graphBuilder, rewriteConfig(config))
} else {
new JobRunner(rewriteConfig(config)).run()
}