You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/04 17:17:17 UTC
incubator-quarks git commit: [QUARKS-50] duplicate tags across Peek
and FanOut
Repository: incubator-quarks
Updated Branches:
refs/heads/master 6ab46289f -> 9589cc023
[QUARKS-50] duplicate tags across Peek and FanOut
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/9589cc02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/9589cc02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/9589cc02
Branch: refs/heads/master
Commit: 9589cc0238df26bb02a648a2e55617c5c95010d9
Parents: 6ab4628
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 4 12:26:41 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 4 12:26:41 2016 -0400
----------------------------------------------------------------------
.../development/DevelopmentProvider.java | 48 ++++++++++++++++++++
1 file changed, 48 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/9589cc02/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 2f0af05..137b936 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -19,6 +19,7 @@ under the License.
package quarks.providers.development;
import java.util.Hashtable;
+import java.util.Set;
import java.util.concurrent.Future;
import com.codahale.metrics.MetricRegistry;
@@ -27,9 +28,13 @@ import com.google.gson.JsonObject;
import quarks.console.server.HttpServer;
import quarks.execution.Job;
import quarks.execution.services.ControlService;
+import quarks.graph.Connector;
+import quarks.graph.Edge;
import quarks.metrics.Metrics;
import quarks.metrics.MetricsSetup;
import quarks.metrics.oplets.CounterOp;
+import quarks.oplet.core.FanOut;
+import quarks.oplet.core.Peek;
import quarks.providers.direct.DirectProvider;
import quarks.runtime.jmxcontrol.JMXControlService;
import quarks.topology.Topology;
@@ -82,6 +87,49 @@ public class DevelopmentProvider extends DirectProvider {
@Override
public Future<Job> submit(Topology topology, JsonObject config) {
Metrics.counter(topology);
+ duplicateTags(topology);
return super.submit(topology, config);
}
+
+ /**
+ * Duplicate stream tags across oplets as appropriate.
+ * <P>
+ * While this action is semantically appropriate on its own,
+ * the motivation for it was graph presentation in the Console.
+ * Specifically, without tag promotion, metric oplet injections cause
+ * stream/connection tag coloring discontinuities even though the
+ * metricOp output stream is semantically identical to the input stream.
+ * i.e.,
+ * Cases where duplication is required:
+ * <ul>
+ * <li>tags on Peek oplet input streams to output streams</li>
+ * <li>tags on FanOut oplet input streams to output streams
+ * (fortunately, Split is not a FanOut)</li>
+ * </ul>
+ * </P>
+ *
+ * @param topology the topology
+ */
+ private void duplicateTags(Topology topology) {
+ // This one pass implementation is dependent on Edges being
+ // topologically sorted - ancestor Edges appear before their descendants.
+ for (Edge e : topology.graph().getEdges()) {
+ Object o = e.getTarget().getInstance();
+ if (o instanceof Peek || o instanceof FanOut) {
+ duplicateTags(e);
+ }
+ }
+ }
+
+ /**
+ * Duplicate the tags on Edge {@code e} to the Edge's target's connectors.
+ * @param e the Edge
+ */
+ private void duplicateTags(Edge e) {
+ Set<String> tags = e.getTags();
+ String[] ta = tags.toArray(new String[tags.size()]);
+ for (Connector<?> c : e.getTarget().getConnectors()) {
+ c.tag(ta);
+ }
+ }
}