You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/04 17:17:17 UTC

incubator-quarks git commit: [QUARKS-50] duplicate tags across Peek and FanOut

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 6ab46289f -> 9589cc023


[QUARKS-50] duplicate tags across Peek and FanOut

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/9589cc02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/9589cc02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/9589cc02

Branch: refs/heads/master
Commit: 9589cc0238df26bb02a648a2e55617c5c95010d9
Parents: 6ab4628
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 4 12:26:41 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 4 12:26:41 2016 -0400

----------------------------------------------------------------------
 .../development/DevelopmentProvider.java        | 48 ++++++++++++++++++++
 1 file changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/9589cc02/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 2f0af05..137b936 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -19,6 +19,7 @@ under the License.
 package quarks.providers.development;
 
 import java.util.Hashtable;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 import com.codahale.metrics.MetricRegistry;
@@ -27,9 +28,13 @@ import com.google.gson.JsonObject;
 import quarks.console.server.HttpServer;
 import quarks.execution.Job;
 import quarks.execution.services.ControlService;
+import quarks.graph.Connector;
+import quarks.graph.Edge;
 import quarks.metrics.Metrics;
 import quarks.metrics.MetricsSetup;
 import quarks.metrics.oplets.CounterOp;
+import quarks.oplet.core.FanOut;
+import quarks.oplet.core.Peek;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
 import quarks.topology.Topology;
@@ -82,6 +87,49 @@ public class DevelopmentProvider extends DirectProvider {
     @Override
     public Future<Job> submit(Topology topology, JsonObject config) {
         Metrics.counter(topology);
+        duplicateTags(topology);
         return super.submit(topology, config);
     }
+    
+    /**
+     * Duplicate stream tags across oplets as appropriate.
+     * <P>
+     * While this action is semantically appropriate on its own,
+     * the motivation for it was graph presentation in the Console.
+     * Specifically, without tag promotion, metric oplet injections cause
+     * stream/connection tag coloring discontinuities even though the
+     * metricOp output stream is semantically identical to the input stream.
+     * i.e., 
+     * Cases where duplication is required:
+     * <ul>
+     * <li>tags on Peek oplet input streams to output streams</li>
+     * <li>tags on FanOut oplet input streams to output streams
+     *     (fortunately, Split is not a FanOut)</li>
+     * </ul>
+     * </P>
+     * 
+     * @param topology the topology
+     */
+    private void duplicateTags(Topology topology) {
+      // This one pass implementation is dependent on Edges being
+      // topologically sorted - ancestor Edges appear before their descendants.
+      for (Edge e : topology.graph().getEdges()) {
+        Object o = e.getTarget().getInstance();
+        if (o instanceof Peek || o instanceof FanOut) {
+          duplicateTags(e);
+        }
+      }
+    }
+    
+    /**
+     * Duplicate the tags on Edge {@code e} to the Edge's target's connectors.
+     * @param e the Edge
+     */
+    private void duplicateTags(Edge e) {
+      Set<String> tags = e.getTags();
+      String[] ta = tags.toArray(new String[tags.size()]);
+      for (Connector<?> c : e.getTarget().getConnectors()) {
+        c.tag(ta);
+      }
+    }
 }