You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/22 01:17:44 UTC
samza git commit: SAMZA-1067: Physical execution plan for logic
expressions written in fluent API
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 8815b0392 -> ea37b7463
SAMZA-1067: Physical execution plan for logic expressions written in fluent API
This is the initial commit for integration. No tests have been done.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ea37b746
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ea37b746
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ea37b746
Branch: refs/heads/samza-fluent-api-v1
Commit: ea37b7463f83c5c176c73d0b8f4c6a6199854f40
Parents: 8815b03
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Feb 21 17:16:34 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Feb 21 17:16:34 2017 -0800
----------------------------------------------------------------------
.../samza/processorgraph/ExecutionPlanner.java | 192 ++++++++++++
.../samza/processorgraph/ProcessorGraph.java | 294 +++++++++++++++++++
.../samza/processorgraph/ProcessorNode.java | 103 +++++++
.../apache/samza/processorgraph/StreamEdge.java | 108 +++++++
.../system/RemoteExecutionEnvironment.java | 35 ++-
.../apache/samza/util/ConfigInheritence.java | 60 ++++
.../org/apache/samza/config/JobConfig.scala | 8 +
.../scala/org/apache/samza/job/JobRunner.scala | 29 +-
.../main/scala/org/apache/samza/util/Util.scala | 25 ++
.../processorgraph/TestProcessorGraph.java | 198 +++++++++++++
10 files changed, 1043 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
new file mode 100644
index 0000000..a990463
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Plans the physical execution of a {@link StreamGraph}: it splits the graph into processors, computes the
+ * partition counts of the intermediate streams and creates those streams on their systems.
+ */
+public class ExecutionPlanner {
+  private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
+
+  private final Config config;
+
+  public ExecutionPlanner(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Builds the physical plan for {@code streamGraph} and creates the intermediate streams it needs.
+   *
+   * @param streamGraph the logical graph built by the application
+   * @return the validated {@link ProcessorGraph}
+   * @throws Exception if the graph is invalid, or if stream metadata lookup or stream creation fails
+   */
+  public ProcessorGraph plan(StreamGraph streamGraph) throws Exception {
+    Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config);
+
+    // create physical processors based on stream graph
+    ProcessorGraph processorGraph = splitStages(streamGraph);
+
+    // figure out the partition for internal streams
+    Multimap<String, StreamSpec> streams = calculatePartitions(processorGraph, sysAdmins);
+
+    // create the streams
+    createStreams(streams, sysAdmins);
+
+    return processorGraph;
+  }
+
+  /**
+   * Splits the stream graph into processors. Currently the whole graph becomes a single processor whose id is
+   * {@code <job.name>-<job.id>} (job.id defaults to "1"). Streams that are both consumed and produced by the
+   * graph become intermediate streams of that processor.
+   *
+   * <p>The input {@code streamGraph} is not modified.
+   *
+   * @param streamGraph the logical graph built by the application
+   * @return the validated {@link ProcessorGraph}
+   * @throws Exception if the resulting graph fails validation
+   */
+  public ProcessorGraph splitStages(StreamGraph streamGraph) throws Exception {
+    String pipelineId = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.getOrDefault(JobConfig.JOB_ID(), "1"));
+    // For this phase, we are going to create a processor with the whole dag
+    String processorId = pipelineId; // only one processor, name it the same as pipeline itself
+
+    ProcessorGraph processorGraph = new ProcessorGraph(config);
+
+    // TODO: remove the casting once we have the correct types in StreamGraph
+    // Copy the key sets: keySet() is a view backed by the StreamGraph's stream maps, so calling removeAll() on it
+    // directly would silently delete the intermediate streams from the StreamGraph itself.
+    Set<StreamSpec> sourceStreams = new HashSet<>((Set) streamGraph.getInStreams().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>((Set) streamGraph.getOutStreams().keySet());
+    Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
+    intStreams.retainAll(sinkStreams);
+    sourceStreams.removeAll(intStreams);
+    sinkStreams.removeAll(intStreams);
+
+    // add sources
+    sourceStreams.forEach(spec -> processorGraph.addSource(spec, processorId));
+
+    // add sinks
+    sinkStreams.forEach(spec -> processorGraph.addSink(spec, processorId));
+
+    // add intermediate streams
+    intStreams.forEach(spec -> processorGraph.addEdge(spec, processorId, processorId));
+
+    processorGraph.validate();
+
+    return processorGraph;
+  }
+
+  /**
+   * Assigns a partition count to every intermediate stream and groups the resulting specs by system name.
+   * An intermediate stream gets the maximum partition count over all in/out edges of the processor producing it.
+   *
+   * @throws SamzaException if no partition count could be determined for a processor's intermediate streams
+   */
+  private Multimap<String, StreamSpec> calculatePartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    // fetch the external streams partition info
+    getExternalStreamPartitions(processorGraph, sysAdmins);
+
+    // TODO this algorithm assumes only one processor, and it does not consider join
+    Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create();
+    List<ProcessorNode> processors = processorGraph.topologicalSort();
+    processors.forEach(processor -> {
+      Set<StreamEdge> outStreams = new HashSet<>(processor.getOutEdges());
+      outStreams.retainAll(processorGraph.getInternalStreams());
+      if (!outStreams.isEmpty()) {
+        int maxInPartition = maxPartition(processor.getInEdges());
+        int maxOutPartition = maxPartition(processor.getOutEdges());
+        int partition = Math.max(maxInPartition, maxOutPartition);
+        // -1 means no edge had a known partition count; creating a stream with it would be invalid
+        if (partition <= 0) {
+          throw new SamzaException(String.format(
+              "Unable to determine the partition count for the intermediate streams of processor %s.",
+              processor.getId()));
+        }
+
+        outStreams.forEach(streamEdge -> {
+          streamEdge.setPartitions(partition);
+          StreamSpec streamSpec = createStreamSpec(streamEdge);
+          streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec);
+        });
+      }
+    });
+
+    return streamsGroupedBySystem;
+  }
+
+  /**
+   * Looks up the partition count of every source and sink stream through its system admin and records it on the
+   * corresponding edge. Streams the admin returns no metadata for keep their current partition count.
+   */
+  private void getExternalStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) {
+    Set<StreamEdge> externalStreams = new HashSet<>();
+    externalStreams.addAll(processorGraph.getSources());
+    externalStreams.addAll(processorGraph.getSinks());
+
+    Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create();
+    externalStreams.forEach(streamEdge -> {
+      SystemStream systemStream = streamEdge.getSystemStream();
+      externalStreamsMap.put(systemStream.getSystem(), streamEdge);
+    });
+    for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      Collection<StreamEdge> streamEdges = entry.getValue();
+      Map<String, StreamEdge> streamToEdge = new HashMap<>();
+      streamEdges.forEach(streamEdge -> streamToEdge.put(streamEdge.getSystemStream().getStream(), streamEdge));
+      SystemAdmin systemAdmin = getSystemAdmin(sysAdmins, systemName);
+      Map<String, SystemStreamMetadata> metadata = systemAdmin.getSystemStreamMetadata(streamToEdge.keySet());
+      metadata.forEach((stream, data) -> {
+        streamToEdge.get(stream).setPartitions(data.getSystemStreamPartitionMetadata().size());
+      });
+    }
+  }
+
+  /**
+   * Creates every given stream through the admin of the system it is grouped under.
+   */
+  private void createStreams(Multimap<String, StreamSpec> streams, Map<String, SystemAdmin> sysAdmins) {
+    for (Map.Entry<String, Collection<StreamSpec>> entry : streams.asMap().entrySet()) {
+      String systemName = entry.getKey();
+      SystemAdmin systemAdmin = getSystemAdmin(sysAdmins, systemName);
+
+      for (StreamSpec stream : entry.getValue()) {
+        log.info("Creating stream {} on system {}", stream.getPhysicalName(), systemName);
+        systemAdmin.createStream(stream);
+      }
+    }
+  }
+
+  /**
+   * Returns the admin for {@code systemName}, failing with a descriptive error instead of a later NPE
+   * when the system is not configured.
+   */
+  private static SystemAdmin getSystemAdmin(Map<String, SystemAdmin> sysAdmins, String systemName) {
+    SystemAdmin systemAdmin = sysAdmins.get(systemName);
+    if (systemAdmin == null) {
+      throw new SamzaException(
+          String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+    }
+    return systemAdmin;
+  }
+
+  /**
+   * Returns the largest partition count among {@code edges}, or -1 (the "unknown" value used by
+   * {@link StreamEdge}) when there are no edges.
+   */
+  private static int maxPartition(Collection<StreamEdge> edges) {
+    return edges.stream().mapToInt(StreamEdge::getPartitions).max().orElse(-1);
+  }
+
+  /**
+   * Copies the edge's stream spec with the partition count computed for the edge.
+   */
+  private static StreamSpec createStreamSpec(StreamEdge edge) {
+    StreamSpec orgSpec = edge.getStreamSpec();
+    return orgSpec.copyWithPartitionCount(edge.getPartitions());
+  }
+
+  /**
+   * Instantiates a SystemAdmin for every system declared in the config, keyed by system name.
+   */
+  private static Map<String, SystemAdmin> getSystemAdmins(Config config) {
+    return getSystemFactories(config).entrySet()
+        .stream()
+        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().getAdmin(entry.getKey(), config)));
+  }
+
+  /**
+   * Instantiates the SystemFactory of every system declared in the config.
+   *
+   * @throws SamzaException if a declared system has no factory class configured
+   */
+  private static Map<String, SystemFactory> getSystemFactories(Config config) {
+    Map<String, SystemFactory> systemFactories =
+        getSystemNames(config).stream().collect(Collectors.toMap(systemName -> systemName, systemName -> {
+          String systemFactoryClassName = new JavaSystemConfig(config).getSystemFactory(systemName);
+          if (systemFactoryClassName == null) {
+            throw new SamzaException(
+                String.format("A stream uses system %s, which is missing from the configuration.", systemName));
+          }
+          return Util.getObj(systemFactoryClassName);
+        }));
+
+    return systemFactories;
+  }
+
+  private static Collection<String> getSystemNames(Config config) {
+    return new JavaSystemConfig(config).getSystemNames();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
new file mode 100644
index 0000000..d4ad84b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ProcessorGraph represents the multi-stage Samza processors of a pipeline on the physical execution layer.
+ * High level APIs are transformed into ProcessorGraph for future plan, validation and execution.
+ *
+ * <p>The ProcessorGraph is a graph in which processors are connected to each other by source, sink and
+ * intermediate streams. Each ProcessorNode contains the config which is required to run the processor.
+ */
+public class ProcessorGraph {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class);
+
+  private final Map<String, ProcessorNode> nodes = new HashMap<>();
+  private final Map<String, StreamEdge> edges = new HashMap<>();
+  private final Set<StreamEdge> sources = new HashSet<>();
+  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> internalStreams = new HashSet<>();
+  private final Config config;
+
+  ProcessorGraph(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Registers {@code input} as a source stream consumed by processor {@code targetProcessorId}.
+   */
+  void addSource(StreamSpec input, String targetProcessorId) {
+    ProcessorNode node = getNode(targetProcessorId);
+    StreamEdge edge = getEdge(input);
+    edge.addTargetNode(node);
+    node.addInEdge(edge);
+    sources.add(edge);
+
+    log.info(edge.toString());
+  }
+
+  /**
+   * Registers {@code output} as a sink stream produced by processor {@code sourceProcessorId}.
+   */
+  void addSink(StreamSpec output, String sourceProcessorId) {
+    ProcessorNode node = getNode(sourceProcessorId);
+    StreamEdge edge = getEdge(output);
+    edge.addSourceNode(node);
+    node.addOutEdge(edge);
+    sinks.add(edge);
+
+    log.info(edge.toString());
+  }
+
+  /**
+   * Registers {@code streamSpec} as an intermediate stream from {@code sourceProcessorId} to
+   * {@code targetProcessorId}.
+   */
+  void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) {
+    ProcessorNode sourceNode = getNode(sourceProcessorId);
+    ProcessorNode targetNode = getNode(targetProcessorId);
+    StreamEdge edge = getEdge(streamSpec);
+    edge.addSourceNode(sourceNode);
+    edge.addTargetNode(targetNode);
+    sourceNode.addOutEdge(edge);
+    targetNode.addInEdge(edge);
+    internalStreams.add(edge);
+
+    log.info(edge.toString());
+  }
+
+  /**
+   * Returns the node for {@code processorId}, creating it on first use.
+   */
+  ProcessorNode getNode(String processorId) {
+    return nodes.computeIfAbsent(processorId, id -> new ProcessorNode(id, config));
+  }
+
+  /**
+   * Returns the edge for the stream id of {@code streamSpec}, creating it on first use.
+   */
+  StreamEdge getEdge(StreamSpec streamSpec) {
+    return edges.computeIfAbsent(streamSpec.getId(), streamId -> new StreamEdge(streamSpec, config));
+  }
+
+  /**
+   * Returns the processors to be executed, in topological order.
+   * @return unmodifiable list of ProcessorNode
+   */
+  public List<ProcessorNode> getProcessors() {
+    List<ProcessorNode> sortedNodes = topologicalSort();
+    return Collections.unmodifiableList(sortedNodes);
+  }
+
+  public Set<StreamEdge> getSources() {
+    return Collections.unmodifiableSet(sources);
+  }
+
+  public Set<StreamEdge> getSinks() {
+    return Collections.unmodifiableSet(sinks);
+  }
+
+  public Set<StreamEdge> getInternalStreams() {
+    return Collections.unmodifiableSet(internalStreams);
+  }
+
+
+  /**
+   * Validate the graph
+   * @throws IllegalArgumentException if any stream has the wrong producers/consumers or a processor is unreachable
+   */
+  public void validate() {
+    validateSources();
+    validateSinks();
+    validateInternalStreams();
+    validateReachability();
+  }
+
+  /**
+   * Validate the sources should have indegree being 0 and outdegree greater than 0
+   */
+  private void validateSources() {
+    sources.forEach(edge -> {
+      if (!edge.getSourceNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+      }
+      if (edge.getTargetNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+      }
+    });
+  }
+
+  /**
+   * Validate the sinks should have outdegree being 0 and indegree greater than 0
+   */
+  private void validateSinks() {
+    sinks.forEach(edge -> {
+      if (!edge.getTargetNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+      }
+      if (edge.getSourceNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+      }
+    });
+  }
+
+  /**
+   * Validate the internal streams should have both indegree and outdegree greater than 0
+   */
+  private void validateInternalStreams() {
+    Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
+    internalEdges.removeAll(sources);
+    internalEdges.removeAll(sinks);
+
+    internalEdges.forEach(edge -> {
+      if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+      }
+    });
+  }
+
+  /**
+   * Validate all nodes are reachable by sources.
+   */
+  private void validateReachability() {
+    // validate all nodes are reachable from the sources
+    final Set<ProcessorNode> reachable = findReachable();
+    if (reachable.size() != nodes.size()) {
+      Set<ProcessorNode> unreachable = new HashSet<>(nodes.values());
+      unreachable.removeAll(reachable);
+      throw new IllegalArgumentException(String.format("Processors %s cannot be reached from Sources.",
+          String.join(", ", unreachable.stream().map(ProcessorNode::getId).collect(Collectors.toList()))));
+    }
+  }
+
+  /**
+   * Find the reachable set of nodes using BFS.
+   * Package private for test.
+   * @return reachable set of nodes
+   */
+  Set<ProcessorNode> findReachable() {
+    Queue<ProcessorNode> queue = new ArrayDeque<>();
+    Set<ProcessorNode> visited = new HashSet<>();
+
+    sources.forEach(source -> {
+      List<ProcessorNode> next = source.getTargetNodes();
+      queue.addAll(next);
+      visited.addAll(next);
+    });
+
+    while (!queue.isEmpty()) {
+      ProcessorNode node = queue.poll();
+      node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
+        if (!visited.contains(target)) {
+          visited.add(target);
+          queue.offer(target);
+        }
+      });
+    }
+
+    return visited;
+  }
+
+  /**
+   * A variation of Kahn's algorithm for topological sorting.
+   * It also handles simple loops in the graph. When no processor is ready, it breaks a cycle by picking the
+   * pending processor with the fewest remaining input edges. Every processor appears exactly once in the result.
+   * Package private for test.
+   * @return topologically sorted ProcessorNode(s)
+   */
+  List<ProcessorNode> topologicalSort() {
+    Collection<ProcessorNode> pnodes = nodes.values();
+    Queue<ProcessorNode> q = new ArrayDeque<>();
+    Map<String, Long> indegree = new HashMap<>();
+    // nodes that have been put on the queue; each node is scheduled at most once so it is never sorted twice
+    Set<ProcessorNode> scheduled = new HashSet<>();
+    pnodes.forEach(node -> {
+      String nid = node.getId();
+      // source streams have no producing processor, so they don't count towards the indegree
+      long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+      indegree.put(nid, degree);
+
+      if (degree == 0L) {
+        q.add(node);
+        scheduled.add(node);
+      }
+    });
+
+    List<ProcessorNode> sortedNodes = new ArrayList<>();
+    // nodes that are targets of an edge coming out of an already sorted node
+    Set<ProcessorNode> visited = new HashSet<>();
+    while (sortedNodes.size() < pnodes.size()) {
+      while (!q.isEmpty()) {
+        ProcessorNode node = q.poll();
+        sortedNodes.add(node);
+        node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
+          String nid = n.getId();
+          long degree = indegree.get(nid) - 1;
+          indegree.put(nid, degree);
+          // scheduled.add() returns false for nodes already queued or sorted (e.g. the start of a broken cycle)
+          if (degree == 0L && scheduled.add(n)) {
+            q.add(n);
+          }
+          visited.add(n);
+        });
+      }
+
+      if (sortedNodes.size() < pnodes.size()) {
+        // The remaining nodes have circles. Break one by scheduling the pending node with the fewest remaining
+        // input edges, preferring nodes that have already been reached from a sorted node.
+        Set<ProcessorNode> candidates = new HashSet<>(visited);
+        candidates.removeAll(scheduled);
+        if (candidates.isEmpty()) {
+          // No pending node has been reached yet, e.g. every processor consumes an intermediate stream
+          // (such as a processor whose intermediate stream loops back to itself).
+          candidates.addAll(pnodes);
+          candidates.removeAll(scheduled);
+        }
+        long min = Long.MAX_VALUE;
+        ProcessorNode minNode = null;
+        for (ProcessorNode node : candidates) {
+          long degree = indegree.get(node.getId());
+          if (degree < min) {
+            min = degree;
+            minNode = node;
+          }
+        }
+        // candidates is non-empty here because every scheduled node has been sorted and not all nodes are sorted
+        q.add(minNode);
+        scheduled.add(minNode);
+      }
+    }
+
+    return sortedNodes;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
new file mode 100644
index 0000000..0b02377
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.ConfigInheritence;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A ProcessorNode is one Samza processor in the physical plan. It records the streams the processor consumes
+ * (in-edges) and produces (out-edges), and derives the config used to run it.
+ */
+public class ProcessorNode {
+  private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class);
+  private static final String CONFIG_PROCESSOR_PREFIX = "processors.%s.";
+
+  private final String id;
+  private final List<StreamEdge> inEdges = new ArrayList<>();
+  private final List<StreamEdge> outEdges = new ArrayList<>();
+  private final Config config;
+
+  ProcessorNode(String id, Config config) {
+    this.id = id;
+    this.config = config;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  void addInEdge(StreamEdge in) {
+    inEdges.add(in);
+  }
+
+  void addOutEdge(StreamEdge out) {
+    outEdges.add(out);
+  }
+
+  List<StreamEdge> getInEdges() {
+    return inEdges;
+  }
+
+  List<StreamEdge> getOutEdges() {
+    return outEdges;
+  }
+
+  /**
+   * Builds the runnable config for this processor. The root config, the generated processor config and the
+   * "processors.<id>." scoped config are merged by ConfigInheritence, then passed through Util.rewriteConfig.
+   */
+  public Config generateConfig() {
+    // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline.
+    Config merged = ConfigInheritence.extractScopedConfig(
+        config, generateProcessorConfig(), String.format(CONFIG_PROCESSOR_PREFIX, id));
+    return Util.rewriteConfig(merged);
+  }
+
+  /**
+   * Generates the config entries derived from the graph: task inputs, named task outputs and the job name.
+   */
+  private Config generateProcessorConfig() {
+    Map<String, String> generated = new HashMap<>();
+    List<String> formattedInputs = new ArrayList<>(inEdges.size());
+    for (StreamEdge inEdge : inEdges) {
+      formattedInputs.add(inEdge.getFormattedSystemStream());
+    }
+
+    // TODO temp logs for debugging
+    log.info("Processor {} has formatted inputs {}", id, formattedInputs);
+
+    // TODO hack alert: hard coded string literals!
+    generated.put("task.inputs", Joiner.on(',').join(formattedInputs));
+
+    // TODO: DISCUSS how does the processor know it's output names?
+    for (StreamEdge outEdge : outEdges) {
+      String outputName = outEdge.getName();
+      if (outputName.isEmpty()) {
+        continue;
+      }
+      generated.put(String.format("task.outputs.%s.stream", outputName), outEdge.getFormattedSystemStream());
+    }
+
+    generated.put(JobConfig.JOB_NAME(), id);
+
+    log.info("Processor {} has generated configs {}", id, generated);
+    return new MapConfig(generated);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
new file mode 100644
index 0000000..879d705
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+
+
+/**
+ * A StreamEdge links the {@link ProcessorNode}s that produce a stream (source nodes) to the
+ * {@link ProcessorNode}s that consume it (target nodes).
+ * A sink StreamEdge has no target nodes; a source StreamEdge has no source nodes.
+ */
+public class StreamEdge {
+  private final StreamSpec streamSpec;
+  private final List<ProcessorNode> sourceNodes = new ArrayList<>();
+  private final List<ProcessorNode> targetNodes = new ArrayList<>();
+  private final Config config;
+
+  // defaults to the formatted system stream name (set in the constructor)
+  private String name;
+  // -1 until a partition count has been assigned
+  private int partitions = -1;
+
+  StreamEdge(StreamSpec streamSpec, Config config) {
+    this.streamSpec = streamSpec;
+    this.config = config;
+    this.name = Util.getNameFromSystemStream(getSystemStream());
+  }
+
+  void addSourceNode(ProcessorNode sourceNode) {
+    sourceNodes.add(sourceNode);
+  }
+
+  void addTargetNode(ProcessorNode targetNode) {
+    targetNodes.add(targetNode);
+  }
+
+  StreamSpec getStreamSpec() {
+    return streamSpec;
+  }
+
+  /**
+   * Builds a SystemStream from the spec's system name and physical name (a new instance on each call).
+   */
+  SystemStream getSystemStream() {
+    String system = streamSpec.getSystemName();
+    String physicalName = streamSpec.getPhysicalName();
+    return new SystemStream(system, physicalName);
+  }
+
+  String getFormattedSystemStream() {
+    SystemStream systemStream = getSystemStream();
+    return Util.getNameFromSystemStream(systemStream);
+  }
+
+  List<ProcessorNode> getSourceNodes() {
+    return sourceNodes;
+  }
+
+  List<ProcessorNode> getTargetNodes() {
+    return targetNodes;
+  }
+
+  int getPartitions() {
+    return partitions;
+  }
+
+  void setPartitions(int partitions) {
+    this.partitions = partitions;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Renders the edge as {@code StreamEdge <systemStream>: (<source ids>) -> (<target ids>)}.
+   */
+  @Override
+  public String toString() {
+    Joiner commaJoiner = Joiner.on(',');
+    String producers = commaJoiner.join(sourceNodes.stream().map(ProcessorNode::getId).collect(Collectors.toList()));
+    String consumers = commaJoiner.join(targetNodes.stream().map(ProcessorNode::getId).collect(Collectors.toList()));
+    return "StreamEdge " + getSystemStream().toString() + ": (" + producers + ") -> (" + consumers + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index fafa2cb..1dbc5f4 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -18,20 +18,47 @@
*/
package org.apache.samza.system;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.JobRunner;
+import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.processorgraph.ExecutionPlanner;
+import org.apache.samza.processorgraph.ProcessorGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
*/
public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+  private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class);
@Override public void run(StreamGraphBuilder app, Config config) {
// TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
// TODO: actually instantiate the tasks and run the job, i.e.
- // 1. create all input/output/intermediate topics
- // 2. create the single job configuration
- // 3. execute JobRunner to submit the single job for the whole graph
- }
+    try {
+      // 1. build stream graph
+      StreamGraph streamGraph = new StreamGraphImpl();
+      app.init(streamGraph, config);
+
+      // 2. create the physical execution plan (this also creates the intermediate streams)
+      ExecutionPlanner planner = new ExecutionPlanner(config);
+      ProcessorGraph processorGraph = planner.plan(streamGraph);
+      // 3. submit jobs for remote execution, one JobRunner per processor in topological order
+      processorGraph.getProcessors().forEach(processor -> {
+        Config processorConfig = processor.generateConfig();
+        String processorId = processor.getId();
+        // log the config the processor is actually submitted with, not the application-level config
+        log.info("Starting processor {} with config {}", processorId, processorConfig);
+
+        JobRunner runner = new JobRunner(processorConfig);
+        runner.run(true);
+      });
+    } catch (Exception e) {
+      throw new SamzaException("fail to run graph", e);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
new file mode 100644
index 0000000..2eba59b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Merges config layers for a scoped component (e.g. a processor).
+ */
+public class ConfigInheritence {
+  private static final Logger log = LoggerFactory.getLogger(ConfigInheritence.class);
+  private static final boolean INHERIT_ROOT_CONFIGS = true;
+
+  /**
+   * Builds the effective config for {@code configPrefix}. The layers are applied in increasing precedence:
+   * the full config (only when INHERIT_ROOT_CONFIGS is set), then {@code generatedConfig}, then
+   * {@code fullConfig.subset(configPrefix)}. A later layer overrides earlier ones, except that null or empty
+   * values are skipped and never override anything.
+   *
+   * @param fullConfig the complete application config
+   * @param generatedConfig config generated for the scope
+   * @param configPrefix prefix selecting the scoped entries of {@code fullConfig}
+   * @return a new MapConfig holding the merged entries
+   */
+  public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) {
+    Config scopedConfig = fullConfig.subset(configPrefix);
+    log.info("Prefix '{}' has extracted config {}", configPrefix, scopedConfig);
+    log.info("Prefix '{}' has generated config {}", configPrefix, generatedConfig);
+
+    Config[] layers = INHERIT_ROOT_CONFIGS
+        ? new Config[] {fullConfig, generatedConfig, scopedConfig}
+        : new Config[] {generatedConfig, scopedConfig};
+
+    Map<String, String> merged = new HashMap<>();
+    for (Config layer : layers) {
+      for (Map.Entry<String, String> entry : layer.entrySet()) {
+        String value = entry.getValue();
+        // Empty values are skipped so they don't override the configs before them.
+        if (value == null || value.isEmpty()) {
+          continue;
+        }
+        merged.put(entry.getKey(), value);
+      }
+    }
+
+    Config result = new MapConfig(merged);
+    log.info("Prefix '{}' has merged config {}", configPrefix, result);
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index b64e406..a797ac2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -78,6 +78,10 @@ object JobConfig {
// Processor Config Constants
val PROCESSOR_ID = "processor.id"
+  /** Config key holding the class name of the ExecutionEnvironment that JobRunner.main instantiates when set. */
+  val EXECUTION_ENV: String = "job.execution.env"
+
+  /** Config key holding the class name of the StreamGraphBuilder that JobRunner.main hands to that environment. */
+  val STREAM_GRAPH_BUILDER: String = "job.stream.graph.builder"
+
implicit def Config2Job(config: Config) = new JobConfig(config)
/**
@@ -181,4 +185,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(mode) => mode.toBoolean
case _ => false
}
+
+  /** Execution environment class name from `job.execution.env`, or "" when it is not configured. */
+  def getExecutionEnv: String = getOrElse(JobConfig.EXECUTION_ENV, "")
+
+  /** Stream graph builder class name from `job.stream.graph.builder`, or "" when it is not configured. */
+  def getStreamGraphBuilder: String = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 022b480..a34cedb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,17 +20,22 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.{ConfigRewriter, Config}
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.config.Config
+import org.apache.samza.config.ConfigRewriter
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.messages.Delete
+import org.apache.samza.coordinator.stream.messages.SetConfig
import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.operators.StreamGraphBuilder
+import org.apache.samza.system.ExecutionEnvironment
import org.apache.samza.util.ClassLoaderHelper
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
+
import scala.collection.JavaConversions._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
object JobRunner extends Logging {
@@ -63,7 +68,21 @@ object JobRunner extends Logging {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- new JobRunner(rewriteConfig(config)).run()
+
+ // start execution env if it's defined
+ val envClass: String = config.getExecutionEnv
+ if (!envClass.isEmpty) {
+ val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
+ val streamGraphBuilderClass: String = config.getStreamGraphBuilder
+ if (!streamGraphBuilderClass.isEmpty) {
+ val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
+ env.run(streamGraphBuilder, config)
+ } else {
+ throw new SamzaException("No stream graph builder defined")
+ }
+ } else {
+ new JobRunner(rewriteConfig(config)).run()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 9019d02..97bd22a 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -23,6 +23,7 @@ import java.net._
import java.io._
import java.lang.management.ManagementFactory
import java.util.zip.CRC32
+import org.apache.samza.config.ConfigRewriter
import org.apache.samza.{SamzaException, Partition}
import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
@@ -395,4 +396,28 @@ object Util extends Logging {
* @return Scala clock function
*/
implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
+
+  /**
+   * Re-writes configuration using the ConfigRewriters listed in the job config, if any are
+   * defined. If there is no ConfigRewriter defined for the job, then this method is a no-op.
+   *
+   * Rewriters are applied left to right in the order listed; each receives the output of the
+   * previous one. Names in the comma-separated list are trimmed and blank entries are ignored,
+   * so "a, b" and "a,b" select the same rewriters.
+   *
+   * @param config The config to re-write
+   * @return re-written config
+   * @throws SamzaException if a listed rewriter has no class configured
+   */
+  def rewriteConfig(config: Config): Config = {
+    def rewrite(c: Config, rewriterName: String): Config = {
+      // The rewriter's class is resolved from the original config, not the partially rewritten one.
+      val klass = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) =>
+        // Without trimming, "a, b" would look up a rewriter named " b" and fail.
+        rewriters.split(",").map(_.trim).filter(_.nonEmpty).foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ea37b746/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
new file mode 100644
index 0000000..7aa9f41
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestProcessorGraph.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.processorgraph;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestProcessorGraph {
+
+  private ProcessorGraph graph1;
+  private ProcessorGraph graph2;
+  private int streamSeq = 0;
+
+  /** Returns a new StreamSpec whose id is the next value of this instance's counter. */
+  private StreamSpec genStream() {
+    streamSeq++;
+    return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
+  }
+
+  /** Maps each node id to its position in {@code sortedNodes}. */
+  private static Map<String, Integer> indexById(List<ProcessorNode> sortedNodes) {
+    Map<String, Integer> idxMap = new HashMap<>();
+    for (int i = 0; i < sortedNodes.size(); i++) {
+      idxMap.put(sortedNodes.get(i).getId(), i);
+    }
+    return idxMap;
+  }
+
+  /** Asserts that node {@code earlier} is positioned before node {@code later} in {@code idxMap}. */
+  private static void assertBefore(Map<String, Integer> idxMap, String earlier, String later) {
+    assertTrue(earlier + " should come before " + later, idxMap.get(earlier) < idxMap.get(later));
+  }
+
+  @Before
+  public void setup() {
+    /*
+     * graph1 is the example DAG from the Wikipedia "Topological sorting" article:
+     *
+     *   sources: 5, 7, 3
+     *   edges:   5 -> 11, 7 -> 11, 7 -> 8, 3 -> 8, 3 -> 10,
+     *            11 -> 2, 11 -> 9, 11 -> 10, 8 -> 9
+     *   sinks:   2, 9, 10
+     */
+    graph1 = new ProcessorGraph(null);
+    graph1.addSource(genStream(), "5");
+    graph1.addSource(genStream(), "7");
+    graph1.addSource(genStream(), "3");
+    graph1.addEdge(genStream(), "5", "11");
+    graph1.addEdge(genStream(), "7", "11");
+    graph1.addEdge(genStream(), "7", "8");
+    graph1.addEdge(genStream(), "3", "8");
+    // 3 -> 10 is what guarantees "3 before 10" in testTopologicalSort.
+    graph1.addEdge(genStream(), "3", "10");
+    graph1.addEdge(genStream(), "11", "2");
+    graph1.addEdge(genStream(), "11", "9");
+    graph1.addEdge(genStream(), "8", "9");
+    graph1.addEdge(genStream(), "11", "10");
+    graph1.addSink(genStream(), "2");
+    graph1.addSink(genStream(), "9");
+    graph1.addSink(genStream(), "10");
+
+    /*
+     * graph2 contains the cycle 2 -> 3 -> 4 -> 6 -> 2 and the self-loop 5 -> 5:
+     *
+     *   1 -> 2 -> 3 -> 4 -> 5 -> 7      (plus 5 -> 5)
+     *        ^         |
+     *        +--- 6 <--+
+     */
+    graph2 = new ProcessorGraph(null);
+    graph2.addSource(genStream(), "1");
+    graph2.addEdge(genStream(), "1", "2");
+    graph2.addEdge(genStream(), "2", "3");
+    graph2.addEdge(genStream(), "3", "4");
+    graph2.addEdge(genStream(), "4", "5");
+    graph2.addEdge(genStream(), "4", "6");
+    graph2.addEdge(genStream(), "6", "2");
+    graph2.addEdge(genStream(), "5", "5");
+    graph2.addEdge(genStream(), "5", "7");
+    graph2.addSink(genStream(), "7");
+  }
+
+  @Test
+  public void testAddSource() {
+    ProcessorGraph graph = new ProcessorGraph(null);
+
+    /*
+     * s1 -> 1
+     * s2 -> 1
+     *
+     * s3 -> 2
+     * s3 -> 3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    graph.addSource(s1, "1");
+    graph.addSource(s2, "1");
+    graph.addSource(s3, "2");
+    graph.addSource(s3, "3");
+
+    assertEquals(3, graph.getSources().size());
+
+    assertEquals(2, graph.getNode("1").getInEdges().size());
+    assertEquals(1, graph.getNode("2").getInEdges().size());
+    assertEquals(1, graph.getNode("3").getInEdges().size());
+
+    assertEquals(0, graph.getEdge(s1).getSourceNodes().size());
+    assertEquals(1, graph.getEdge(s1).getTargetNodes().size());
+    assertEquals(0, graph.getEdge(s2).getSourceNodes().size());
+    assertEquals(1, graph.getEdge(s2).getTargetNodes().size());
+    assertEquals(0, graph.getEdge(s3).getSourceNodes().size());
+    assertEquals(2, graph.getEdge(s3).getTargetNodes().size());
+  }
+
+  @Test
+  public void testAddSink() {
+    /*
+     * 1 -> s1
+     * 2 -> s2
+     * 2 -> s3
+     */
+    StreamSpec s1 = genStream();
+    StreamSpec s2 = genStream();
+    StreamSpec s3 = genStream();
+    ProcessorGraph graph = new ProcessorGraph(null);
+    graph.addSink(s1, "1");
+    graph.addSink(s2, "2");
+    graph.addSink(s3, "2");
+
+    assertEquals(3, graph.getSinks().size());
+    assertEquals(1, graph.getNode("1").getOutEdges().size());
+    assertEquals(2, graph.getNode("2").getOutEdges().size());
+
+    assertEquals(1, graph.getEdge(s1).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s1).getTargetNodes().size());
+    assertEquals(1, graph.getEdge(s2).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s2).getTargetNodes().size());
+    assertEquals(1, graph.getEdge(s3).getSourceNodes().size());
+    assertEquals(0, graph.getEdge(s3).getTargetNodes().size());
+  }
+
+  @Test
+  public void testReachable() {
+    Set<ProcessorNode> reachable1 = graph1.findReachable();
+    assertEquals(8, reachable1.size());
+
+    Set<ProcessorNode> reachable2 = graph2.findReachable();
+    assertEquals(7, reachable2.size());
+  }
+
+  @Test
+  public void testTopologicalSort() {
+    // graph1 is acyclic: every edge u -> v must place u before v.
+    Map<String, Integer> idxMap1 = indexById(graph1.topologicalSort());
+    assertEquals(8, idxMap1.size());
+    assertBefore(idxMap1, "5", "11");
+    assertBefore(idxMap1, "7", "11");
+    assertBefore(idxMap1, "7", "8");
+    assertBefore(idxMap1, "3", "8");
+    assertBefore(idxMap1, "11", "2");
+    assertBefore(idxMap1, "8", "9");
+    assertBefore(idxMap1, "11", "9");
+    assertBefore(idxMap1, "11", "10");
+    assertBefore(idxMap1, "3", "10");
+
+    // graph2 contains cycles, so only a partial set of orderings is checked.
+    Map<String, Integer> idxMap2 = indexById(graph2.topologicalSort());
+    assertEquals(7, idxMap2.size());
+    assertBefore(idxMap2, "1", "2");
+    assertBefore(idxMap2, "1", "3");
+    assertBefore(idxMap2, "1", "4");
+    assertBefore(idxMap2, "1", "6");
+    assertBefore(idxMap2, "4", "5");
+    assertBefore(idxMap2, "5", "7");
+  }
+}