You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/26 12:33:08 UTC

[1/9] incubator-quarks git commit: Add MXBean controls and helper for use by Console

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 10075c632 -> 9f97f31c7


Add MXBean controls and helper for use by Console


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/fbcb22f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/fbcb22f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/fbcb22f7

Branch: refs/heads/master
Commit: fbcb22f72f157be03660f773f3e54834a1b4f0b5
Parents: 8b68c6d
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 18 11:13:45 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/core/PeriodicSource.java  |   4 +
 .../main/java/quarks/oplet/functional/Peek.java |   4 +
 console/.classpath                              |   1 +
 console/servlets/build.xml                      |   1 +
 .../console/servlets/StreamScopeUtil.java       |  81 ++++++++++
 .../development/DevelopmentProvider.java        |  38 +++--
 .../providers/dev/DevelopmentProviderTest.java  |  55 ++++++-
 .../streamscope/DevelopmentStreamScopeTest.java |  87 +++++++++-
 .../java/quarks/streamscope/StreamScope.java    |  33 ++--
 .../quarks/streamscope/StreamScopeBean.java     |  93 +++++++++++
 .../quarks/streamscope/StreamScopeRegistry.java |  79 +++++++--
 .../streamscope/StreamScopeRegistryBean.java    |  93 +++++++++++
 .../quarks/streamscope/StreamScopeSetup.java    | 160 +++++++++++++++++++
 .../streamscope/mbeans/StreamScopeMXBean.java   | 152 ++++++++++++++++++
 .../mbeans/StreamScopeRegistryMXBean.java       |  49 ++++++
 .../quarks/streamscope/oplets/StreamScope.java  |  32 +++-
 .../test/streamscope/StreamScopeTest.java       |  57 ++++---
 17 files changed, 943 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/api/oplet/src/main/java/quarks/oplet/core/PeriodicSource.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/core/PeriodicSource.java b/api/oplet/src/main/java/quarks/oplet/core/PeriodicSource.java
index 582314a..0ecac6a 100644
--- a/api/oplet/src/main/java/quarks/oplet/core/PeriodicSource.java
+++ b/api/oplet/src/main/java/quarks/oplet/core/PeriodicSource.java
@@ -49,6 +49,10 @@ public abstract class PeriodicSource<T> extends Source<T> implements Runnable, P
     @Override
     public synchronized void start() {
         ControlService cs = getOpletContext().getService(ControlService.class);
+        // TODO BUG HERE: the control alias needs to be unique across the
+        // entire provider instance (multiple topologies) because the ControlService
+        // is provider-wide, not topology specific.
+        // Scope it with just the jobId.  What's going to unregister this control?
         if (cs != null)
             cs.registerControl(TSTREAM_TYPE, getOpletContext().uniquify(getClass().getSimpleName()), 
                     getAlias(), PeriodMXBean.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/api/oplet/src/main/java/quarks/oplet/functional/Peek.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/functional/Peek.java b/api/oplet/src/main/java/quarks/oplet/functional/Peek.java
index ef2a811..2e36538 100644
--- a/api/oplet/src/main/java/quarks/oplet/functional/Peek.java
+++ b/api/oplet/src/main/java/quarks/oplet/functional/Peek.java
@@ -41,6 +41,10 @@ public class Peek<T> extends quarks.oplet.core.Peek<T> {
     public Peek(Consumer<T> peeker) {
         this.peeker = peeker;
     }
+    
+    protected Consumer<T> getPeeker() {
+      return peeker;
+    }
 
     @Override
     protected void peek(T tuple) {

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/console/.classpath
----------------------------------------------------------------------
diff --git a/console/.classpath b/console/.classpath
index 43c3ac0..0053b83 100644
--- a/console/.classpath
+++ b/console/.classpath
@@ -15,5 +15,6 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/ext"/>
+	<classpathentry combineaccessrules="false" kind="src" path="/utils"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/console/servlets/build.xml
----------------------------------------------------------------------
diff --git a/console/servlets/build.xml b/console/servlets/build.xml
index de3a71c..9fa09bb 100644
--- a/console/servlets/build.xml
+++ b/console/servlets/build.xml
@@ -36,6 +36,7 @@
   <path id="compile.classpath">
   	<pathelement location="${quarks.console}/server/ext/jetty-9.3.6/servlet-api-3.1.jar" />
 	<pathelement location="${ext}/slf4j-1.7.12/slf4j-api-1.7.12.jar"/>
+    <pathelement location="${quarks.utils}/streamscope/lib/quarks.utils.streamscope.jar"/>
     <path refid="quarks.ext.classpath" />
   </path>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/console/servlets/src/main/java/quarks/console/servlets/StreamScopeUtil.java
----------------------------------------------------------------------
diff --git a/console/servlets/src/main/java/quarks/console/servlets/StreamScopeUtil.java b/console/servlets/src/main/java/quarks/console/servlets/StreamScopeUtil.java
new file mode 100644
index 0000000..0f25345
--- /dev/null
+++ b/console/servlets/src/main/java/quarks/console/servlets/StreamScopeUtil.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.console.servlets;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import quarks.streamscope.mbeans.StreamScopeMXBean;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
+
+public class StreamScopeUtil {
+  
+  /**
+   * Get the StreamScope for the specified stream.
+   * <P>
+   * N.B. until certain runtime issues are worked out, the stream that a
+   * StreamScopeMXBean is registered for is NOT the "origin stream" (opletId/oport)
+   * that the StreamScope was created for.  Rather the registration is for
+   * the actual StreamScope oplet's opletId and oport 0, so that's what must be
+   * supplied as the parameters.
+   * <BR>
+   * See the commentary in StreamScope oplet code.
+   * <BR>
+   * Once that is addressed, opletId/oport of the "origin stream" will
+   * need to be supplied as parameters.
+   * </P>
+   * 
+   * @param jobId the job id (e.g., "JOB_0")
+   * @param opletId the oplet id (e.g., "OP_2")
+   * @param oport the oplet output port index (0-based)
+   * 
+   * @return null if no StreamScope registered for that stream.
+   */
+  public static StreamScopeMXBean getStreamScope(String jobId, String opletId, int oport) {
+    return getRgy().lookup(jobId, opletId, oport);
+  }
+
+  private static StreamScopeRegistryMXBean getRgy() {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName objName = mkObjectName(StreamScopeRegistryMXBean.class, StreamScopeRegistryMXBean.TYPE);
+    return JMX.newMXBeanProxy(mbs, objName, StreamScopeRegistryMXBean.class);
+  }
+  
+  private static ObjectName mkObjectName(Class<?> klass, String beanType) {
+    StringBuffer sbuf = new StringBuffer();
+    try {
+      sbuf.append("*:interface=");
+      sbuf.append(ObjectName.quote(klass.getCanonicalName()));
+      sbuf.append(",type=");
+      sbuf.append(ObjectName.quote(beanType));
+      return new ObjectName(sbuf.toString());
+    }
+    catch (MalformedObjectNameException e) {
+      
+      // TODO logger.error("Unable to create ObjectName for "+sbuf, e);
+      
+      throw new RuntimeException("Unable to create ObjectName for "+sbuf, e);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index a6a4ae7..799a5db 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -40,6 +40,7 @@ import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
 import quarks.streamscope.StreamScope;
 import quarks.streamscope.StreamScopeRegistry;
+import quarks.streamscope.StreamScopeSetup;
 import quarks.topology.Topology;
 
 /**
@@ -82,8 +83,7 @@ public class DevelopmentProvider extends DirectProvider {
         getServices().addService(ControlService.class,
                 new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
         
-        getServices().addService(StreamScopeRegistry.class,
-                new StreamScopeRegistry());
+        StreamScopeSetup.register(getServices());
 
         HttpServer server = HttpServer.getInstance();
         getServices().addService(HttpServer.class, server);   
@@ -150,23 +150,37 @@ public class DevelopmentProvider extends DirectProvider {
             .getService(StreamScopeRegistry.class);
       if (rgy == null)
         return;
+      
+      // N.B. at runtime, the Console will need to be able to lookup
+      // StreamScopeMXBean be streamId (see StreamScopeRegistryMXBean).
+      // Nominally, that streamId should be the jobId/opletId/oport of
+      // the stream that the StreamScope was created for - what I'll
+      // call the "origin stream". i.e., the Console shouldn't be 
+      // looking up a streamId for the StreamScopeOplet's opletId.
+      //
+      // Registration is left to the StreamScope oplet initialization processing
+      // since the jobId isn't known until that time.
+      // 
+      // As noted above we really want the StreamScope's streamId registration
+      // to be for the "origin stream".  Today the API (Graph,Vertex,Oplet)
+      // doesn't provide that information so we can't capture that as part
+      // of the StreamScope oplet's info.  The net is that for the time being
+      // a StreamScope will end up having to register itself with its opletId,
+      // not the origin oplet's id and that has implications for the Console. 
+      //
+      // We could create a peekAllFn that takes a BiFunction that receives
+      // the Vertex+oport the StreamScope is being created for but that
+      // Vertex/Oplet's opletId still isn't accessible today.
+      // (The Etaio provider maintains the opletId in its Instance object fwiw).
+      //
+      // TODO straighten this all out
 
       t.graph().peekAll( 
           () -> {
               StreamScope<?> streamScope = new StreamScope<>();
               Peek<?> peekOp = new quarks.streamscope.oplets.StreamScope<>(streamScope);
-              registerStreamScope(rgy, peekOp, streamScope);
               return peekOp;
             },
           (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));
     }
-    
-    private int hackCount = 0;  // TODO temp hack to enable some test development
-    private void registerStreamScope(StreamScopeRegistry rgy, Peek<?> peekOp, StreamScope<?> streamScope) {
-      hackCount++;
-      String id = "oplet-"+ hackCount;  // TODO get from peekOp's source oport  <opletId>.oport.<n>
-      String alias = "streamAlias-"+ hackCount;  //  TODO get from peekOp's source oport context
-      rgy.register(StreamScopeRegistry.nameByStreamAlias(alias), streamScope);
-      rgy.register(StreamScopeRegistry.nameByStreamId(id), streamScope);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java b/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
index 9d5ed8f..23b4682 100644
--- a/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
+++ b/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
@@ -21,18 +21,20 @@ package quarks.test.providers.dev;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collection;
+
 import org.junit.Test;
+
 import quarks.graph.Graph;
 import quarks.graph.Vertex;
 import quarks.metrics.oplets.CounterOp;
 import quarks.oplet.Oplet;
+import quarks.streamscope.oplets.StreamScope;
 import quarks.test.topology.TopologyAbstractTest;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.tester.Condition;
 
-import java.util.Collection;
-
 public class DevelopmentProviderTest extends TopologyAbstractTest implements DevelopmentTestSetup {
 
     // DevelopmentProvider inserts CounterOp metric oplets into the graph
@@ -53,18 +55,55 @@ public class DevelopmentProviderTest extends TopologyAbstractTest implements Dev
 
         complete(t, tc);
   
-        // Three vertices after submission
+        // At least three vertices after submission
+        // (provide may have added other oplets as well)
         Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
-        assertEquals(3, verticesAfterSubmit.size());
+        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
         
-        // The new vertex is for a metric oplet
-        boolean found = false;
+        // There is exactly one vertex for a metric oplet
+        int numOplets = 0;
         for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
             Oplet<?,?> oplet = v.getInstance();
             if (oplet instanceof CounterOp) {
-                found = true;
+                numOplets++;
+            }
+        }
+        assertEquals(1, numOplets);
+    }
+
+    // DevelopmentProvider inserts StreamScope oplets into the graph
+    @Test
+    public void testStreaScopesEverywhere() throws Exception {
+
+        Topology t = newTopology();
+        TStream<String> s = t.strings("a", "b", "c");
+        s = s.map(tuple -> tuple)
+            .filter(tuple -> true);
+
+        // Condition inserts a sink
+        Condition<Long> tc = t.getTester().tupleCount(s, 3);
+
+        Graph g = t.graph();
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
+        
+        // Four vertices before submission
+        assertEquals(4, vertices.size());
+
+        complete(t, tc);
+  
+        // At least 4+3 vertices after submission
+        // (provide may have added other oplets as well)
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
+        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 7);
+        
+        // There are exactly 3 vertex for a StreamScope oplet
+        int numOplets = 0;
+        for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
+            Oplet<?,?> oplet = v.getInstance();
+            if (oplet instanceof StreamScope) {
+                numOplets++;
             }
         }
-        assertTrue(found);
+        assertEquals(3, numOplets);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
index 50c45e4..7182167 100644
--- a/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
+++ b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
@@ -18,12 +18,20 @@ under the License.
 */
 package quarks.test.providers.dev.streamscope;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 
 import org.junit.Test;
 
+import com.google.gson.Gson;
+
+import quarks.execution.services.ControlService;
+import quarks.streamscope.StreamScope;
+import quarks.streamscope.StreamScope.Sample;
 import quarks.streamscope.StreamScopeRegistry;
+import quarks.streamscope.mbeans.StreamScopeMXBean;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
 import quarks.test.providers.dev.DevelopmentTestSetup;
 import quarks.test.streamscope.StreamScopeTest;
 import quarks.topology.Topology;
@@ -42,10 +50,83 @@ public class DevelopmentStreamScopeTest extends StreamScopeTest implements Devel
         .getService(StreamScopeRegistry.class);
     assertNotNull(rgy2);
     
-    // Yikes... the registry / service is provider-wide, not topo/app/job-specific
-    // assertNotSame(rgy1, rgy2);
-    
     assertSame(rgy1, rgy2);
   }
   
+  @Test
+  public void testRegistryControlRegistered() throws Exception {
+    Topology t1 = newTopology();
+    ControlService cs1 = t1.getRuntimeServiceSupplier().get()
+        .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgy1 = cs1.getControl(StreamScopeRegistryMXBean.TYPE,
+        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgy1);
+    
+    Topology t2 = newTopology();
+    ControlService cs2 = t2.getRuntimeServiceSupplier().get()
+        .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgy2 = cs2.getControl(StreamScopeRegistryMXBean.TYPE,
+        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgy2);
+    
+    // The rgy1, rgy1 mbean instances may or may not be the same object
+    // depending on the ControlService implementation.  For JMXControlService,
+    // each getControl() yields a different MXBeanProxy instance but they are
+    // for the underlying bean (same objectname).
+    //assertSame(rgy1, rgy2);
+  }
+  
+  @Test
+  public void testStreamScopeBeans() throws Exception {
+    testStreamScopeBeans("JOB_1000");
+  }
+  
+  private void testStreamScopeBeans(String jobId) throws Exception {
+    // Development provider should have controls registered.
+    
+    // Get the Rgy and RgyBean
+    Topology t1 = newTopology();
+    StreamScopeRegistry rgy = t1.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy);
+    ControlService cs = t1.getRuntimeServiceSupplier().get()
+                          .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgyBean = 
+        cs.getControl(StreamScopeRegistryMXBean.TYPE,
+            StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgyBean);
+    
+    // Add a StreamScope and verify it can be located via the controls
+    StreamScope<Integer> ss1 = new StreamScope<Integer>();
+    String streamId = StreamScopeRegistry.mkStreamId(jobId, "OP_1", 2);
+    rgy.register(StreamScopeRegistry.nameForStreamId(streamId), ss1);
+    
+    StreamScopeMXBean ss1Bean = rgyBean.lookup(jobId, "OP_1", 2);
+    assertNotNull(ss1Bean);
+    
+    ss1.setEnabled(true);
+    ss1.accept(100);
+    ss1.accept(101);
+    ss1.accept(102);
+    // access via the bean
+    assertEquals(3, ss1Bean.getSampleCount());
+    String json = ss1Bean.getSamples();
+    assertNotNull(json);
+    
+    Gson gson = new Gson();
+    Sample<?>[] sa = gson.fromJson(json, Sample[].class);
+    for (int i = 0; i < 3; i++) {
+      Sample<?> s = sa[i];
+      Object t = s.tuple(); // fyi, w/o type info fromJson() yields a Double for the numeric
+      assertEquals(t, i+100.0);
+    }
+  }
+  
+  @Test
+  public void testStreamScopeBeans2() throws Exception {
+    // verify successive providers and rgyBean control hackery works
+    testStreamScopeBeans("JOB_1001");
+  }
+
+  // Ideally would test that beans are available via JMX and/or servlet.StreamScopeUtil stuff works
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
index 17a26ce..5b8d7a6 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
@@ -55,7 +55,7 @@ public class StreamScope<T> implements Consumer<T> {
    * A captured tuple.
    * <P>
    * The Sample captures the tuple, and the system time and nanoTime
-   * that the tuple was received.
+   * when the tuple was captured.
    * </P>
    *
    * @param <T> Tuple type.
@@ -211,7 +211,7 @@ public class StreamScope<T> implements Consumer<T> {
    * Use {@link TriggerManager#setPaused(boolean)} setPaused} to re-enable capture
    * following a triggered pause.
    * </P><P>
-   * The default configuration is {@code continuous} and not paused.
+   * The default configuration is continuous (by-count==1) and not paused.
    * </P>
    */
   public static class TriggerManager<T> {
@@ -260,18 +260,14 @@ public class StreamScope<T> implements Consumer<T> {
     }
     
     /**
-     * Capture every tuple.
-     */
-    public void setContinuous() {
-      setByPredicate(Functions.alwaysTrue());
-    }
-    
-    /**
      * Capture the first and every nth tuple
      * @param count
      */
-    public void setByCount(int count) {
-      setByPredicate(newByCountPredicate(count));
+    public void setCaptureByCount(int count) {
+      if (count == 1)
+        setCaptureByPredicate(Functions.alwaysTrue());
+      else
+        setCaptureByPredicate(newByCountPredicate(count));
     }
     
     /**
@@ -281,15 +277,15 @@ public class StreamScope<T> implements Consumer<T> {
      * @param elapsed time to delay until next capture
      * @param unit {@link TimeUnit}
      */
-    public void setByTime(long elapsed, TimeUnit unit) {
-      setByPredicate(newByTimePredicate(elapsed, unit));
+    public void setCaptureByTime(long elapsed, TimeUnit unit) {
+      setCaptureByPredicate(newByTimePredicate(elapsed, unit));
     }
     
     /**
      * Capture a tuple if the {@code predicate} test of the tuple returns true.
      * @param predicate
      */
-    public void setByPredicate(Predicate<T> predicate) {
+    public void setCaptureByPredicate(Predicate<T> predicate) {
       Objects.requireNonNull(predicate, "predicate");
       this.predicate = predicate;
     }
@@ -328,6 +324,11 @@ public class StreamScope<T> implements Consumer<T> {
         }
       };
     }
+    
+    @Override
+    public String toString() {
+      return "paused="+paused+" pauseOnPredicate="+pauseOnPredicate+" predicate="+predicate;
+    }
 
   }
   
@@ -409,7 +410,9 @@ public class StreamScope<T> implements Consumer<T> {
 
   @Override
   public String toString() {
-    return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
+    return "isEnabled="+isEnabled
+        +" bufferMgr={"+bufferMgr()+"}"
+        +" triggerMgr={"+triggerMgr()+"}";
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeBean.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeBean.java
new file mode 100644
index 0000000..22af6a2
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeBean.java
@@ -0,0 +1,93 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.gson.Gson;
+
+import quarks.streamscope.mbeans.StreamScopeMXBean;
+
+/**
+ * Implementation of {@link StreamScopeMXBean}.
+ * 
+ * @see StreamScopeRegistryBean
+ */
+public class StreamScopeBean implements StreamScopeMXBean {
+  private final StreamScope<?> streamScope;
+  
+  public StreamScopeBean(StreamScope<?> streamScope) {
+    this.streamScope = streamScope;
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return streamScope.isEnabled();
+  }
+
+  @Override
+  public void setEnabled(boolean isEnabled) {
+    streamScope.setEnabled(isEnabled);
+  }
+
+  @Override
+  public boolean isPaused() {
+    return streamScope.triggerMgr().isPaused();
+  }
+
+  @Override
+  public void setPaused(boolean paused) {
+    streamScope.triggerMgr().setPaused(paused);
+  }
+
+  @Override
+  public String getSamples() {
+    List<?> samples = streamScope.getSamples();
+    Gson gson = new Gson();
+    String json = gson.toJson(samples);
+    return json;
+  }
+
+  @Override
+  public int getSampleCount() {
+    return streamScope.getSampleCount();
+  }
+
+  @Override
+  public void setMaxRetentionCount(int maxCount) {
+    streamScope.bufferMgr().setMaxRetentionCount(maxCount);
+  }
+
+  @Override
+  public void setMaxRetentionTime(long age, TimeUnit unit) {
+    streamScope.bufferMgr().setMaxRetentionTime(age, unit);
+  }
+
+  @Override
+  public void setCaptureByCount(int count) {
+    streamScope.triggerMgr().setCaptureByCount(count);
+  }
+
+  @Override
+  public void setCaptureByTime(long elapsed, TimeUnit unit) {
+    streamScope.triggerMgr().setCaptureByTime(elapsed, unit);
+  }  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
index 836428b..a02057e 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
@@ -42,41 +42,74 @@ import java.util.Set;
 public class StreamScopeRegistry {
   private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
   private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
+  private static final String JOB_OPLET_FMT = "j[%s].op[%s]";
+  private static final String STREAMID_FMT = JOB_OPLET_FMT+".o[%d]";
+  private static final String ID_PREFIX = "id.";
+  private static final String ALIAS_PREFIX = "alias.";
 
   public StreamScopeRegistry() {
     
   }
+
+  /**
+   * Make a streamId for the specified stream.
+   * @param jobId the job id (e.g., "JOB_0")
+   * @param opletId the oplet id (e.g., "OP_2")
+   * @param oport the oplet output port index (0-based)
+   * @return the streamId
+   */
+  public static String mkStreamId(String jobId, String opletId, int oport) {
+    Objects.requireNonNull(jobId, "jobId");
+    Objects.requireNonNull(opletId, "opletId");
+    if (oport < 0)
+      throw new IllegalArgumentException("oport");
+    return String.format(STREAMID_FMT, jobId, opletId, oport);
+  }
+
+  /** create a prefix of a streamId based name */
+  static String mkStreamIdNamePrefix(String jobId, String opletId) {
+    return String.format(ID_PREFIX+JOB_OPLET_FMT, jobId, opletId);
+  }
   
   /** create a registration name for a stream alias */
-  public static String nameByStreamAlias(String alias) {
+  public static String nameForStreamAlias(String alias) {
     Objects.requireNonNull(alias, "alias");
-    return "alias."+alias;
+    return ALIAS_PREFIX+alias;
   }
   
-  /** create a registration name for a stream id */
-  public static String nameByStreamId(String id) {
-    Objects.requireNonNull(id, "id");
-    return "id."+id;
+  /** Create a registration name for a stream id.
+   * @see #mkStreamId(String, String, int)
+   */
+  public static String nameForStreamId(String streamId) {
+    Objects.requireNonNull(streamId, "id");
+    return ID_PREFIX+streamId;
   }
   
   /** returns null if {@code name} is not a from nameByStreamAlias() */
   public static String streamAliasFromName(String name) {
     Objects.requireNonNull(name, "name");
-    if (!name.startsWith("alias."))
+    if (!name.startsWith(ALIAS_PREFIX))
       return null;
-    return name.substring("alias.".length());
+    return name.substring(ALIAS_PREFIX.length());
   }
   
   /** returns null if {@code name} is not a from nameByStreamId() */
   public static String streamIdFromName(String name) {
     Objects.requireNonNull(name, "name");
-    if (!name.startsWith("id."))
+    if (!name.startsWith(ID_PREFIX))
       return null;
-    return name.substring("id.".length());
+    return name.substring(ID_PREFIX.length());
   }
   
-  /** A single StreamScope can be registered with multiple names.
+  /** Register a StreamScope by {@code name}
+   * <P>
+   * A single StreamScope can be registered with multiple names.
+   * </P>
+   * @param name name to register with
+   * @param streamScope the StreamScope
    * @throws IllegalStateException if a registration already exists for {@code name}
+   * @see #nameForStreamId(String)
+   * @see #nameForStreamAlias(String)
    */
   public synchronized void register(String name, StreamScope<?> streamScope) {
     if (byNameMap.containsKey(name))
@@ -91,8 +124,11 @@ public class StreamScopeRegistry {
   }
   
   /**
-   * @param name
+   * Lookup a StreamScope
+   * @param name a StreamScope is registration name
    * @return the StreamScope. null if name is not registered.
+   * @see #nameForStreamId(String)
+   * @see #nameForStreamAlias(String)
    */
   public synchronized StreamScope<?> lookup(String name) {
     return byNameMap.get(name);
@@ -119,10 +155,9 @@ public class StreamScopeRegistry {
    * @see #unregister(StreamScope)
    */
   public synchronized void unregister(String name) {
-    StreamScope<?> streamScope = byNameMap.get(name);
+    StreamScope<?> streamScope = byNameMap.remove(name);
     if (streamScope == null)
       return;
-    byNameMap.remove(name);
     List<String> names = byStreamScopeMap.get(streamScope);
     names.remove(name);
     if (names.isEmpty())
@@ -141,5 +176,21 @@ public class StreamScopeRegistry {
       unregister(name);
   }
   
+  /** remove all name registrations of the StreamScopes for the specified oplet.
+   * no-op if no registrations for the oplet
+   */
+  synchronized void unregister(String jobId, String opletId) {
+    String prefix = mkStreamIdNamePrefix(jobId, opletId);
+    List<StreamScope<?>> toUnregister = new ArrayList<>();
+    for (String name : getNames()) {
+      if (name.startsWith(prefix)) {
+        toUnregister.add(lookup(name));
+      }
+    }
+    for (StreamScope<?> streamScope : toUnregister) {
+      unregister(streamScope);
+    }
+  }
+  
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
new file mode 100644
index 0000000..87c664d
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
@@ -0,0 +1,93 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import quarks.execution.services.ControlService;
+import quarks.streamscope.mbeans.StreamScopeMXBean;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
+
+/**
+ * An implementation of {@link StreamScopeRegistryMXBean}.
+ */
+public class StreamScopeRegistryBean implements StreamScopeRegistryMXBean {
+  private final StreamScopeRegistry rgy;
+  private final ControlService cs;
+  
+  // streamId -> controlId for StreamScopeBean registered for streamId
+  // TODO: seems like if we can cs.getControl(type,alias) to find a control
+  // then there should be an cs.unregister(type,alias).
+  // lacking that we need to record controls that were registered.
+  private final Map<String,String> controlIdMap = new HashMap<>();
+  
+  public StreamScopeRegistryBean(StreamScopeRegistry rgy, ControlService cs) {
+    this.rgy = rgy;
+    this.cs = cs;
+  }
+  
+  @Override
+  public StreamScopeMXBean lookup(String jobId, String opletId, int oport) {
+    String streamId = StreamScopeRegistry.mkStreamId(jobId, opletId, oport);
+    StreamScopeMXBean mbean = cs.getControl(StreamScopeMXBean.TYPE, streamId, StreamScopeMXBean.class);
+    
+    if (mbean == null) {
+      String name = StreamScopeRegistry.nameForStreamId(streamId);
+      StreamScope<?> ss = rgy.lookup(name);
+      if (ss != null) {
+        mbean = new StreamScopeBean(ss);
+        String controlId = cs.registerControl(StreamScopeMXBean.TYPE, 
+            StreamScopeMXBean.TYPE+streamId, streamId,
+            StreamScopeMXBean.class, mbean);
+        controlIdMap.put(name, controlId);
+      }
+    }
+    
+    return mbean;
+  }
+  
+  /**
+   * Unregister all StreamScopeMXBean for the oplet.
+   * no-op if none registered.
+   * <BR>
+   * N.B. This does not unregister StreamScopes from the underlying StreamScopeRegistry.
+   * @param jobId
+   * @param opletId
+   */
+  void unregister(String jobId, String opletId) {
+    for (String controlId : controlIds(jobId, opletId)) {
+      cs.unregister(controlId);
+    }
+  }
+  
+  private List<String> controlIds(String jobId, String opletId) {
+    String namePrefix = StreamScopeRegistry.mkStreamIdNamePrefix(jobId, opletId);
+    List<String> controlIds = new ArrayList<>();
+    for (String streamId : controlIdMap.keySet()) {
+      if (streamId.startsWith(namePrefix))
+        controlIds.add(controlIdMap.get(streamId));
+    }
+    return controlIds;
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
new file mode 100644
index 0000000..17e522f
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
@@ -0,0 +1,160 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope;
+
+import quarks.execution.services.ControlService;
+import quarks.execution.services.ServiceContainer;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
+
+/**
+ * Utility helpers for StreamScope setup and management.
+ * <p>
+ * This class is not thread safe.
+ */
+public class StreamScopeSetup {
+    private final StreamScopeRegistry rgy;
+    private StreamScopeRegistryBean rgyBean;
+    @SuppressWarnings("unused")
+    private String rgyBeanControlId;
+    
+    // Hackery is needed to work around all of the issues cropping
+    // up with per-provider StreamScopeRegistry and complications
+    // caused by JMXControlService and SteamsScopeRegistryMXBean registrations.
+    //
+    // There are probably a couple of possible long term solutions.
+    // So for now minimize changing anything already created - i.e.,
+    // keep lazy StreamScopeBean creation via use of StreamScopeRegistryBean.
+    //
+    // Avoid rgy / rgyBean mismatches or rgyBean "already registered"
+    // conditions by mirroring the static-ness of using the platform JMX instance
+    // with a singleton StreamScopeSetup instance... which will result in
+    // singleton rgy and rgyBean instances.
+    //
+    // More background on issues:
+    // Nothing is unregistering the registry control bean from the JMX...
+    // and attempting to register by the next new provider instance throws.
+    // Note, there's no cs.unregister(type,alias) (Really? ugh) so we'd
+    // have to remember the rgyBean control controlId for use with cs.unregister(controlId).
+    // 
+    // There's a general issue of missing hooks to make unregistering
+    // controls for jobs or non-job/oplet related entities possible.
+    // ServiceContainer.addCleaner() provides a hook for job oplet shutdown.
+    // But there are no hooks available to be able to unregister JobMXBean,
+    // AppService bean, or StreamScopeRegistry etc beans.
+    //
+    // While there's a storage leak for Job beans regardless of
+    // which ControlService is used, since job beans
+    // register with a jobId that's unique across successive provider instances
+    // in a JVM (the Etiao id generator is static), an "already registered"
+    // condition is avoided.  StreamScopeRegistry bean uses the same control
+    // alias so it encounters this problem.
+    // AppService bean uses the same alias ("quarks") but it's only being
+    // used with the JsonControlService so it doesn't encounter "already
+    // registered" on successive provider instances in a JVM (each of which
+    // allocates a new control service instance).
+
+
+    // the singleton StreamScopeSetup workaround.
+    private static StreamScopeSetup setup = new StreamScopeSetup(new StreamScopeRegistry());
+    
+    public static void register(ServiceContainer services) {
+      setup.registerPvt(services);
+    }
+
+    /**
+     * Create a new {@link StreamScopeSetup} for setup.
+     *
+     * @param registry the registry to use
+     */
+    private StreamScopeSetup(StreamScopeRegistry registry) {
+      this.rgy = registry;
+    }
+    
+    /**
+     * Perform the necessary registrations. 
+     * <P>
+     * <UL>
+     * <LI>register the StreamScopeRegistry service</LI>
+     * <LI>register a cleaner to remove job oplet StreamScope registrations</LI>
+     * <LI>register a StreamScopeRegistryMXBean with the registered ControlService</LI>
+     * </UL>
+     * </P>
+     * @param services ServiceContainer to register with.
+     */
+    private void registerPvt(ServiceContainer services) {
+      services.addService(StreamScopeRegistry.class, rgy);
+      services.addCleaner(
+          (jobId, opletId) -> {
+            rgy.unregister(jobId, opletId);
+            if (rgyBean != null)
+              rgyBean.unregister(jobId, opletId);
+          });
+      registerRegistryBean(services);
+    }
+    
+    /**
+     * Register a {@link StreamScopeRegistryMXBean} with the registered
+     * {@link ControlService} (for use by the Quarks Console).
+     */
+    private void registerRegistryBean(ServiceContainer services) {
+      ControlService cs = services.getService(ControlService.class);
+      if (cs == null || rgy == null)
+        throw new IllegalStateException();
+      {
+        // more workaround...
+        //
+        // If a rgyBean control is already registered, then don't reregister
+        // (this will/should be the JMXControlService case) so as to avoid
+        // an "already registered" exception from the ControlService.
+        // The rgyBean is gotta be for the matching rgy with this 
+        // singleton StreamScopeSetup instance scheme so everything is OK.
+        
+        StreamScopeRegistryMXBean mbean = cs.getControl(StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+        if (mbean != null) {
+          return;
+        }
+      }
+      if (rgyBean == null)
+        rgyBean = new StreamScopeRegistryBean(rgy, cs);
+      rgyBeanControlId = cs.registerControl(StreamScopeRegistryMXBean.TYPE, 
+          StreamScopeRegistryMXBean.TYPE+"_0", StreamScopeRegistryMXBean.TYPE,
+          StreamScopeRegistryMXBean.class, rgyBean);
+    }
+    
+    // until there's a hook available to be able to unregister things
+//    
+//    /**
+//     * Unregister the StreamScopeRegistry service and the StreamScopeRegistryMXBean
+//     * control.
+//     * @param services
+//     */
+//    public void unregister(ServiceContainer services) {
+//      services.removeService(StreamScopeRegistry.class);
+//      unregisterRegistryBean(services);
+//    }
+//    
+//    private void unregisterRegistryBean(ServiceContainer services) {
+//      if (rgyBean != null) {
+//        ControlService cs = services.getService(ControlService.class);
+//        if (cs != null && rgyBeanControlId != null) {
+//          cs.unregister(rgyBeanControlId);
+//        }
+//      }
+//    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeMXBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeMXBean.java b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeMXBean.java
new file mode 100644
index 0000000..00c84da
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeMXBean.java
@@ -0,0 +1,152 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope.mbeans;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
+ * @see StreamScopeRegistryMXBean
+ */
+public interface StreamScopeMXBean {
+  
+  /**
+   * TYPE is used to identify this bean as a StreamScope bean when building the bean's {@code ObjectName}.
+   * The value is {@value} 
+   */
+  public static String TYPE = "streamScope";
+  
+  /**
+   * Is tuple capture enabled?
+   * @return true if enabled.
+   */
+  boolean isEnabled();
+  
+  /**
+   * Enable or disable tuple capture.
+   * <P>
+   * Disabling releases the capture buffer.
+   * </P>
+   * @param isEnabled true to enable, false to disable.
+   * @see #setPaused(boolean)
+   */
+  void setEnabled(boolean isEnabled);
+  
+  /**
+   * Is capture paused?
+   * @return true if paused
+   */
+  boolean isPaused();
+  
+  /**
+   * Set capture paused control
+   * <P>
+   * Pausing doesn't affect the capture buffer's current contents.
+   * </P>
+   * @param paused true to pause, false to clear pause.
+   */
+  void setPaused(boolean paused);
+  
+//No ability for Predicate param w/JMX right?   
+///**
+// * Set a pause-on predicate.  Capture is paused if the predicate
+// * returns true;
+// * @param predicate the predicate
+// * @see Functions#alwaysFalse()
+// */
+//void setPauseOn(Predicate<T> predicate);
+
+  /**
+   * Get all captured tuples as JSON.
+   * <P>
+   * The JSON is that generated by {@code Gson.toJson(StreamScope.getSamples())}.
+   * </P><P>
+   * The JSON is an array of {@link quarks.streamscope.StreamScope.Sample Sample} JSON objects.  
+   * Each Sample JSON object consists of:
+   * <UL>
+   * <LI>ts - long. capture timestamp in millis from System.currentTimeMillis()</LI>
+   * <LI>nanos - long. capture nanosecond timestamp from System.nanoTime()</LI>
+   * <LI>tuple - see below</LI>
+   * </UL>
+   * The value of the "tuple" property is:
+   * <UL>
+   * <LI>for a primitive tuple type like String, Numeric or Boolean it's a 
+   * JSON string, numeric, or boolean value respectively.</LI>
+   * <LI>Arrays or collections are a JSON array of JSON elements for the members.</LI>
+   * <LI>For other types it's a JSON object with the type's
+   * members and JSON encoding of their values.</LI>
+   * </UL>
+   * </P><P>
+   * The returned samples are removed from the capture buffer.
+   * </P>
+   * @return JSON of the captured samples
+   */
+  String getSamples();
+
+  /**
+   * Get the number of Samples currently captured
+   * @return the count
+   */
+  int getSampleCount();
+
+  /**
+   * Set the maximum number of tuples to retain.
+   * <P>
+   * The capture buffer is cleared.
+   * </P>
+   * @param maxCount the maximum number of tuples to retain.
+   *        Specify 0 to disable count based retention.
+   */
+  void setMaxRetentionCount(int maxCount);
+  
+  /**
+   * Set the maximum retention time of a tuple.
+   * <P>
+   * The capture buffer is cleared.
+   * </P>
+   * @param age the amount of time to retain a tuple.
+   *        Specify 0 to disable age based retention.
+   * @param unit {@link TimeUnit}
+   */
+  void setMaxRetentionTime(long age, TimeUnit unit);
+  
+  /**
+   * Capture the first and every nth tuple
+   * @param count
+   */
+  void setCaptureByCount(int count);
+  
+  /**
+   * Capture the 1st tuple and then the next tuple after {@code period}
+   * {@code unit} time has elapsed since the previous captured tuple.
+   * 
+   * @param elapsed time to delay until next capture
+   * @param unit {@link TimeUnit}
+   */
+  void setCaptureByTime(long elapsed, TimeUnit unit);
+
+// No ability for Predicate param w/JMX right?   
+//  /**
+//   * Capture a tuple if the {@code predicate} test of the tuple returns true.
+//   * @param predicate
+//   */
+//  void setCaptureByPredicate(Predicate<T> predicate);
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
new file mode 100644
index 0000000..a1b08ab
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope.mbeans;
+
+import quarks.streamscope.StreamScope;
+
+/**
+ * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * <P>
+ * The registry contains a collection of StreamScope instances
+ * that are registered by a stream identifier.
+ * </P>
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public interface StreamScopeRegistryMXBean {
+  
+  /**
+   * TYPE is used to identify this bean as a StreamScopeRegistry bean when building the bean's {@code ObjectName}.
+   * The value is {@value} 
+   */
+  public static String TYPE = "streamScopeRegistry";
+  
+  /**
+   * Get the {@link StreamScopeMXBean} registered for the specified stream
+   * @param jobId the job id (e.g., "JOB_0")
+   * @param opletId the oplet id (e.g., "OP_2")
+   * @param oport the oplet output port index (0-based)
+   * @return null if not found
+   */
+  public StreamScopeMXBean lookup(String jobId, String opletId, int oport);
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java b/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
index 7ecdfd5..8890e1f 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
@@ -19,13 +19,18 @@ under the License.
 package quarks.streamscope.oplets;
 
 import quarks.function.Consumer;
+import quarks.oplet.OpletContext;
 import quarks.oplet.functional.Peek;
+import quarks.streamscope.StreamScopeRegistry;
 
 /**
  * A Stream "oscilloscope" oplet.
  * <P>
- * TODO remove this?  Just added to help the Console specialize its presentation
- * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
+ * This class exists so that we can register a StreamScope at runtime
+ * with jobId info (lacking a Function level initialize(FuntionScope) mechanism)
+ * and so the Console can differentiate a StreamScope peek from any other
+ * random Peek oplet use.  Remove this oplet subclass if/when it's no
+ * longer needed to achieve the above.
  * </P>
  *
  * @param <T> Type of the tuple.
@@ -40,5 +45,28 @@ public class StreamScope<T> extends Peek<T> {
   public StreamScope(Consumer<T> streamScope) {
     super(streamScope);
   }
+
+  @Override
+  public void initialize(OpletContext<T, T> context) {
+    super.initialize(context);
+    registerStreamScope();
+  }
+  
+  private void registerStreamScope() {
+    StreamScopeRegistry rgy = getOpletContext().getService(StreamScopeRegistry.class);
+    
+    // see commentary in DevelopmentProvider.addStreamScopes()
+    // re TODOs for being able to register for the "origin stream/oplet".
+    
+    String jobId = getOpletContext().getJobContext().getId();
+    String opletId = getOpletContext().getId(); // TODO should be "origin oplet's" id
+    int oport = 0;  // TODO should be the origin stream's oport index
+    String streamId = StreamScopeRegistry.mkStreamId(jobId, opletId, oport);
+    
+    rgy.register(StreamScopeRegistry.nameForStreamId(streamId),
+        (quarks.streamscope.StreamScope<?>) getPeeker());
+
+    // rgy.register(StreamScopeRegistry.nameForStreamAlias(alias), streamScope);
+  }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/fbcb22f7/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java b/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
index 3b7afa7..4f3a01b 100644
--- a/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
+++ b/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
@@ -32,8 +32,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import quarks.streamscope.StreamScope;
-import quarks.streamscope.StreamScopeRegistry;
 import quarks.streamscope.StreamScope.Sample;
+import quarks.streamscope.StreamScopeRegistry;
 import quarks.test.topology.TopologyAbstractTest;
 
 @Ignore
@@ -135,7 +135,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
         ss.bufferMgr().setMaxRetentionCount(10);
         
         // ---------------- trigger byCount
-        ss.triggerMgr().setByCount(3);
+        ss.triggerMgr().setCaptureByCount(3);
         ss.accept(100);
         ss.accept(101);
         ss.accept(102);
@@ -149,8 +149,19 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
         assertEquals(103, samples.get(1).tuple().intValue());
         assertEquals(106, samples.get(2).tuple().intValue());
         
+        // ---------------- trigger continuous / ByCount(1)
+        ss.triggerMgr().setCaptureByCount(1);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
         // ---------------- trigger byPredicate
-        ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
+        ss.triggerMgr().setCaptureByPredicate(t -> t % 2 == 0);
         ss.accept(100);
         ss.accept(101);
         ss.accept(102);
@@ -163,7 +174,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
         assertEquals(104, samples.get(2).tuple().intValue());
         
         // ---------------- trigger byTime
-        ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
+        ss.triggerMgr().setCaptureByTime(100, TimeUnit.MILLISECONDS);
         ss.accept(100);
         ss.accept(101);
         ss.accept(102);
@@ -179,18 +190,8 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
         assertEquals(103, samples.get(1).tuple().intValue());
         assertEquals(106, samples.get(2).tuple().intValue());
         
-        // ---------------- trigger continuous
-        ss.triggerMgr().setContinuous();
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        
         // ---------------- trigger pause
+        ss.triggerMgr().setCaptureByCount(1);
         ss.accept(100);
         ss.accept(101);
         ss.triggerMgr().setPaused(true);
@@ -207,7 +208,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
         assertEquals(104, samples.get(2).tuple().intValue());
         
         // ---------------- trigger pauseOn
-        
+        ss.triggerMgr().setCaptureByCount(1);
         ss.triggerMgr().setPauseOn(t -> t == 102);
         ss.accept(100);
         ss.accept(101);
@@ -251,24 +252,33 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
       rgy.unregister(ss1);
       
       // ---------- name generation / parse functions
-      String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
+      String alias1Name = StreamScopeRegistry.nameForStreamAlias("alias1");
       assertNotNull(alias1Name);
-      String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
+      String alias2Name = StreamScopeRegistry.nameForStreamAlias("alias2");
       assertNotNull(alias2Name);
       assertFalse(alias1Name.equals(alias2Name));
       String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
       assertEquals("alias1", alias1);
       
-      String id1Name = StreamScopeRegistry.nameByStreamId("id1");
+      String id1Name = StreamScopeRegistry.nameForStreamId("id1");
       assertNotNull(id1Name);
-      String id2Name = StreamScopeRegistry.nameByStreamId("id2");
+      String id2Name = StreamScopeRegistry.nameForStreamId("id2");
       assertNotNull(id2Name);
       assertFalse(id1Name.equals(id2Name));
       String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
       assertEquals("id1", id1);
+      
+      String streamId1 = StreamScopeRegistry.mkStreamId("JOB_1", "OP_2", 0);
+      assertNotNull(streamId1);
+      String streamId2 = StreamScopeRegistry.mkStreamId("JOB_1", "OP_2", 1);
+      assertNotNull(streamId2);
+      assertFalse(streamId1.equals(streamId2));
+      id1Name = StreamScopeRegistry.nameForStreamId(streamId1);
+      id1 = StreamScopeRegistry.streamIdFromName(id1Name);
+      assertEquals(id1, streamId1);
 
-      assertFalse(StreamScopeRegistry.nameByStreamAlias("1")
-          .equals(StreamScopeRegistry.nameByStreamId("1")));
+      assertFalse(StreamScopeRegistry.nameForStreamAlias("1")
+          .equals(StreamScopeRegistry.nameForStreamId("1")));
       
       // ---------- register
       rgy.register(alias1Name, ss1);
@@ -278,6 +288,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
       // ---------- lookup
       assertSame(ss1, rgy.lookup(alias1Name));
       assertSame(ss2, rgy.lookup(alias2Name));
+      assertSame(null, rgy.lookup(id1Name));
       assertSame(ss2, rgy.lookup(id2Name));
      
       // ----------- getNames
@@ -295,6 +306,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
       assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
       assertEquals(2, rgy.getStreamScopes().get(ss2).size());
       assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
+      assertFalse(rgy.getStreamScopes().get(ss2).contains(id1Name));
       assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
       
       // ---------- unregister
@@ -303,6 +315,7 @@ public abstract class StreamScopeTest extends TopologyAbstractTest {
       assertEquals(2, rgy.getNames().size());
       assertFalse(rgy.getNames().contains(alias1Name));
       assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
+      rgy.unregister(id1Name);
       assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
       
       rgy.unregister(alias2Name);


[8/9] incubator-quarks git commit: Console: use maps for lookups; add FanOut metric handling

Posted by dl...@apache.org.
Console: use maps for lookups; add FanOut metric handling

Ugh. Turns out the 'graph' object is new on every refresh hence the,
nominally static-per-job, maps are built each time.  That's an issue for
another day.  Seems to not be an issue for the simpler graphs we're
seeing now... and may still be better than iterative lookups.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/7513f23a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/7513f23a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/7513f23a

Branch: refs/heads/master
Commit: 7513f23a8e884501e95e8b89bec6376b58771bc4
Parents: 18c2480
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 25 13:02:46 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 25 13:02:46 2016 -0400

----------------------------------------------------------------------
 console/servlets/webapp_content/js/graph.js | 167 +++++++++++++++++------
 1 file changed, 122 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/7513f23a/console/servlets/webapp_content/js/graph.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/graph.js b/console/servlets/webapp_content/js/graph.js
index 80817f9..bbbfefc 100644
--- a/console/servlets/webapp_content/js/graph.js
+++ b/console/servlets/webapp_content/js/graph.js
@@ -34,24 +34,30 @@ addValuesToEdges = function(graph, counterMetrics) {
 		return parseInt(cm.value, 10);
 	});
 	var quartile1 = parseInt(max * 0.25, 10);
+	
+	if (!graph.edgeMap) {
+	    // fwiw, at this time, graph is new object on every call so these
+	    // are rebuilt every time.  ugh.
+        graph.edgeMap = makeEdgeMap(edges);  // edgeKey(edge) -> edge; {incoming,outgoing}EdgesKey(opId) -> edges[]
+	    graph.vertexMap = makeVertexMap(vertices);  // id -> vertex
+	    graph.equivMetricEdgeMap = makeEquivMetricEdgeMap(graph, counterMetrics); // metricEdge -> equivEdges[]
+	}
 
 	// assign the counter metric value to the edge that has the oplet id as a source or target
+	// and set value in equivalent edges
 	counterMetrics.forEach(function(cm){
-		edges.forEach(function(edge){
-			if (edge.sourceId === cm.opId || edge.targetId === cm.opId) {
-				// add a value to this edge from the metric
-				edge.value = cm.value;	
-				// propagate the edge value downstream/upstream as appropriate
-				propagateEdgeValue(graph, edge, edge.sourceId === cm.opId);
-			} 
-		});
+	    var edges = graph.edgeMap[incomingEdgesKey(cm.opId)];
+	    pushArray(edges, graph.edgeMap[outgoingEdgesKey(cm.opId)]);
+        edges.forEach(function(edge){
+            edge.value = cm.value;
+            setEquivalentMetricEdges(graph, edge);
+        });	       
 	});
 	
 	// if there is no counter metric, assign it a mean value, along with a flag that says it is a derived value
 	edges.forEach(function(edge){
 		if (!edge.value) {
 			edge.value = quartile1;
-			edge.derived = true;
 		} else if (edge.value === "0") {
 			edge.value = 0.45;
 			edge.realValue = 0;
@@ -62,47 +68,118 @@ addValuesToEdges = function(graph, counterMetrics) {
 	return graph;
 };
 
-propagateEdgeValue = function(graph, edge, isDownstream) {
-    var opId = isDownstream ? edge.targetId : edge.sourceId;  
-    var vertex = findVertex(graph, opId);
-    if (isPeekish(vertex)) {
-        var edges = findEdges(graph, opId, isDownstream);
-        edges.forEach(function(e2){
-            if (!e2.value) {
-                e2.value = edge.value;
-                propagateEdgeValue(graph, e2, isDownstream);
-            }
-        });
+// augment arr with arr2's items
+function pushArray(arr, arr2) {
+  arr.push.apply(arr, arr2);
+}
+
+// edgeMap key for edge
+function edgeKey(edge) {
+    return edge.sourceId + "," + edge.targetId;
+}
+
+// edgeMap key for all edges whose targetId === opId
+function incomingEdgesKey(opId) {
+    return "*," + opId;
+}
+
+// edgeMap key for all edges whose sourceId === opId
+function outgoingEdgesKey(opId) {
+    return opId + ",*";
+}
+
+// make edge map of:
+// - edgeKey(edge) -> edge
+// - incomingEdgesKey(edge.targetId) -> edge.targetId incoming edges[]
+// - outgoingEdgesKey(edge.sourceId) -> edge.sourceId outgoing edges[]
+//
+function makeEdgeMap(edges) {
+    var edgeMap = {};
+    edges.forEach(function(edge){
+        edgeMap[edgeKey(edge)] = edge;
+        addToEdges(edgeMap, edge, incomingEdgesKey(edge.targetId));
+        addToEdges(edgeMap, edge, outgoingEdgesKey(edge.sourceId));
+    });
+    return edgeMap;
+}
+
+// add edge to edgeMap[key]'s edges[]
+function addToEdges(edgeMap, edge, key) {
+    var edges = edgeMap[key];
+    if (edges == null) {
+        edges = [];
+        edgeMap[key] = edges;
     }
-};
+    edges.push(edge);
+}
 
-isPeekish = function(vertex) {
-  var kind = vertex.invocation.kind;
-  return kind === "quarks.oplet.functional.Peek"
-      || kind === "quarks.streamscope.oplets.StreamScope";
-};
+// make vertex map of opId -> vertex
+function makeVertexMap(vertices) {
+    var vertexMap = {};
+    vertices.forEach(function(vertex){
+        vertexMap[vertex.id] = vertex;
+    });
+    return vertexMap;
+}
+
+// make edge map of:
+// - cmOutputEdge -> equiv downstream edges[]
+// - cmInputEdge -> equiv upstream edges[]
+function makeEquivMetricEdgeMap(graph, counterMetrics) {
+    var map = {};
+    counterMetrics.forEach(function(cm){
+        var edges = graph.edgeMap[outgoingEdgesKey(cm.opId)];
+        var edge = edges[0];
+        map[edgeKey(edge)] = collectEquivMetricEdges(graph, edge, true);
+        
+        var edges = graph.edgeMap[incomingEdgesKey(cm.opId)];
+        var edge = edges[0];
+        map[edgeKey(edge)] = collectEquivMetricEdges(graph, edge, false);
+    });
+    
+    return map;
+}
 
-findVertex = function(graph, opId) {
-  // this doesn't scale well
-  for (var i = 0; i < graph.vertices.length; i++) {
-    var vertex = graph.vertices[i];
-    if (vertex.id === opId) {
-      return vertex;
+// traverse downstream/upstream from "edge" collecting "equivalent" edges.
+// Traverses through peek ops.
+// Also includes a FanOut oplet's outputs when traversing downstream
+// because the runtime doesn't add CounterOps to them.
+// requires graph.edgeMap, graph.vertexMap
+function collectEquivMetricEdges(graph, edge, isDownstream) {
+    var equivEdges = [];
+    var vertex = graph.vertexMap[isDownstream ? edge.targetId : edge.sourceId];
+    if (isaPeek(vertex)) {
+        var key = isDownstream ? outgoingEdgesKey(vertex.id) : incomingEdgesKey(vertex.id);
+        var edges = graph.edgeMap[key];
+        pushArray(equivEdges, edges);
+        edges.forEach(function(e2){
+            pushArray(equivEdges, collectEquivMetricEdges(graph, e2, isDownstream));
+        });
     }
-  }
-  return null;
-};
+    else if (isDownstream
+            && vertex.invocation.kind == "quarks.oplet.core.FanOut") {
+        pushArray(equivEdges, graph.edgeMap[outgoingEdgesKey(vertex.id)]);
+    }
+    return equivEdges;
+}
 
-findEdges = function(graph, opId, isDownstream) {
-  var edges = [];
-  // this doesn't scale well
-  graph.edges.forEach(function(edge){
-      if (opId === (isDownstream ? edge.sourceId : edge.targetId)) {
-        edges.push(edge);
-      }
-  });
-  return edges;
-};
+// set the metricEdge's value in all edges equivalent to it.
+// requires graph.equivMetricEdgeMap
+function setEquivalentMetricEdges(graph, metricEdge) {
+    var edges = graph.equivMetricEdgeMap[edgeKey(metricEdge)];
+    edges.forEach(function(edge){
+        edge.value = metricEdge.value;
+    });
+}
+
+function isaPeek(vertex) {
+  // TODO need an oplet tag or something to generalize this
+  var kind = vertex.invocation.kind;
+  return kind === "quarks.streamscope.oplets.StreamScope"
+      || kind === "quarks.oplet.functional.Peek"
+      || kind === "quarks.metrics.oplet.RateMeter"
+      || kind === "quarks.metrics.oplet.CounterOp";
+}
 
 getVertexFillColor = function(layer, data, cMetrics) {
 	if (layer === "opletColor" || layer === "static") {


[5/9] incubator-quarks git commit: [WIP] [QUARKS-22] Initial StreamScope implementation

Posted by dl...@apache.org.
[WIP] [QUARKS-22] Initial StreamScope implementation

- Add PlumbingStreams.StreamScope (Consumer<T>)
- Add PlumbingStreams.StreamScopeRegistry
- Add quarks.topology.plumbing.StreamScope oplet (just so console can
present it differently - remove?)
- Update DevelopmentProvider to add StreamScopes
- Add StreamScope and StreamScopeRegistry tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/a808a728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/a808a728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/a808a728

Branch: refs/heads/master
Commit: a808a72843f2c2b1de76ce79dcb4bbe8f442f41e
Parents: 10075c6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 11 15:53:25 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/plumbing/StreamScope.java |  43 ++
 .../quarks/topology/plumbing/StreamScope.java   | 415 +++++++++++++++++++
 .../topology/plumbing/StreamScopeRegistry.java  | 145 +++++++
 .../java/quarks/test/topology/PlumbingTest.java | 295 ++++++++++++-
 .../development/DevelopmentProvider.java        |  37 ++
 5 files changed, 933 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
new file mode 100644
index 0000000..fd88d55
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.plumbing;
+
+import quarks.oplet.functional.Peek;
+
+/**
+ * A Stream "oscilloscope" oplet.
+ * <P>
+ * TODO remove this?  Just added to help the Console specialize its presentation
+ * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class StreamScope<T> extends Peek<T> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Create a new instance.
+   * @param streamScope the consumer function
+   */
+  public StreamScope(quarks.topology.plumbing.StreamScope<T> streamScope) {
+    super(streamScope);
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
new file mode 100644
index 0000000..4cf77f8
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
@@ -0,0 +1,415 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import quarks.function.Consumer;
+import quarks.function.Functions;
+import quarks.function.Predicate;
+import quarks.topology.TStream;
+
+/**
+ * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
+ * <P>
+ * A {@code StreamScope} is expected to be used as parameter to
+ * {@link TStream#peek(Consumer)}.
+ * </P><P>
+ * A {@link TriggerManager} controls which tuples are captured.
+ * A {@link BufferManager} controls the retention policy for captured tuples.
+ * </P>
+ * <P>
+ * StreamScope instances are typically registered in and located via
+ * a {@link StreamScopeRegistry}.
+ * </P>
+ * @see StreamScopeRegistry
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScope<T> implements Consumer<T> {
+  private static final long serialVersionUID = 1L;
+  private final BufferManager<T> buffer = new BufferManager<>();
+  private final TriggerManager<T> trigger = new TriggerManager<>();
+  private boolean isEnabled;
+  
+  /**
+   * A captured tuple.
+   * <P>
+   * The Sample captures the tuple, and the system time and nanoTime
+   * that the tuple was received.
+   * </P>
+   *
+   * @param <T> Tuple type.
+   */
+  public static class Sample<T> {
+    private final long ts;
+    private final long nanoTime;
+    private final T tuple;
+    
+    Sample(T tuple) {
+      this.ts = System.currentTimeMillis();
+      this.nanoTime = System.nanoTime();
+      this.tuple = tuple;
+    }
+    
+    /**
+     * Capture time in msec since the epoch.
+     * @return the timestamp
+     * @see System#currentTimeMillis();
+     */
+    public long timestamp() {
+      return ts;
+    }
+    
+    /**
+     * Capture time in nanoTime.
+     * @return the nanoTime
+     * @see System#nanoTime()
+     */
+    public long nanoTime() {
+      return nanoTime;
+    }
+    
+    /**
+     * The captured tuple.
+     * @return the tuple
+     */
+    public T tuple() {
+      return tuple;
+    }
+    
+    @Override
+    public String toString() {
+      return "ts="+ts+" nano="+nanoTime+" tuple="+tuple;
+    }
+  }
+  
+  /**
+   * Control the retention of captured tuples.
+   * <P>
+   * Captured tuples are retained until either:
+   * <ul>
+   * <li>a maximum retention count is exceeded</li>
+   * <li>TODO a maximum retention time is exceeded</li>
+   * </ul> 
+   * </P><P>
+   * The default configuration is a maxCount of 10.
+   * </P>
+   */
+  public static class BufferManager<T> {
+    private List<Sample<T>> buffer = Collections.emptyList();
+    private int maxCount = 10;
+    private long period;
+    private TimeUnit unit;
+    
+    // TODO timer based eviction
+    // TODO look at using quarks Windows / WindowImpl for the buffer.
+    //      Can window type/"size" be changed?
+    //      Does it support simultaneous maxCount & maxAge?
+    
+    List<Sample<T>> getSamples() {
+      return Collections.unmodifiableList(buffer);
+    }
+    
+    /**
+     * Set the maximum number of tuples to retain.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param maxCount the maximum number of tuples to retain.
+     *        Specify 0 to disable count based retention.
+     */
+    public void setMaxRetentionCount(int maxCount) {
+      this.maxCount = maxCount;
+      allocate();
+    }
+    
+    /**
+     * Set the maximum retention time of a tuple.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param age the amount of time to retain a tuple.
+     *        Specify 0 to disable age based retention.
+     * @param unit {@link TimeUnit}
+     */
+    public void setMaxRetentionTime(long age, TimeUnit unit) {
+      if (age < 0)
+        throw new IllegalArgumentException("age");
+      Objects.requireNonNull(unit, "unit");
+      this.period = age;
+      this.unit = unit;
+      
+      throw new IllegalStateException("setMaxRetentionTime is NYI");
+    }
+    
+    /**
+     * Get the number of tuples in the capture buffer.
+     * @return the count.
+     */
+    int getCount() {
+      return buffer.size();
+    }
+    
+    void release() {
+      buffer = Collections.emptyList();
+    }
+    
+    void allocate() {
+      buffer = new LinkedList<>();
+    }
+    
+    void add(Sample<T> sample) {
+      if (maxCount > 0 && buffer.size() >= maxCount)
+        buffer.remove(0);
+      buffer.add(sample);
+    }
+    
+    @Override
+    public String toString() {
+      return "size="+getCount()+" maxCount="+maxCount+" maxAge="+period+(unit==null ? "" : unit);
+    }
+    
+  }
+  
+  /**
+   * Control what triggers capturing of tuples.
+   * <P>
+   * The following modes are supported:
+   * <ul>
+   * <li>continuous - capture every tuple</li> 
+   * <li>by-count - capture every nth tuple</li>
+   * <li>by-time - capture based on time elapsed since the last capture</li>
+   * <li>by-Predicate - capture based on evaluating a predicate</li>
+   * </ul>
+   * </P><P>
+   * Tuple capture can be temporarily paused via {@link TriggerManager#setPaused(boolean) setPaused}.
+   * Pausing capture does not clear the capture buffer.
+   * </P><P>
+   * Capture processing can be automatically paused when a particular event
+   * has occurred via {@link TriggerManager#setPauseOn(Predicate) setPauseOn}.
+   * The pause predicate is evaluated after processing each received tuple.
+   * Use {@link TriggerManager#setPaused(boolean)} setPaused} to re-enable capture
+   * following a triggered pause.
+   * </P><P>
+   * The default configuration is {@code continuous} and not paused.
+   * </P>
+   */
+  public static class TriggerManager<T> {
+    private Predicate<T> predicate = Functions.alwaysTrue();
+    private Predicate<T> pauseOnPredicate = Functions.alwaysFalse();
+    private boolean paused = false;
+    
+    /**
+     * Test if the tuple should be captured.
+     * @param tuple the tuple
+     * @return true to capture the tuple.
+     */
+    boolean test(T tuple) {
+      if (paused)
+        return false;
+      boolean b = predicate.test(tuple);
+      paused = pauseOnPredicate.test(tuple);
+      return b;
+    }
+    
+    /**
+     * Set capture paused control
+     * @param paused true to pause, false to clear pause.
+     */
+    public void setPaused(boolean paused) {
+      this.paused = paused;
+    }
+    
+    /**
+     * Is capture paused?
+     * @return true if paused
+     */
+    public boolean isPaused() {
+      return paused;
+    }
+    
+    /**
+     * Set a pause-on predicate.  Capture is paused if the predicate
+     * returns true;
+     * @param predicate the predicate
+     * @see Functions#alwaysFalse()
+     */
+    public void setPauseOn(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      pauseOnPredicate = predicate;
+    }
+    
+    /**
+     * Capture every tuple.
+     */
+    public void setContinuous() {
+      setByPredicate(Functions.alwaysTrue());
+    }
+    
+    /**
+     * Capture the first and every nth tuple
+     * @param count
+     */
+    public void setByCount(int count) {
+      setByPredicate(newByCountPredicate(count));
+    }
+    
+    /**
+     * Capture the 1st tuple and then the next tuple after {@code period}
+     * {@code unit} time has elapsed since the previous captured tuple.
+     * 
+     * @param elapsed time to delay until next capture
+     * @param unit {@link TimeUnit}
+     */
+    public void setByTime(long elapsed, TimeUnit unit) {
+      setByPredicate(newByTimePredicate(elapsed, unit));
+    }
+    
+    /**
+     * Capture a tuple if the {@code predicate} test of the tuple returns true.
+     * @param predicate
+     */
+    public void setByPredicate(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      this.predicate = predicate;
+    }
+    
+    private static <T> Predicate<T> newByCountPredicate(int count) {
+      if (count < 1)
+        throw new IllegalArgumentException("count");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        int byCount = count;
+        int curCount = -1;  // capture 1st and every byCount-nth
+
+        @Override
+        public boolean test(T value) {
+          return ++curCount % byCount == 0;
+        }
+      };
+    }
+    
+    private static <T> Predicate<T> newByTimePredicate(long elapsed, TimeUnit unit) {
+      if (elapsed < 1)
+        throw new IllegalArgumentException("elapsed");
+      Objects.requireNonNull(unit, "unit");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        private long nextTime;
+
+        @Override
+        public boolean test(T value) {
+          long now = System.currentTimeMillis();
+          if (now > nextTime) {
+            nextTime = now + unit.toMillis(elapsed);
+            return true;
+          }
+          return false;
+        }
+      };
+    }
+
+  }
+  
+  /**
+   * Create a new instance.
+   * <P>
+   * Sample capture is disabled.
+   * </P>
+   */
+  public StreamScope() {
+  }
+  
+  /**
+   * Is tuple capture enabled?
+   * @return true if enabled.
+   */
+  public boolean isEnabled() {
+    return isEnabled;
+  }
+  
+  /**
+   * Enable or disable tuple capture.
+   * <P>
+   * Disabling releases the capture buffer.
+   * </P>
+   * @param isEnabled true to enable, false to disable.
+   */
+  public synchronized void setEnabled(boolean isEnabled) {
+    if (!isEnabled)
+      buffer.release();
+    buffer.allocate();
+    this.isEnabled = isEnabled;
+  }
+  
+  /**
+   * Get the {@link BufferManager}
+   * @return the manager
+   */
+  public BufferManager<T> bufferMgr() {
+    return buffer;
+  }
+  
+  /**
+   * Get the {@link TriggerManager}
+   * @return the manager
+   */
+  public TriggerManager<T> triggerMgr() {
+    return trigger;
+  }
+  
+  /**
+   * Get all captured tuples.
+   * <P>
+   * The returned samples are removed from the capture buffer.
+   * </P>
+   * @return unmodifiable list of captured samples
+   */
+  public synchronized List<Sample<T>> getSamples() {
+    List<Sample<T>> tmp = buffer.getSamples();
+    buffer.allocate();
+    return tmp;
+  }
+
+  /**
+   * Get the number of Samples currently captured
+   * @return the count
+   */
+  public synchronized int getSampleCount() {
+    return buffer.getCount();
+  }
+
+  @Override
+  public synchronized void accept(T tuple) {
+    if (!isEnabled())
+      return;
+    if (trigger.test(tuple))
+      buffer.add(new Sample<T>(tuple));
+  }
+
+  @Override
+  public String toString() {
+    return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
new file mode 100644
index 0000000..dbfdd80
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * <P>
+ * The registry contains a collection of StreamScope instances
+ * that are registered by one or more names.
+ * </P><P>
+ * The names are: by a TStream {@link quarks.topology.TStream#alias(String) alias} or
+ * by a stream's (output port's) unique identifier.
+ * Static methods are provided for composing these names and extracting
+ * the alias/identifier from generated names.
+ * </P>
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScopeRegistry {
+  private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
+  private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
+
+  public StreamScopeRegistry() {
+    
+  }
+  
+  /** create a registration name for a stream alias */
+  public static String nameByStreamAlias(String alias) {
+    Objects.requireNonNull(alias, "alias");
+    return "alias."+alias;
+  }
+  
+  /** create a registration name for a stream id */
+  public static String nameByStreamId(String id) {
+    Objects.requireNonNull(id, "id");
+    return "id."+id;
+  }
+  
+  /** returns null if {@code name} is not a from nameByStreamAlias() */
+  public static String streamAliasFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("alias."))
+      return null;
+    return name.substring("alias.".length());
+  }
+  
+  /** returns null if {@code name} is not a from nameByStreamId() */
+  public static String streamIdFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("id."))
+      return null;
+    return name.substring("id.".length());
+  }
+  
+  /** A single StreamScope can be registered with multiple names.
+   * @throws IllegalStateException if a registration already exists for {@code name}
+   */
+  public synchronized void register(String name, StreamScope<?> streamScope) {
+    if (byNameMap.containsKey(name))
+      throw new IllegalStateException("StreamScope already registered by name "+name);
+    byNameMap.put(name, streamScope);
+    List<String> names = byStreamScopeMap.get(streamScope); 
+    if (names == null) {
+      names = new ArrayList<>(2);
+      byStreamScopeMap.put(streamScope, names);
+    }
+    names.add(name);
+  }
+  
+  /**
+   * @param name
+   * @return the StreamScope. null if name is not registered.
+   */
+  public synchronized StreamScope<?> lookup(String name) {
+    return byNameMap.get(name);
+  }
+  
+  /**
+   * Get the registered names.
+   * @return unmodifiable collection of the name.
+   * The set is backed by the registry so the set may change.
+   */
+  public synchronized Set<String> getNames() {
+    return Collections.unmodifiableSet(byNameMap.keySet());
+  }
+  
+  /** Get registered StreamScopes and the name(s) each is registered with.
+   * The map is backed by the registry so its contents may change.
+   */
+  public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
+    return Collections.unmodifiableMap(byStreamScopeMap);
+  }
+  
+  /** remove the specific name registration.  Other registration of the same StreamScope may still exist.
+   * no-op if name is not registered.
+   * @see #unregister(StreamScope)
+   */
+  public synchronized void unregister(String name) {
+    StreamScope<?> streamScope = byNameMap.get(name);
+    if (streamScope == null)
+      return;
+    byNameMap.remove(name);
+    List<String> names = byStreamScopeMap.get(streamScope);
+    names.remove(name);
+    if (names.isEmpty())
+      byStreamScopeMap.remove(streamScope);
+  }
+  
+  /** remove all name registrations of the StreamScope.
+   * no-op if no registrations for the StreamScope
+   */
+  public synchronized void unregister(StreamScope<?> streamScope) {
+    List<String> names = byStreamScopeMap.get(streamScope);
+    if (names == null)
+      return;
+    names = new ArrayList<>(names);
+    for (String name : names)
+      unregister(name);
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f2535ed..f88da2d 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -1,4 +1,4 @@
-/*
+//*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -20,6 +20,10 @@ package quarks.test.topology;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -44,6 +48,9 @@ import quarks.function.ToIntFunction;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScope.Sample;
+import quarks.topology.plumbing.StreamScopeRegistry;
 import quarks.topology.plumbing.Valve;
 import quarks.topology.tester.Condition;
 
@@ -743,4 +750,288 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         assertTrue("valid:" + contents.getResult(), contents.valid());
         assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
     }
-}
\ No newline at end of file
+    
+    @Test
+    public void testStreamScopeFn() throws Exception {
+
+        StreamScope<Integer> ss = new StreamScope<>();
+
+        List<Sample<Integer>> samples; 
+        Sample<Integer> sample;
+
+        assertFalse(ss.isEnabled());
+        assertNotNull(ss.bufferMgr());
+        assertNotNull(ss.triggerMgr());
+        assertEquals(0, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertNotNull(samples);
+        assertEquals(0, samples.size());
+        
+        // ---------------- no capture when not enabled
+        ss.accept(1);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        ss.setEnabled(true);
+        
+        // ---------------- single capture
+        // note: getSamples() removes captured tuples
+        ss.accept(100);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1, samples.size());
+        sample = samples.get(0);
+        assertEquals(100, sample.tuple().intValue());
+        assertNotEquals(0, sample.timestamp());
+        assertNotEquals(0, sample.nanoTime());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- next capture/get; different lists
+        List<Sample<Integer>> savedSamples = samples;
+        ss.accept(101);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertNotEquals(samples, savedSamples);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- multi capture
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ----------------  disable => clears capture buffer
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        ss.setEnabled(false);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+
+        ss.setEnabled(true);
+
+        // ---------------- buffer control at the limit (no eviction)
+        ss.bufferMgr().setMaxRetentionCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- buffer control with eviction
+        ss.bufferMgr().setMaxRetentionCount(2);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(2, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        ss.bufferMgr().setMaxRetentionCount(10);
+        
+        // ---------------- trigger byCount
+        ss.triggerMgr().setByCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byPredicate
+        ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byTime
+        ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        Thread.sleep(110);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        Thread.sleep(110);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger continuous
+        ss.triggerMgr().setContinuous();
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pause
+        ss.accept(100);
+        ss.accept(101);
+        ss.triggerMgr().setPaused(true);
+        assertTrue(ss.triggerMgr().isPaused());
+        ss.accept(102);
+        ss.accept(103);
+        ss.triggerMgr().setPaused(false);
+        assertFalse(ss.triggerMgr().isPaused());
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pauseOn
+        
+        ss.triggerMgr().setPauseOn(t -> t == 102);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        assertTrue(ss.triggerMgr().isPaused());
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        ss.triggerMgr().setPaused(false);
+        ss.accept(1000);
+        ss.accept(1010);
+        ss.accept(102);
+        ss.accept(1030);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1000, samples.get(0).tuple().intValue());
+        assertEquals(1010, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+    }
+
+    @Test
+    public void testStreamScopeRegistry() throws Exception {
+
+      StreamScope<Integer> ss1 = new StreamScope<>();
+      StreamScope<Integer> ss2 = new StreamScope<>();
+      
+      StreamScopeRegistry rgy = new StreamScopeRegistry();
+      
+      assertNotNull(rgy.getNames());
+      assertEquals(0, rgy.getNames().size());
+      assertNotNull(rgy.getStreamScopes());
+      assertEquals(0, rgy.getStreamScopes().size());
+      assertNull(rgy.lookup("xyzzy"));
+      rgy.unregister("xyzzy");
+      rgy.unregister(ss1);
+      
+      // ---------- name generation / parse functions
+      String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
+      assertNotNull(alias1Name);
+      String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
+      assertNotNull(alias2Name);
+      assertNotEquals(alias1Name, alias2Name);
+      String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
+      assertEquals("alias1", alias1);
+      
+      String id1Name = StreamScopeRegistry.nameByStreamId("id1");
+      assertNotNull(id1Name);
+      String id2Name = StreamScopeRegistry.nameByStreamId("id2");
+      assertNotNull(id2Name);
+      assertNotEquals(id1Name, id2Name);
+      String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
+      assertEquals("id1", id1);
+
+      assertNotEquals(StreamScopeRegistry.nameByStreamAlias("1"),
+                      StreamScopeRegistry.nameByStreamId("1"));
+      
+      // ---------- register
+      rgy.register(alias1Name, ss1);
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+
+      // ---------- lookup
+      assertSame(ss1, rgy.lookup(alias1Name));
+      assertSame(ss2, rgy.lookup(alias2Name));
+      assertSame(ss2, rgy.lookup(id2Name));
+     
+      // ----------- getNames
+      assertEquals(3, rgy.getNames().size());
+      assertTrue(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getNames().contains(id1Name));
+      assertTrue(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getNames().contains(id2Name));
+      
+      // ----------- getStreamScopes
+      assertEquals(2, rgy.getStreamScopes().keySet().size());
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertEquals(1, rgy.getStreamScopes().get(ss1).size());
+      assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
+      assertEquals(2, rgy.getStreamScopes().get(ss2).size());
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
+      
+      // ---------- unregister
+      rgy.unregister(alias1Name);
+      assertNull(rgy.lookup(alias1Name));
+      assertEquals(2, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      
+      rgy.unregister(alias2Name);
+      assertNull(rgy.lookup(alias2Name));
+      assertEquals(1, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertSame(ss2, rgy.lookup(id2Name));
+      rgy.unregister(id2Name);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+      
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+      rgy.unregister(ss2);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 137b936..cd2ca9f 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -30,6 +30,7 @@ import quarks.execution.Job;
 import quarks.execution.services.ControlService;
 import quarks.graph.Connector;
 import quarks.graph.Edge;
+import quarks.graph.Vertex;
 import quarks.metrics.Metrics;
 import quarks.metrics.MetricsSetup;
 import quarks.metrics.oplets.CounterOp;
@@ -38,6 +39,8 @@ import quarks.oplet.core.Peek;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
 import quarks.topology.Topology;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScopeRegistry;
 
 /**
  * Provider intended for development.
@@ -78,6 +81,9 @@ public class DevelopmentProvider extends DirectProvider {
         
         getServices().addService(ControlService.class,
                 new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
+        
+        getServices().addService(StreamScopeRegistry.class,
+                new StreamScopeRegistry());
 
         HttpServer server = HttpServer.getInstance();
         getServices().addService(HttpServer.class, server);   
@@ -87,6 +93,7 @@ public class DevelopmentProvider extends DirectProvider {
     @Override
     public Future<Job> submit(Topology topology, JsonObject config) {
         Metrics.counter(topology);
+        addStreamScopes(topology);
         duplicateTags(topology);
         return super.submit(topology, config);
     }
@@ -132,4 +139,34 @@ public class DevelopmentProvider extends DirectProvider {
         c.tag(ta);
       }
     }
+
+    /**
+     * Add StreamScope instances to the topology
+     * @param t the Topology
+     */
+    private void addStreamScopes(Topology t) {
+      StreamScopeRegistry rgy = (StreamScopeRegistry) 
+          t.getRuntimeServiceSupplier().get()
+            .getService(StreamScopeRegistry.class);
+      if (rgy == null)
+        return;
+
+      t.graph().peekAll( 
+          () -> {
+              StreamScope<?> streamScope = new StreamScope<>();
+              Peek<?> peekOp = new quarks.oplet.plumbing.StreamScope<>(streamScope);
+              registerStreamScope(rgy, peekOp, streamScope);
+              return peekOp;
+            },
+          (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));
+    }
+    
+    private int hackCount = 0;  // TODO temp hack to enable some test development
+    private void registerStreamScope(StreamScopeRegistry rgy, Peek<?> peekOp, StreamScope<?> streamScope) {
+      hackCount++;
+      String id = "oplet-"+ hackCount;  // TODO get from peekOp's source oport  <opletId>.oport.<n>
+      String alias = "streamAlias-"+ hackCount;  //  TODO get from peekOp's source oport context
+      rgy.register(StreamScopeRegistry.nameByStreamAlias(alias), streamScope);
+      rgy.register(StreamScopeRegistry.nameByStreamId(id), streamScope);
+    }
 }



[6/9] incubator-quarks git commit: move addStreamScopes into StreamScopeSetup; javadoc improvement

Posted by dl...@apache.org.
move addStreamScopes into StreamScopeSetup; javadoc improvement

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/73c90f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/73c90f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/73c90f60

Branch: refs/heads/master
Commit: 73c90f602d78599f02b185043fd611f029a79af7
Parents: fbcb22f
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 24 11:00:19 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 24 11:00:19 2016 -0400

----------------------------------------------------------------------
 .../development/DevelopmentProvider.java        | 59 ++++------------
 .../providers/dev/DevelopmentProviderTest.java  |  2 +-
 .../java/quarks/streamscope/StreamScope.java    | 17 +++--
 .../streamscope/StreamScopeRegistryBean.java    |  1 +
 .../quarks/streamscope/StreamScopeSetup.java    | 71 ++++++++++++++++++++
 .../mbeans/StreamScopeRegistryMXBean.java       |  6 +-
 .../quarks/streamscope/mbeans/package-info.java | 24 +++++++
 .../java/quarks/streamscope/package-info.java   | 35 ++++++++++
 8 files changed, 159 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 799a5db..7097fcf 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -30,7 +30,6 @@ import quarks.execution.Job;
 import quarks.execution.services.ControlService;
 import quarks.graph.Connector;
 import quarks.graph.Edge;
-import quarks.graph.Vertex;
 import quarks.metrics.Metrics;
 import quarks.metrics.MetricsSetup;
 import quarks.metrics.oplets.CounterOp;
@@ -38,9 +37,9 @@ import quarks.oplet.core.FanOut;
 import quarks.oplet.core.Peek;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
-import quarks.streamscope.StreamScope;
 import quarks.streamscope.StreamScopeRegistry;
 import quarks.streamscope.StreamScopeSetup;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
 import quarks.topology.Topology;
 
 /**
@@ -65,7 +64,18 @@ import quarks.topology.Topology;
  * The implementation calls {@link Metrics#counter(Topology)} to insert 
  * {@link CounterOp} oplets into each stream.
  * </LI>
+ * <LI>
+ * Instrument the topology adding {@link quarks.streamscope.oplets.StreamScope StreamScope}
+ * oplets on all the streams before submitting a topology.  
+ * See {@link StreamScopeSetup#addStreamScopes(Topology) StreamScopeSetup.addStreamscopes}.
+ * </LI>
+ * <LI>
+ * Add a {@link StreamScopeRegistry} runtime service and a
+ * {@link StreamScopeRegistryMXBean} management bean to the {@code ControlService}.
+ * See {@link StreamScopeSetup#register(quarks.execution.services.ServiceContainer) StreamScopeSetup.register}.
+ * </LI>
  * </UL>
+ * @see StreamScopeRegistry
  */
 public class DevelopmentProvider extends DirectProvider {
     
@@ -92,8 +102,8 @@ public class DevelopmentProvider extends DirectProvider {
 
     @Override
     public Future<Job> submit(Topology topology, JsonObject config) {
-        addStreamScopes(topology);
         Metrics.counter(topology);
+        StreamScopeSetup.addStreamScopes(topology);
         duplicateTags(topology);
         return super.submit(topology, config);
     }
@@ -140,47 +150,4 @@ public class DevelopmentProvider extends DirectProvider {
       }
     }
 
-    /**
-     * Add StreamScope instances to the topology
-     * @param t the Topology
-     */
-    private void addStreamScopes(Topology t) {
-      StreamScopeRegistry rgy = (StreamScopeRegistry) 
-          t.getRuntimeServiceSupplier().get()
-            .getService(StreamScopeRegistry.class);
-      if (rgy == null)
-        return;
-      
-      // N.B. at runtime, the Console will need to be able to lookup
-      // StreamScopeMXBean be streamId (see StreamScopeRegistryMXBean).
-      // Nominally, that streamId should be the jobId/opletId/oport of
-      // the stream that the StreamScope was created for - what I'll
-      // call the "origin stream". i.e., the Console shouldn't be 
-      // looking up a streamId for the StreamScopeOplet's opletId.
-      //
-      // Registration is left to the StreamScope oplet initialization processing
-      // since the jobId isn't known until that time.
-      // 
-      // As noted above we really want the StreamScope's streamId registration
-      // to be for the "origin stream".  Today the API (Graph,Vertex,Oplet)
-      // doesn't provide that information so we can't capture that as part
-      // of the StreamScope oplet's info.  The net is that for the time being
-      // a StreamScope will end up having to register itself with its opletId,
-      // not the origin oplet's id and that has implications for the Console. 
-      //
-      // We could create a peekAllFn that takes a BiFunction that receives
-      // the Vertex+oport the StreamScope is being created for but that
-      // Vertex/Oplet's opletId still isn't accessible today.
-      // (The Etaio provider maintains the opletId in its Instance object fwiw).
-      //
-      // TODO straighten this all out
-
-      t.graph().peekAll( 
-          () -> {
-              StreamScope<?> streamScope = new StreamScope<>();
-              Peek<?> peekOp = new quarks.streamscope.oplets.StreamScope<>(streamScope);
-              return peekOp;
-            },
-          (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java b/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
index 23b4682..dfbdf9b 100644
--- a/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
+++ b/providers/development/src/test/java/quarks/test/providers/dev/DevelopmentProviderTest.java
@@ -73,7 +73,7 @@ public class DevelopmentProviderTest extends TopologyAbstractTest implements Dev
 
     // DevelopmentProvider inserts StreamScope oplets into the graph
     @Test
-    public void testStreaScopesEverywhere() throws Exception {
+    public void testStreamScopesEverywhere() throws Exception {
 
         Topology t = newTopology();
         TStream<String> s = t.strings("a", "b", "c");

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
index 5b8d7a6..2a76755 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
@@ -27,20 +27,27 @@ import java.util.concurrent.TimeUnit;
 import quarks.function.Consumer;
 import quarks.function.Functions;
 import quarks.function.Predicate;
-import quarks.topology.TStream;
+import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
 
 /**
  * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
  * <P>
  * A {@code StreamScope} is expected to be used as parameter to
- * {@link TStream#peek(Consumer)}.
+ * {@link quarks.streamscope.oplets.StreamScope} oplet.
  * </P><P>
  * A {@link TriggerManager} controls which tuples are captured.
  * A {@link BufferManager} controls the retention policy for captured tuples.
- * </P>
- * <P>
+ * </P><P>
+ * A {@link Sample} is created for each captured tuple containing the tuple
+ * (not copied) and capture timestamps.  Samples are retrieved using {@link #getSamples()}.
+ * </P><P>
+ * Sample capture can be enabled/disabled ({@link #setEnabled(boolean)}.
+ * It is disabled by default.  Capture can also be paused and resumed via
+ * the {@code TriggerManager}.
+ * </P><P>
  * StreamScope instances are typically registered in and located via
- * a {@link StreamScopeRegistry}.
+ * a {@link StreamScopeRegistry} runtime service 
+ * and {@link StreamScopeRegistryMXBean} runtime ControlService.
  * </P>
  * @see StreamScopeRegistry
  * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
index 87c664d..04f6057 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistryBean.java
@@ -47,6 +47,7 @@ public class StreamScopeRegistryBean implements StreamScopeRegistryMXBean {
   
   @Override
   public StreamScopeMXBean lookup(String jobId, String opletId, int oport) {
+    // lazy-register the mbeans
     String streamId = StreamScopeRegistry.mkStreamId(jobId, opletId, oport);
     StreamScopeMXBean mbean = cs.getControl(StreamScopeMXBean.TYPE, streamId, StreamScopeMXBean.class);
     

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
index 17e522f..a3136bf 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeSetup.java
@@ -20,7 +20,11 @@ package quarks.streamscope;
 
 import quarks.execution.services.ControlService;
 import quarks.execution.services.ServiceContainer;
+import quarks.graph.Vertex;
+import quarks.oplet.core.Peek;
 import quarks.streamscope.mbeans.StreamScopeRegistryMXBean;
+import quarks.topology.Topology;
+import quarks.topology.TopologyProvider;
 
 /**
  * Utility helpers for StreamScope setup and management.
@@ -73,6 +77,19 @@ public class StreamScopeSetup {
     // the singleton StreamScopeSetup workaround.
     private static StreamScopeSetup setup = new StreamScopeSetup(new StreamScopeRegistry());
     
+    /**
+     * Perform the registrations needed to use the streamscope package.
+     * <P>
+     * Typically called during {@link TopologyProvider} construction.
+     * </P><P>
+     * <UL>
+     * <LI>register the StreamScopeRegistry service</LI>
+     * <LI>register a cleaner to remove job oplet StreamScope registrations</LI>
+     * <LI>register a StreamScopeRegistryMXBean with the registered ControlService</LI>
+     * </UL>
+     * </P>
+     * @param services ServiceContainer to register with.
+     */
     public static void register(ServiceContainer services) {
       setup.registerPvt(services);
     }
@@ -157,4 +174,58 @@ public class StreamScopeSetup {
 //        }
 //      }
 //    }
+
+    /**
+     * Add StreamScope instances to the topology
+     * <P>
+     * Instrument the topology by adding StreamScope peekers to the
+     * topology's streams.  At topology submission time, StreamScopes
+     * register themselves with the topology's
+     * {@link StreamScopeRegistry} runtime service.
+     * </P>
+     * 
+     * @param t the Topology.  The operation is a no-op if the topology
+     *     does not have a StreamScopeRegistry service.
+     * @see #register(ServiceContainer) register
+     */
+    public static void addStreamScopes(Topology t) {
+      StreamScopeRegistry rgy = (StreamScopeRegistry) 
+          t.getRuntimeServiceSupplier().get()
+            .getService(StreamScopeRegistry.class);
+      if (rgy == null)
+        return;
+      
+      // N.B. at runtime, the Console will need to be able to lookup
+      // StreamScopeMXBean be streamId (see StreamScopeRegistryMXBean).
+      // Nominally, that streamId should be the jobId/opletId/oport of
+      // the stream that the StreamScope was created for - what I'll
+      // call the "origin stream". i.e., the Console shouldn't be 
+      // looking up a streamId for the StreamScopeOplet's opletId.
+      //
+      // Registration is left to the StreamScope oplet initialization processing
+      // since the jobId isn't known until that time.
+      // 
+      // As noted above we really want the StreamScope's streamId registration
+      // to be for the "origin stream".  Today the API (Graph,Vertex,Oplet)
+      // doesn't provide that information so we can't capture that as part
+      // of the StreamScope oplet's info.  The net is that for the time being
+      // a StreamScope will end up having to register itself with its opletId,
+      // not the origin oplet's id and that has implications for the Console. 
+      //
+      // We could create a peekAllFn that takes a BiFunction that receives
+      // the Vertex+oport the StreamScope is being created for but that
+      // Vertex/Oplet's opletId still isn't accessible today.
+      // (The Etaio provider maintains the opletId in its Instance object fwiw).
+      //
+      // TODO straighten this all out
+
+      t.graph().peekAll( 
+          () -> {
+              StreamScope<?> streamScope = new StreamScope<>();
+              Peek<?> peekOp = new quarks.streamscope.oplets.StreamScope<>(streamScope);
+              return peekOp;
+            },
+          (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
index a1b08ab..efd5faa 100644
--- a/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
+++ b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/StreamScopeRegistryMXBean.java
@@ -18,12 +18,10 @@ under the License.
 */
 package quarks.streamscope.mbeans;
 
-import quarks.streamscope.StreamScope;
-
 /**
- * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * A registry for {@link StreamScopeMXBean} instances.
  * <P>
- * The registry contains a collection of StreamScope instances
+ * The registry contains a collection of StreamScopeMXBean instances
  * that are registered by a stream identifier.
  * </P>
  * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/mbeans/package-info.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/mbeans/package-info.java b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/package-info.java
new file mode 100644
index 0000000..ef0b8ff
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/mbeans/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Stream Oscilloscope {@link quarks.execution.services.ControlService ControlService}
+ * Management Bean interfaces.
+ */
+package quarks.streamscope.mbeans;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/73c90f60/utils/streamscope/src/main/java/quarks/streamscope/package-info.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/package-info.java b/utils/streamscope/src/main/java/quarks/streamscope/package-info.java
new file mode 100644
index 0000000..ac7176d
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/package-info.java
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Stream Oscilloscope - package for instrumenting streams to capture tuples.
+ * <P>
+ * A {@link quarks.streamscope.StreamScope StreamScope} captures tuples.
+ * StreamScopes are registered with a {@link quarks.streamscope.StreamScopeRegistry StreamScopeRegistry}
+ * runtime service and are also controllable via
+ * {@link quarks.streamscope.mbeans.StreamScopeMXBean StreamScopeMXBean} mbeans
+ * registered with a {@link quarks.streamscope.mbeans.StreamScopeRegistryMXBean StreamScopeRegistryMXBean}
+ * runtime {@link quarks.execution.services.ControlService ControlService}.
+ * </P><P>
+ * {@link quarks.streamscope.StreamScopeSetup StreamScopeSetup} performs the necessary setup for a
+ * {@link quarks.topology.TopologyProvider TopologyProvider} to use the package,
+ * including registration of services and instrumenting a topology with
+ * StreamScope instances.
+ */
+package quarks.streamscope;


[9/9] incubator-quarks git commit: reinstate unintentionally dropped stmt

Posted by dl...@apache.org.
reinstate unintentionally dropped stmt

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/9f97f31c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/9f97f31c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/9f97f31c

Branch: refs/heads/master
Commit: 9f97f31c76b9c55ccc4a77834a4e3b1cab20a8a9
Parents: 7513f23
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 25 16:31:33 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 25 16:31:33 2016 -0400

----------------------------------------------------------------------
 console/servlets/webapp_content/js/graph.js | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/9f97f31c/console/servlets/webapp_content/js/graph.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/graph.js b/console/servlets/webapp_content/js/graph.js
index bbbfefc..7154221 100644
--- a/console/servlets/webapp_content/js/graph.js
+++ b/console/servlets/webapp_content/js/graph.js
@@ -58,6 +58,7 @@ addValuesToEdges = function(graph, counterMetrics) {
 	edges.forEach(function(edge){
 		if (!edge.value) {
 			edge.value = quartile1;
+			edge.derived = true;
 		} else if (edge.value === "0") {
 			edge.value = 0.45;
 			edge.realValue = 0;


[7/9] incubator-quarks git commit: Console: render StreamScope like CounterOp shape and color

Posted by dl...@apache.org.
Console: render StreamScope like CounterOp shape and color

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/18c24805
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/18c24805
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/18c24805

Branch: refs/heads/master
Commit: 18c2480560544aa56cb904e0e5f351fbf956fcb7
Parents: 73c90f6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 24 14:17:31 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 24 14:17:31 2016 -0400

----------------------------------------------------------------------
 .../servlets/webapp_content/js/ext/d3.legend.js | 16 ++++++-
 console/servlets/webapp_content/js/graph.js     | 49 +++++++++++++++++++-
 console/servlets/webapp_content/js/index.js     |  8 +++-
 3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/18c24805/console/servlets/webapp_content/js/ext/d3.legend.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/ext/d3.legend.js b/console/servlets/webapp_content/js/ext/d3.legend.js
index 148eba6..e7dc26a 100755
--- a/console/servlets/webapp_content/js/ext/d3.legend.js
+++ b/console/servlets/webapp_content/js/ext/d3.legend.js
@@ -9,6 +9,11 @@ d3.legend = function(g, chartSvg, pItems, legendTitle) {
     var items = {};
     var svg = !chartSvg ? d3.select(g.property("nearestViewportElement")) : chartSvg;
     var isTupleFlowLegend = false;
+    var isRect = function(d) {
+        var k = d.key.toUpperCase();
+        return k.startsWith("COUNTEROP")
+             || k.startsWith("STREAMSCOPE");
+        };
         
     var	legendPadding = g.attr("data-style-padding") || 5,
         lTitleItems = g.selectAll(".legend-title-items").data([true]),
@@ -53,6 +58,14 @@ d3.legend = function(g, chartSvg, pItems, legendTitle) {
     else  {
 	    items = d3.entries(items).sort(
 	    		function(a,b) {
+	    		    // rect before circle - graphic positioning code below
+	    		    var ra = isRect(a);
+	    		    var rb = isRect(b);
+	    		    if (ra && !rb) {
+	    		      return -1;
+	    		    } else if (!ra && rb) {
+	    		      return 1;
+	    		    }
 	    			if (a.key < b.key) {
 	    				return -1;
 	    			} else if (a.key > b.key) {
@@ -100,13 +113,14 @@ d3.legend = function(g, chartSvg, pItems, legendTitle) {
     		})
     		.enter()
     		.append(function(d) {
-    			if (d.key.toUpperCase().startsWith("COUNTEROP")) {
+    		    if (isRect(d)) {
     	  			return document.createElementNS(d3.ns.prefix.svg, 'rect');
     	  		} else {
     	  			return document.createElementNS(d3.ns.prefix.svg, 'circle');
     	  		}
     		});
 
+        // rects before circles
     	var count = 0;
     	li.selectAll("rect")
     	.attr("x", -3)

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/18c24805/console/servlets/webapp_content/js/graph.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/graph.js b/console/servlets/webapp_content/js/graph.js
index 1851e71..80817f9 100644
--- a/console/servlets/webapp_content/js/graph.js
+++ b/console/servlets/webapp_content/js/graph.js
@@ -16,7 +16,8 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-opletColor = {"quarks.metrics.oplets.CounterOp": "#c7c7c7", "quarks.metrics.oplets.RateMeter": "#aec7e8", "quarks.oplet.core.FanIn": "#ff7f0e",
+opletColor = {"quarks.streamscope.oplets.StreamScope": "#c7c7c7",
+        "quarks.metrics.oplets.CounterOp": "#c7c7c7", "quarks.metrics.oplets.RateMeter": "#aec7e8", "quarks.oplet.core.FanIn": "#ff7f0e",
 		"quarks.oplet.core.FanOut": "#ffbb78", "quarks.oplet.core.Peek": "#2ca02c", "quarks.oplet.core.PeriodicSource": "#98df8a", 
 		"quarks.oplet.core.Pipe": "#d62728", "quarks.oplet.core.PipeWindow": "#ff9896", "quarks.oplet.core.ProcessSource": "#9467bd", 
 		"quarks.oplet.core.Sink": "#c5b0d5", "quarks.oplet.core.Source": "#8c564b", "quarks.oplet.core.Split": "#c49c94", "quarks.oplet.core.Union" : "#1f77b4",
@@ -40,6 +41,8 @@ addValuesToEdges = function(graph, counterMetrics) {
 			if (edge.sourceId === cm.opId || edge.targetId === cm.opId) {
 				// add a value to this edge from the metric
 				edge.value = cm.value;	
+				// propagate the edge value downstream/upstream as appropriate
+				propagateEdgeValue(graph, edge, edge.sourceId === cm.opId);
 			} 
 		});
 	});
@@ -59,6 +62,48 @@ addValuesToEdges = function(graph, counterMetrics) {
 	return graph;
 };
 
+propagateEdgeValue = function(graph, edge, isDownstream) {
+    var opId = isDownstream ? edge.targetId : edge.sourceId;  
+    var vertex = findVertex(graph, opId);
+    if (isPeekish(vertex)) {
+        var edges = findEdges(graph, opId, isDownstream);
+        edges.forEach(function(e2){
+            if (!e2.value) {
+                e2.value = edge.value;
+                propagateEdgeValue(graph, e2, isDownstream);
+            }
+        });
+    }
+};
+
+isPeekish = function(vertex) {
+  var kind = vertex.invocation.kind;
+  return kind === "quarks.oplet.functional.Peek"
+      || kind === "quarks.streamscope.oplets.StreamScope";
+};
+
+findVertex = function(graph, opId) {
+  // this doesn't scale well
+  for (var i = 0; i < graph.vertices.length; i++) {
+    var vertex = graph.vertices[i];
+    if (vertex.id === opId) {
+      return vertex;
+    }
+  }
+  return null;
+};
+
+findEdges = function(graph, opId, isDownstream) {
+  var edges = [];
+  // this doesn't scale well
+  graph.edges.forEach(function(edge){
+      if (opId === (isDownstream ? edge.sourceId : edge.targetId)) {
+        edges.push(edge);
+      }
+  });
+  return edges;
+};
+
 getVertexFillColor = function(layer, data, cMetrics) {
 	if (layer === "opletColor" || layer === "static") {
 		return opletColor[data.invocation.kind];
@@ -71,6 +116,8 @@ getVertexFillColor = function(layer, data, cMetrics) {
 		var myScale = d3.scale.linear().domain([0,tupleBucketsIdx.buckets.length -1]).range(tupleColorRange);
 		if (data.invocation.kind.toUpperCase().endsWith("COUNTEROP")) {
 			return "#c7c7c7";
+        } else if (data.invocation.kind.toUpperCase().endsWith("STREAMSCOPE")) {
+            return "#c7c7c7";
 		} else {
 			return myScale(tupleBucketsIdx.bucketIdx);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/18c24805/console/servlets/webapp_content/js/index.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/index.js b/console/servlets/webapp_content/js/index.js
index ea8eb81..38fedd7 100644
--- a/console/servlets/webapp_content/js/index.js
+++ b/console/servlets/webapp_content/js/index.js
@@ -41,6 +41,12 @@ var resetAll = function(bNew) {
 	}
 };
 
+var isRect = function(kind) {
+    var k = kind.toUpperCase();
+    return k.endsWith("COUNTEROP")
+          || k.endsWith("STREAMSCOPE");
+};
+
 d3.select("#jobs")
 .on("change", function() {
   tagsArray = [];
@@ -573,7 +579,7 @@ var renderGraph = function(jobId, counterMetrics, bIsNewJob) {
       .on("drag", dragmove));
       
       node.append(function(d) {
-    	  if (d.invocation.kind.toUpperCase().endsWith("COUNTEROP")) {
+        	if (isRect(d.invocation.kind)) {
     			return document.createElementNS(d3.ns.prefix.svg, 'rect');
     		} else {
     			return document.createElementNS(d3.ns.prefix.svg, 'circle');


[2/9] incubator-quarks git commit: test StreamScopeRegistry registration w/provider

Posted by dl...@apache.org.
test StreamScopeRegistry registration w/provider


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/8b68c6df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/8b68c6df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/8b68c6df

Branch: refs/heads/master
Commit: 8b68c6df0922d9dd1897e29d7e7fec618e7d6a19
Parents: 4b36d47
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Thu May 12 11:59:39 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../streamscope/DevelopmentStreamScopeTest.java | 26 ++++++++++++++++++++
 1 file changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/8b68c6df/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
index fafe8eb..50c45e4 100644
--- a/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
+++ b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
@@ -18,8 +18,34 @@ under the License.
 */
 package quarks.test.providers.dev.streamscope;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+
+import quarks.streamscope.StreamScopeRegistry;
 import quarks.test.providers.dev.DevelopmentTestSetup;
 import quarks.test.streamscope.StreamScopeTest;
+import quarks.topology.Topology;
 
 public class DevelopmentStreamScopeTest extends StreamScopeTest implements DevelopmentTestSetup {
+  
+  @Test
+  public void testServiceRegistered() throws Exception {
+    Topology t1 = newTopology();
+    StreamScopeRegistry rgy1 = t1.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy1);
+    
+    Topology t2 = newTopology();
+    StreamScopeRegistry rgy2 = t2.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy2);
+    
+    // Yikes... the registry / service is provider-wide, not topo/app/job-specific
+    // assertNotSame(rgy1, rgy2);
+    
+    assertSame(rgy1, rgy2);
+  }
+  
 }


[3/9] incubator-quarks git commit: relocate StreamScope to utils / quarks.streamscope

Posted by dl...@apache.org.
relocate StreamScope to utils / quarks.streamscope


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/4b36d473
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/4b36d473
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/4b36d473

Branch: refs/heads/master
Commit: 4b36d47374a1b36d53cbbd574bfceea5c1ebcaba
Parents: e10447c
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Thu May 12 09:06:51 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/plumbing/StreamScope.java |  44 --
 .../quarks/topology/plumbing/StreamScope.java   | 415 -------------------
 .../topology/plumbing/StreamScopeRegistry.java  | 145 -------
 .../java/quarks/test/topology/PlumbingTest.java | 289 -------------
 build.xml                                       |   4 +-
 providers/development/build.xml                 |   2 +
 .../development/DevelopmentProvider.java        |   8 +-
 .../streamscope/DevelopmentStreamScopeTest.java |  25 ++
 utils/.classpath                                |   2 +
 utils/streamscope/build.xml                     |  44 ++
 .../java/quarks/streamscope/StreamScope.java    | 415 +++++++++++++++++++
 .../quarks/streamscope/StreamScopeRegistry.java | 145 +++++++
 .../quarks/streamscope/oplets/StreamScope.java  |  44 ++
 .../test/streamscope/StreamScopeTest.java       | 325 +++++++++++++++
 14 files changed, 1009 insertions(+), 898 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
deleted file mode 100644
index af97a10..0000000
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package quarks.oplet.plumbing;
-
-import quarks.function.Consumer;
-import quarks.oplet.functional.Peek;
-
-/**
- * A Stream "oscilloscope" oplet.
- * <P>
- * TODO remove this?  Just added to help the Console specialize its presentation
- * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
- * </P>
- *
- * @param <T> Type of the tuple.
- */
-public class StreamScope<T> extends Peek<T> {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Create a new instance.
-   * @param streamScope the consumer function
-   */
-  public StreamScope(Consumer<T> streamScope) {
-    super(streamScope);
-  }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
deleted file mode 100644
index 2252e2a..0000000
--- a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package quarks.topology.plumbing;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-
-import quarks.function.Consumer;
-import quarks.function.Functions;
-import quarks.function.Predicate;
-import quarks.topology.TStream;
-
-/**
- * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
- * <P>
- * A {@code StreamScope} is expected to be used as parameter to
- * {@link TStream#peek(Consumer)}.
- * </P><P>
- * A {@link TriggerManager} controls which tuples are captured.
- * A {@link BufferManager} controls the retention policy for captured tuples.
- * </P>
- * <P>
- * StreamScope instances are typically registered in and located via
- * a {@link StreamScopeRegistry}.
- * </P>
- * @see StreamScopeRegistry
- * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
- */
-public class StreamScope<T> implements Consumer<T> {
-  private static final long serialVersionUID = 1L;
-  private final BufferManager<T> buffer = new BufferManager<>();
-  private final TriggerManager<T> trigger = new TriggerManager<>();
-  private boolean isEnabled;
-  
-  /**
-   * A captured tuple.
-   * <P>
-   * The Sample captures the tuple, and the system time and nanoTime
-   * that the tuple was received.
-   * </P>
-   *
-   * @param <T> Tuple type.
-   */
-  public static class Sample<T> {
-    private final long ts;
-    private final long nanoTime;
-    private final T tuple;
-    
-    Sample(T tuple) {
-      this.ts = System.currentTimeMillis();
-      this.nanoTime = System.nanoTime();
-      this.tuple = tuple;
-    }
-    
-    /**
-     * Capture time in msec since the epoch.
-     * @return the timestamp
-     * @see System#currentTimeMillis()
-     */
-    public long timestamp() {
-      return ts;
-    }
-    
-    /**
-     * Capture time in nanoTime.
-     * @return the nanoTime
-     * @see System#nanoTime()
-     */
-    public long nanoTime() {
-      return nanoTime;
-    }
-    
-    /**
-     * The captured tuple.
-     * @return the tuple
-     */
-    public T tuple() {
-      return tuple;
-    }
-    
-    @Override
-    public String toString() {
-      return "ts="+ts+" nano="+nanoTime+" tuple="+tuple;
-    }
-  }
-  
-  /**
-   * Control the retention of captured tuples.
-   * <P>
-   * Captured tuples are retained until either:
-   * <ul>
-   * <li>a maximum retention count is exceeded</li>
-   * <li>TODO a maximum retention time is exceeded</li>
-   * </ul> 
-   * </P><P>
-   * The default configuration is a maxCount of 10.
-   * </P>
-   */
-  public static class BufferManager<T> {
-    private List<Sample<T>> buffer = Collections.emptyList();
-    private int maxCount = 10;
-    private long period;
-    private TimeUnit unit;
-    
-    // TODO timer based eviction
-    // TODO look at using quarks Windows / WindowImpl for the buffer.
-    //      Can window type/"size" be changed?
-    //      Does it support simultaneous maxCount & maxAge?
-    
-    List<Sample<T>> getSamples() {
-      return Collections.unmodifiableList(buffer);
-    }
-    
-    /**
-     * Set the maximum number of tuples to retain.
-     * <P>
-     * The capture buffer is cleared.
-     * </P>
-     * @param maxCount the maximum number of tuples to retain.
-     *        Specify 0 to disable count based retention.
-     */
-    public void setMaxRetentionCount(int maxCount) {
-      this.maxCount = maxCount;
-      allocate();
-    }
-    
-    /**
-     * Set the maximum retention time of a tuple.
-     * <P>
-     * The capture buffer is cleared.
-     * </P>
-     * @param age the amount of time to retain a tuple.
-     *        Specify 0 to disable age based retention.
-     * @param unit {@link TimeUnit}
-     */
-    public void setMaxRetentionTime(long age, TimeUnit unit) {
-      if (age < 0)
-        throw new IllegalArgumentException("age");
-      Objects.requireNonNull(unit, "unit");
-      this.period = age;
-      this.unit = unit;
-      
-      throw new IllegalStateException("setMaxRetentionTime is NYI");
-    }
-    
-    /**
-     * Get the number of tuples in the capture buffer.
-     * @return the count.
-     */
-    int getCount() {
-      return buffer.size();
-    }
-    
-    void release() {
-      buffer = Collections.emptyList();
-    }
-    
-    void allocate() {
-      buffer = new LinkedList<>();
-    }
-    
-    void add(Sample<T> sample) {
-      if (maxCount > 0 && buffer.size() >= maxCount)
-        buffer.remove(0);
-      buffer.add(sample);
-    }
-    
-    @Override
-    public String toString() {
-      return "size="+getCount()+" maxCount="+maxCount+" maxAge="+period+(unit==null ? "" : unit);
-    }
-    
-  }
-  
-  /**
-   * Control what triggers capturing of tuples.
-   * <P>
-   * The following modes are supported:
-   * <ul>
-   * <li>continuous - capture every tuple</li> 
-   * <li>by-count - capture every nth tuple</li>
-   * <li>by-time - capture based on time elapsed since the last capture</li>
-   * <li>by-Predicate - capture based on evaluating a predicate</li>
-   * </ul>
-   * </P><P>
-   * Tuple capture can be temporarily paused via {@link TriggerManager#setPaused(boolean) setPaused}.
-   * Pausing capture does not clear the capture buffer.
-   * </P><P>
-   * Capture processing can be automatically paused when a particular event
-   * has occurred via {@link TriggerManager#setPauseOn(Predicate) setPauseOn}.
-   * The pause predicate is evaluated after processing each received tuple.
-   * Use {@link TriggerManager#setPaused(boolean)} setPaused} to re-enable capture
-   * following a triggered pause.
-   * </P><P>
-   * The default configuration is {@code continuous} and not paused.
-   * </P>
-   */
-  public static class TriggerManager<T> {
-    private Predicate<T> predicate = Functions.alwaysTrue();
-    private Predicate<T> pauseOnPredicate = Functions.alwaysFalse();
-    private boolean paused = false;
-    
-    /**
-     * Test if the tuple should be captured.
-     * @param tuple the tuple
-     * @return true to capture the tuple.
-     */
-    boolean test(T tuple) {
-      if (paused)
-        return false;
-      boolean b = predicate.test(tuple);
-      paused = pauseOnPredicate.test(tuple);
-      return b;
-    }
-    
-    /**
-     * Set capture paused control
-     * @param paused true to pause, false to clear pause.
-     */
-    public void setPaused(boolean paused) {
-      this.paused = paused;
-    }
-    
-    /**
-     * Is capture paused?
-     * @return true if paused
-     */
-    public boolean isPaused() {
-      return paused;
-    }
-    
-    /**
-     * Set a pause-on predicate.  Capture is paused if the predicate
-     * returns true;
-     * @param predicate the predicate
-     * @see Functions#alwaysFalse()
-     */
-    public void setPauseOn(Predicate<T> predicate) {
-      Objects.requireNonNull(predicate, "predicate");
-      pauseOnPredicate = predicate;
-    }
-    
-    /**
-     * Capture every tuple.
-     */
-    public void setContinuous() {
-      setByPredicate(Functions.alwaysTrue());
-    }
-    
-    /**
-     * Capture the first and every nth tuple
-     * @param count
-     */
-    public void setByCount(int count) {
-      setByPredicate(newByCountPredicate(count));
-    }
-    
-    /**
-     * Capture the 1st tuple and then the next tuple after {@code period}
-     * {@code unit} time has elapsed since the previous captured tuple.
-     * 
-     * @param elapsed time to delay until next capture
-     * @param unit {@link TimeUnit}
-     */
-    public void setByTime(long elapsed, TimeUnit unit) {
-      setByPredicate(newByTimePredicate(elapsed, unit));
-    }
-    
-    /**
-     * Capture a tuple if the {@code predicate} test of the tuple returns true.
-     * @param predicate
-     */
-    public void setByPredicate(Predicate<T> predicate) {
-      Objects.requireNonNull(predicate, "predicate");
-      this.predicate = predicate;
-    }
-    
-    private static <T> Predicate<T> newByCountPredicate(int count) {
-      if (count < 1)
-        throw new IllegalArgumentException("count");
-      return new Predicate<T>() {
-        private static final long serialVersionUID = 1L;
-        int byCount = count;
-        int curCount = -1;  // capture 1st and every byCount-nth
-
-        @Override
-        public boolean test(T value) {
-          return ++curCount % byCount == 0;
-        }
-      };
-    }
-    
-    private static <T> Predicate<T> newByTimePredicate(long elapsed, TimeUnit unit) {
-      if (elapsed < 1)
-        throw new IllegalArgumentException("elapsed");
-      Objects.requireNonNull(unit, "unit");
-      return new Predicate<T>() {
-        private static final long serialVersionUID = 1L;
-        private long nextTime;
-
-        @Override
-        public boolean test(T value) {
-          long now = System.currentTimeMillis();
-          if (now > nextTime) {
-            nextTime = now + unit.toMillis(elapsed);
-            return true;
-          }
-          return false;
-        }
-      };
-    }
-
-  }
-  
-  /**
-   * Create a new instance.
-   * <P>
-   * Sample capture is disabled.
-   * </P>
-   */
-  public StreamScope() {
-  }
-  
-  /**
-   * Is tuple capture enabled?
-   * @return true if enabled.
-   */
-  public boolean isEnabled() {
-    return isEnabled;
-  }
-  
-  /**
-   * Enable or disable tuple capture.
-   * <P>
-   * Disabling releases the capture buffer.
-   * </P>
-   * @param isEnabled true to enable, false to disable.
-   */
-  public synchronized void setEnabled(boolean isEnabled) {
-    if (!isEnabled)
-      buffer.release();
-    buffer.allocate();
-    this.isEnabled = isEnabled;
-  }
-  
-  /**
-   * Get the {@link BufferManager}
-   * @return the manager
-   */
-  public BufferManager<T> bufferMgr() {
-    return buffer;
-  }
-  
-  /**
-   * Get the {@link TriggerManager}
-   * @return the manager
-   */
-  public TriggerManager<T> triggerMgr() {
-    return trigger;
-  }
-  
-  /**
-   * Get all captured tuples.
-   * <P>
-   * The returned samples are removed from the capture buffer.
-   * </P>
-   * @return unmodifiable list of captured samples
-   */
-  public synchronized List<Sample<T>> getSamples() {
-    List<Sample<T>> tmp = buffer.getSamples();
-    buffer.allocate();
-    return tmp;
-  }
-
-  /**
-   * Get the number of Samples currently captured
-   * @return the count
-   */
-  public synchronized int getSampleCount() {
-    return buffer.getCount();
-  }
-
-  @Override
-  public synchronized void accept(T tuple) {
-    if (!isEnabled())
-      return;
-    if (trigger.test(tuple))
-      buffer.add(new Sample<T>(tuple));
-  }
-
-  @Override
-  public String toString() {
-    return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
deleted file mode 100644
index dbfdd80..0000000
--- a/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package quarks.topology.plumbing;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * A registry for Stream "oscilloscope" {@link StreamScope} instances.
- * <P>
- * The registry contains a collection of StreamScope instances
- * that are registered by one or more names.
- * </P><P>
- * The names are: by a TStream {@link quarks.topology.TStream#alias(String) alias} or
- * by a stream's (output port's) unique identifier.
- * Static methods are provided for composing these names and extracting
- * the alias/identifier from generated names.
- * </P>
- * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
- */
-public class StreamScopeRegistry {
-  private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
-  private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
-
-  public StreamScopeRegistry() {
-    
-  }
-  
-  /** create a registration name for a stream alias */
-  public static String nameByStreamAlias(String alias) {
-    Objects.requireNonNull(alias, "alias");
-    return "alias."+alias;
-  }
-  
-  /** create a registration name for a stream id */
-  public static String nameByStreamId(String id) {
-    Objects.requireNonNull(id, "id");
-    return "id."+id;
-  }
-  
-  /** returns null if {@code name} is not a from nameByStreamAlias() */
-  public static String streamAliasFromName(String name) {
-    Objects.requireNonNull(name, "name");
-    if (!name.startsWith("alias."))
-      return null;
-    return name.substring("alias.".length());
-  }
-  
-  /** returns null if {@code name} is not a from nameByStreamId() */
-  public static String streamIdFromName(String name) {
-    Objects.requireNonNull(name, "name");
-    if (!name.startsWith("id."))
-      return null;
-    return name.substring("id.".length());
-  }
-  
-  /** A single StreamScope can be registered with multiple names.
-   * @throws IllegalStateException if a registration already exists for {@code name}
-   */
-  public synchronized void register(String name, StreamScope<?> streamScope) {
-    if (byNameMap.containsKey(name))
-      throw new IllegalStateException("StreamScope already registered by name "+name);
-    byNameMap.put(name, streamScope);
-    List<String> names = byStreamScopeMap.get(streamScope); 
-    if (names == null) {
-      names = new ArrayList<>(2);
-      byStreamScopeMap.put(streamScope, names);
-    }
-    names.add(name);
-  }
-  
-  /**
-   * @param name
-   * @return the StreamScope. null if name is not registered.
-   */
-  public synchronized StreamScope<?> lookup(String name) {
-    return byNameMap.get(name);
-  }
-  
-  /**
-   * Get the registered names.
-   * @return unmodifiable collection of the name.
-   * The set is backed by the registry so the set may change.
-   */
-  public synchronized Set<String> getNames() {
-    return Collections.unmodifiableSet(byNameMap.keySet());
-  }
-  
-  /** Get registered StreamScopes and the name(s) each is registered with.
-   * The map is backed by the registry so its contents may change.
-   */
-  public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
-    return Collections.unmodifiableMap(byStreamScopeMap);
-  }
-  
-  /** remove the specific name registration.  Other registration of the same StreamScope may still exist.
-   * no-op if name is not registered.
-   * @see #unregister(StreamScope)
-   */
-  public synchronized void unregister(String name) {
-    StreamScope<?> streamScope = byNameMap.get(name);
-    if (streamScope == null)
-      return;
-    byNameMap.remove(name);
-    List<String> names = byStreamScopeMap.get(streamScope);
-    names.remove(name);
-    if (names.isEmpty())
-      byStreamScopeMap.remove(streamScope);
-  }
-  
-  /** remove all name registrations of the StreamScope.
-   * no-op if no registrations for the StreamScope
-   */
-  public synchronized void unregister(StreamScope<?> streamScope) {
-    List<String> names = byStreamScopeMap.get(streamScope);
-    if (names == null)
-      return;
-    names = new ArrayList<>(names);
-    for (String name : names)
-      unregister(name);
-  }
-  
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 0dfaa6c..685bf32 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -20,9 +20,6 @@ package quarks.test.topology;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -47,9 +44,6 @@ import quarks.function.ToIntFunction;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.plumbing.PlumbingStreams;
-import quarks.topology.plumbing.StreamScope;
-import quarks.topology.plumbing.StreamScope.Sample;
-import quarks.topology.plumbing.StreamScopeRegistry;
 import quarks.topology.plumbing.Valve;
 import quarks.topology.tester.Condition;
 
@@ -749,288 +743,5 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         assertTrue("valid:" + contents.getResult(), contents.valid());
         assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
     }
-    
-    @Test
-    public void testStreamScopeFn() throws Exception {
-
-        StreamScope<Integer> ss = new StreamScope<>();
-
-        List<Sample<Integer>> samples; 
-        Sample<Integer> sample;
-
-        assertFalse(ss.isEnabled());
-        assertNotNull(ss.bufferMgr());
-        assertNotNull(ss.triggerMgr());
-        assertEquals(0, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertNotNull(samples);
-        assertEquals(0, samples.size());
-        
-        // ---------------- no capture when not enabled
-        ss.accept(1);
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        
-        ss.setEnabled(true);
-        
-        // ---------------- single capture
-        // note: getSamples() removes captured tuples
-        ss.accept(100);
-        assertEquals(1, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(1, samples.size());
-        sample = samples.get(0);
-        assertEquals(100, sample.tuple().intValue());
-        assertTrue(sample.timestamp() != 0);
-        assertTrue(sample.nanoTime() != 0);
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        
-        // ---------------- next capture/get; different lists
-        List<Sample<Integer>> savedSamples = samples;
-        ss.accept(101);
-        assertEquals(1, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(101, samples.get(0).tuple().intValue());
-        assertTrue(samples != savedSamples);
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        
-        // ---------------- multi capture
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        
-        // ----------------  disable => clears capture buffer
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(3, ss.getSampleCount());
-        ss.setEnabled(false);
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-
-        ss.setEnabled(true);
-
-        // ---------------- buffer control at the limit (no eviction)
-        ss.bufferMgr().setMaxRetentionCount(3);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        
-        // ---------------- buffer control with eviction
-        ss.bufferMgr().setMaxRetentionCount(2);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(2, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(101, samples.get(0).tuple().intValue());
-        assertEquals(102, samples.get(1).tuple().intValue());
-        assertEquals(0, ss.getSampleCount());
-        assertEquals(0, ss.getSamples().size());
-        ss.bufferMgr().setMaxRetentionCount(10);
-        
-        // ---------------- trigger byCount
-        ss.triggerMgr().setByCount(3);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        ss.accept(103);
-        ss.accept(104);
-        ss.accept(105);
-        ss.accept(106);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(103, samples.get(1).tuple().intValue());
-        assertEquals(106, samples.get(2).tuple().intValue());
-        
-        // ---------------- trigger byPredicate
-        ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        ss.accept(103);
-        ss.accept(104);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(102, samples.get(1).tuple().intValue());
-        assertEquals(104, samples.get(2).tuple().intValue());
-        
-        // ---------------- trigger byTime
-        ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        Thread.sleep(110);
-        ss.accept(103);
-        ss.accept(104);
-        ss.accept(105);
-        Thread.sleep(110);
-        ss.accept(106);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(103, samples.get(1).tuple().intValue());
-        assertEquals(106, samples.get(2).tuple().intValue());
-        
-        // ---------------- trigger continuous
-        ss.triggerMgr().setContinuous();
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        
-        // ---------------- trigger pause
-        ss.accept(100);
-        ss.accept(101);
-        ss.triggerMgr().setPaused(true);
-        assertTrue(ss.triggerMgr().isPaused());
-        ss.accept(102);
-        ss.accept(103);
-        ss.triggerMgr().setPaused(false);
-        assertFalse(ss.triggerMgr().isPaused());
-        ss.accept(104);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(104, samples.get(2).tuple().intValue());
-        
-        // ---------------- trigger pauseOn
-        
-        ss.triggerMgr().setPauseOn(t -> t == 102);
-        ss.accept(100);
-        ss.accept(101);
-        ss.accept(102);
-        ss.accept(103);
-        ss.accept(104);
-        ss.accept(105);
-        assertTrue(ss.triggerMgr().isPaused());
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(100, samples.get(0).tuple().intValue());
-        assertEquals(101, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        ss.triggerMgr().setPaused(false);
-        ss.accept(1000);
-        ss.accept(1010);
-        ss.accept(102);
-        ss.accept(1030);
-        assertEquals(3, ss.getSampleCount());
-        samples = ss.getSamples();
-        assertEquals(1000, samples.get(0).tuple().intValue());
-        assertEquals(1010, samples.get(1).tuple().intValue());
-        assertEquals(102, samples.get(2).tuple().intValue());
-        
-    }
-
-    @Test
-    public void testStreamScopeRegistry() throws Exception {
-
-      StreamScope<Integer> ss1 = new StreamScope<>();
-      StreamScope<Integer> ss2 = new StreamScope<>();
-      
-      StreamScopeRegistry rgy = new StreamScopeRegistry();
-      
-      assertNotNull(rgy.getNames());
-      assertEquals(0, rgy.getNames().size());
-      assertNotNull(rgy.getStreamScopes());
-      assertEquals(0, rgy.getStreamScopes().size());
-      assertNull(rgy.lookup("xyzzy"));
-      rgy.unregister("xyzzy");
-      rgy.unregister(ss1);
-      
-      // ---------- name generation / parse functions
-      String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
-      assertNotNull(alias1Name);
-      String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
-      assertNotNull(alias2Name);
-      assertFalse(alias1Name.equals(alias2Name));
-      String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
-      assertEquals("alias1", alias1);
-      
-      String id1Name = StreamScopeRegistry.nameByStreamId("id1");
-      assertNotNull(id1Name);
-      String id2Name = StreamScopeRegistry.nameByStreamId("id2");
-      assertNotNull(id2Name);
-      assertFalse(id1Name.equals(id2Name));
-      String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
-      assertEquals("id1", id1);
-
-      assertFalse(StreamScopeRegistry.nameByStreamAlias("1")
-          .equals(StreamScopeRegistry.nameByStreamId("1")));
-      
-      // ---------- register
-      rgy.register(alias1Name, ss1);
-      rgy.register(alias2Name, ss2);
-      rgy.register(id2Name, ss2);
-
-      // ---------- lookup
-      assertSame(ss1, rgy.lookup(alias1Name));
-      assertSame(ss2, rgy.lookup(alias2Name));
-      assertSame(ss2, rgy.lookup(id2Name));
-     
-      // ----------- getNames
-      assertEquals(3, rgy.getNames().size());
-      assertTrue(rgy.getNames().contains(alias1Name));
-      assertFalse(rgy.getNames().contains(id1Name));
-      assertTrue(rgy.getNames().contains(alias2Name));
-      assertTrue(rgy.getNames().contains(id2Name));
-      
-      // ----------- getStreamScopes
-      assertEquals(2, rgy.getStreamScopes().keySet().size());
-      assertTrue(rgy.getStreamScopes().keySet().contains(ss1));
-      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
-      assertEquals(1, rgy.getStreamScopes().get(ss1).size());
-      assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
-      assertEquals(2, rgy.getStreamScopes().get(ss2).size());
-      assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
-      assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
-      
-      // ---------- unregister
-      rgy.unregister(alias1Name);
-      assertNull(rgy.lookup(alias1Name));
-      assertEquals(2, rgy.getNames().size());
-      assertFalse(rgy.getNames().contains(alias1Name));
-      assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
-      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
-      
-      rgy.unregister(alias2Name);
-      assertNull(rgy.lookup(alias2Name));
-      assertEquals(1, rgy.getNames().size());
-      assertFalse(rgy.getNames().contains(alias2Name));
-      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
-      assertSame(ss2, rgy.lookup(id2Name));
-      rgy.unregister(id2Name);
-      assertEquals(0, rgy.getNames().size());
-      assertEquals(0, rgy.getStreamScopes().keySet().size());
-      
-      rgy.register(alias2Name, ss2);
-      rgy.register(id2Name, ss2);
-      rgy.unregister(ss2);
-      assertEquals(0, rgy.getNames().size());
-      assertEquals(0, rgy.getStreamScopes().keySet().size());
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 93f3fdc..43a72f3 100644
--- a/build.xml
+++ b/build.xml
@@ -112,6 +112,7 @@
         <ant dir="runtime/jsoncontrol" target="@{target}" useNativeBasedir="true"/>
         <ant dir="runtime/jobregistry" target="@{target}" useNativeBasedir="true"/>
         <ant dir="utils/metrics" target="@{target}" useNativeBasedir="true"/>
+        <ant dir="utils/streamscope" target="@{target}" useNativeBasedir="true"/>
 
         <ant dir="providers/direct" target="@{target}" useNativeBasedir="true"/>
         <ant dir="connectors/common" target="@{target}" useNativeBasedir="true"/>
@@ -215,6 +216,7 @@
       </classfiles>
     <sourcefiles encoding="UTF-8">
         <fileset dir="${basedir}/utils/metrics/src/main/java" includes="**/*.java"/>
+        <fileset dir="${basedir}/utils/streamscope/src/main/java" includes="**/*.java"/>
     </sourcefiles>
     </group>
     <group name="Quarks Connectors">
@@ -321,7 +323,7 @@
        <group title="Quarks Connectors" packages="quarks.connectors.*"/>
        <group title="Quarks Samples" packages="quarks.samples.*"/>
        <group title="Quarks Analytics" packages="quarks.analytics.*"/>
-       <group title="Quarks Utilities" packages="quarks.metrics,quarks.metrics.*"/>
+       <group title="Quarks Utilities" packages="quarks.metrics,quarks.metrics.*,quarks.streamscope,quarks.streamscope.*"/>
        <group title="Quarks Low-Level API" packages="quarks.graph,quarks.graph.*,quarks.oplet,quarks.oplet.*,quarks.window"/>
        <group title="Quarks SPI" packages="quarks.*.spi.*"/>
      </javadoc>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/providers/development/build.xml
----------------------------------------------------------------------
diff --git a/providers/development/build.xml b/providers/development/build.xml
index df53ecd..a8ea24b 100644
--- a/providers/development/build.xml
+++ b/providers/development/build.xml
@@ -27,12 +27,14 @@
     <pathelement location="${quarks.lib}/quarks.providers.direct.jar" />
     <pathelement location="${quarks.console}/server/lib/quarks.console.server.jar" />
     <pathelement location="${quarks.utils}/metrics/lib/quarks.utils.metrics.jar" />
+    <pathelement location="${quarks.utils}/streamscope/lib/quarks.utils.streamscope.jar" />
     <pathelement location="${lib}/quarks.runtime.jmxcontrol.jar" />
   </path>
 
   <path id="test.compile.classpath">
     <pathelement location="${jar}" />
     <pathelement location="../../api/topology/test.classes" />
+    <pathelement location="../../utils/streamscope/test.classes" />
     <path refid="compile.classpath"/>
   </path>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index cd2ca9f..a6a4ae7 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -38,9 +38,9 @@ import quarks.oplet.core.FanOut;
 import quarks.oplet.core.Peek;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
+import quarks.streamscope.StreamScope;
+import quarks.streamscope.StreamScopeRegistry;
 import quarks.topology.Topology;
-import quarks.topology.plumbing.StreamScope;
-import quarks.topology.plumbing.StreamScopeRegistry;
 
 /**
  * Provider intended for development.
@@ -92,8 +92,8 @@ public class DevelopmentProvider extends DirectProvider {
 
     @Override
     public Future<Job> submit(Topology topology, JsonObject config) {
-        Metrics.counter(topology);
         addStreamScopes(topology);
+        Metrics.counter(topology);
         duplicateTags(topology);
         return super.submit(topology, config);
     }
@@ -154,7 +154,7 @@ public class DevelopmentProvider extends DirectProvider {
       t.graph().peekAll( 
           () -> {
               StreamScope<?> streamScope = new StreamScope<>();
-              Peek<?> peekOp = new quarks.oplet.plumbing.StreamScope<>(streamScope);
+              Peek<?> peekOp = new quarks.streamscope.oplets.StreamScope<>(streamScope);
               registerStreamScope(rgy, peekOp, streamScope);
               return peekOp;
             },

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
new file mode 100644
index 0000000..fafe8eb
--- /dev/null
+++ b/providers/development/src/test/java/quarks/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.providers.dev.streamscope;
+
+import quarks.test.providers.dev.DevelopmentTestSetup;
+import quarks.test.streamscope.StreamScopeTest;
+
+public class DevelopmentStreamScopeTest extends StreamScopeTest implements DevelopmentTestSetup {
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/.classpath
----------------------------------------------------------------------
diff --git a/utils/.classpath b/utils/.classpath
index 0eeaf77..1539f57 100644
--- a/utils/.classpath
+++ b/utils/.classpath
@@ -2,6 +2,8 @@
 <classpath>
 	<classpathentry kind="src" path="metrics/src/main/java"/>
 	<classpathentry kind="src" path="metrics/src/test/java"/>
+	<classpathentry kind="src" path="streamscope/src/main/java"/>
+	<classpathentry kind="src" path="streamscope/src/test/java"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/api"/>
 	<classpathentry combineaccessrules="false" kind="src" path="/ext"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/streamscope/build.xml
----------------------------------------------------------------------
diff --git a/utils/streamscope/build.xml b/utils/streamscope/build.xml
new file mode 100644
index 0000000..a27c6d4
--- /dev/null
+++ b/utils/streamscope/build.xml
@@ -0,0 +1,44 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project name="quarks.utils.streamscope" default="all" xmlns:jacoco="antlib:org.jacoco.ant">
+    <description>
+        Build the StreamScope add-on.
+    </description>
+
+  <property name="component.path" value="utils/streamscope"/>
+  <import file="../../common-build.xml"/>
+
+  <path id="compile.classpath">
+    <pathelement location="${quarks.lib}/quarks.api.topology.jar"/>
+    <path refid="quarks.ext.classpath"/>
+  </path>
+
+  <path id="test.compile.classpath">
+    <pathelement location="${jar}" />
+    <pathelement location="${quarks.lib}/quarks.providers.direct.jar"/>
+    <pathelement location="../../api/topology/test.classes"/>
+    <pathelement location="../../providers/direct/test.classes"/>
+    <path refid="compile.classpath"/>
+  </path>
+
+  <path id="test.classpath">
+    <pathelement location="${test.classes}" />
+    <path refid="test.compile.classpath"/>
+    <path refid="test.common.classpath" />
+  </path>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
new file mode 100644
index 0000000..17a26ce
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScope.java
@@ -0,0 +1,415 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import quarks.function.Consumer;
+import quarks.function.Functions;
+import quarks.function.Predicate;
+import quarks.topology.TStream;
+
+/**
+ * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
+ * <P>
+ * A {@code StreamScope} is expected to be used as parameter to
+ * {@link TStream#peek(Consumer)}.
+ * </P><P>
+ * A {@link TriggerManager} controls which tuples are captured.
+ * A {@link BufferManager} controls the retention policy for captured tuples.
+ * </P>
+ * <P>
+ * StreamScope instances are typically registered in and located via
+ * a {@link StreamScopeRegistry}.
+ * </P>
+ * @see StreamScopeRegistry
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScope<T> implements Consumer<T> {
+  private static final long serialVersionUID = 1L;
+  private final BufferManager<T> buffer = new BufferManager<>();
+  private final TriggerManager<T> trigger = new TriggerManager<>();
+  private boolean isEnabled;
+  
+  /**
+   * A captured tuple.
+   * <P>
+   * The Sample captures the tuple, and the system time and nanoTime
+   * that the tuple was received.
+   * </P>
+   *
+   * @param <T> Tuple type.
+   */
+  public static class Sample<T> {
+    private final long ts;
+    private final long nanoTime;
+    private final T tuple;
+    
+    Sample(T tuple) {
+      this.ts = System.currentTimeMillis();
+      this.nanoTime = System.nanoTime();
+      this.tuple = tuple;
+    }
+    
+    /**
+     * Capture time in msec since the epoch.
+     * @return the timestamp
+     * @see System#currentTimeMillis()
+     */
+    public long timestamp() {
+      return ts;
+    }
+    
+    /**
+     * Capture time in nanoTime.
+     * @return the nanoTime
+     * @see System#nanoTime()
+     */
+    public long nanoTime() {
+      return nanoTime;
+    }
+    
+    /**
+     * The captured tuple.
+     * @return the tuple
+     */
+    public T tuple() {
+      return tuple;
+    }
+    
+    @Override
+    public String toString() {
+      return "ts="+ts+" nano="+nanoTime+" tuple="+tuple;
+    }
+  }
+  
+  /**
+   * Control the retention of captured tuples.
+   * <P>
+   * Captured tuples are retained until either:
+   * <ul>
+   * <li>a maximum retention count is exceeded</li>
+   * <li>TODO a maximum retention time is exceeded</li>
+   * </ul> 
+   * </P><P>
+   * The default configuration is a maxCount of 10.
+   * </P>
+   */
+  public static class BufferManager<T> {
+    private List<Sample<T>> buffer = Collections.emptyList();
+    private int maxCount = 10;
+    private long period;
+    private TimeUnit unit;
+    
+    // TODO timer based eviction
+    // TODO look at using quarks Windows / WindowImpl for the buffer.
+    //      Can window type/"size" be changed?
+    //      Does it support simultaneous maxCount & maxAge?
+    
+    List<Sample<T>> getSamples() {
+      return Collections.unmodifiableList(buffer);
+    }
+    
+    /**
+     * Set the maximum number of tuples to retain.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param maxCount the maximum number of tuples to retain.
+     *        Specify 0 to disable count based retention.
+     */
+    public void setMaxRetentionCount(int maxCount) {
+      this.maxCount = maxCount;
+      allocate();
+    }
+    
+    /**
+     * Set the maximum retention time of a tuple.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param age the amount of time to retain a tuple.
+     *        Specify 0 to disable age based retention.
+     * @param unit {@link TimeUnit}
+     */
+    public void setMaxRetentionTime(long age, TimeUnit unit) {
+      if (age < 0)
+        throw new IllegalArgumentException("age");
+      Objects.requireNonNull(unit, "unit");
+      this.period = age;
+      this.unit = unit;
+      
+      throw new IllegalStateException("setMaxRetentionTime is NYI");
+    }
+    
+    /**
+     * Get the number of tuples in the capture buffer.
+     * @return the count.
+     */
+    int getCount() {
+      return buffer.size();
+    }
+    
+    void release() {
+      buffer = Collections.emptyList();
+    }
+    
+    void allocate() {
+      buffer = new LinkedList<>();
+    }
+    
+    void add(Sample<T> sample) {
+      if (maxCount > 0 && buffer.size() >= maxCount)
+        buffer.remove(0);
+      buffer.add(sample);
+    }
+    
+    @Override
+    public String toString() {
+      return "size="+getCount()+" maxCount="+maxCount+" maxAge="+period+(unit==null ? "" : unit);
+    }
+    
+  }
+  
+  /**
+   * Control what triggers capturing of tuples.
+   * <P>
+   * The following modes are supported:
+   * <ul>
+   * <li>continuous - capture every tuple</li> 
+   * <li>by-count - capture every nth tuple</li>
+   * <li>by-time - capture based on time elapsed since the last capture</li>
+   * <li>by-Predicate - capture based on evaluating a predicate</li>
+   * </ul>
+   * </P><P>
+   * Tuple capture can be temporarily paused via {@link TriggerManager#setPaused(boolean) setPaused}.
+   * Pausing capture does not clear the capture buffer.
+   * </P><P>
+   * Capture processing can be automatically paused when a particular event
+   * has occurred via {@link TriggerManager#setPauseOn(Predicate) setPauseOn}.
+   * The pause predicate is evaluated after processing each received tuple.
+   * Use {@link TriggerManager#setPaused(boolean)} setPaused} to re-enable capture
+   * following a triggered pause.
+   * </P><P>
+   * The default configuration is {@code continuous} and not paused.
+   * </P>
+   */
+  public static class TriggerManager<T> {
+    private Predicate<T> predicate = Functions.alwaysTrue();
+    private Predicate<T> pauseOnPredicate = Functions.alwaysFalse();
+    private boolean paused = false;
+    
+    /**
+     * Test if the tuple should be captured.
+     * @param tuple the tuple
+     * @return true to capture the tuple.
+     */
+    boolean test(T tuple) {
+      if (paused)
+        return false;
+      boolean b = predicate.test(tuple);
+      paused = pauseOnPredicate.test(tuple);
+      return b;
+    }
+    
+    /**
+     * Set capture paused control
+     * @param paused true to pause, false to clear pause.
+     */
+    public void setPaused(boolean paused) {
+      this.paused = paused;
+    }
+    
+    /**
+     * Is capture paused?
+     * @return true if paused
+     */
+    public boolean isPaused() {
+      return paused;
+    }
+    
+    /**
+     * Set a pause-on predicate.  Capture is paused if the predicate
+     * returns true;
+     * @param predicate the predicate
+     * @see Functions#alwaysFalse()
+     */
+    public void setPauseOn(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      pauseOnPredicate = predicate;
+    }
+    
+    /**
+     * Capture every tuple.
+     */
+    public void setContinuous() {
+      setByPredicate(Functions.alwaysTrue());
+    }
+    
+    /**
+     * Capture the first and every nth tuple
+     * @param count
+     */
+    public void setByCount(int count) {
+      setByPredicate(newByCountPredicate(count));
+    }
+    
+    /**
+     * Capture the 1st tuple and then the next tuple after {@code period}
+     * {@code unit} time has elapsed since the previous captured tuple.
+     * 
+     * @param elapsed time to delay until next capture
+     * @param unit {@link TimeUnit}
+     */
+    public void setByTime(long elapsed, TimeUnit unit) {
+      setByPredicate(newByTimePredicate(elapsed, unit));
+    }
+    
+    /**
+     * Capture a tuple if the {@code predicate} test of the tuple returns true.
+     * @param predicate
+     */
+    public void setByPredicate(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      this.predicate = predicate;
+    }
+    
+    private static <T> Predicate<T> newByCountPredicate(int count) {
+      if (count < 1)
+        throw new IllegalArgumentException("count");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        int byCount = count;
+        int curCount = -1;  // capture 1st and every byCount-nth
+
+        @Override
+        public boolean test(T value) {
+          return ++curCount % byCount == 0;
+        }
+      };
+    }
+    
+    private static <T> Predicate<T> newByTimePredicate(long elapsed, TimeUnit unit) {
+      if (elapsed < 1)
+        throw new IllegalArgumentException("elapsed");
+      Objects.requireNonNull(unit, "unit");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        private long nextTime;
+
+        @Override
+        public boolean test(T value) {
+          long now = System.currentTimeMillis();
+          if (now > nextTime) {
+            nextTime = now + unit.toMillis(elapsed);
+            return true;
+          }
+          return false;
+        }
+      };
+    }
+
+  }
+  
+  /**
+   * Create a new instance.
+   * <P>
+   * Sample capture is disabled.
+   * </P>
+   */
+  public StreamScope() {
+  }
+  
+  /**
+   * Is tuple capture enabled?
+   * @return true if enabled.
+   */
+  public boolean isEnabled() {
+    return isEnabled;
+  }
+  
+  /**
+   * Enable or disable tuple capture.
+   * <P>
+   * Disabling releases the capture buffer.
+   * </P>
+   * @param isEnabled true to enable, false to disable.
+   */
+  public synchronized void setEnabled(boolean isEnabled) {
+    if (!isEnabled)
+      buffer.release();
+    buffer.allocate();
+    this.isEnabled = isEnabled;
+  }
+  
+  /**
+   * Get the {@link BufferManager}
+   * @return the manager
+   */
+  public BufferManager<T> bufferMgr() {
+    return buffer;
+  }
+  
+  /**
+   * Get the {@link TriggerManager}
+   * @return the manager
+   */
+  public TriggerManager<T> triggerMgr() {
+    return trigger;
+  }
+  
+  /**
+   * Get all captured tuples.
+   * <P>
+   * The returned samples are removed from the capture buffer.
+   * </P>
+   * @return unmodifiable list of captured samples
+   */
+  public synchronized List<Sample<T>> getSamples() {
+    List<Sample<T>> tmp = buffer.getSamples();
+    buffer.allocate();
+    return tmp;
+  }
+
+  /**
+   * Get the number of Samples currently captured
+   * @return the count
+   */
+  public synchronized int getSampleCount() {
+    return buffer.getCount();
+  }
+
+  @Override
+  public synchronized void accept(T tuple) {
+    if (!isEnabled())
+      return;
+    if (trigger.test(tuple))
+      buffer.add(new Sample<T>(tuple));
+  }
+
+  @Override
+  public String toString() {
+    return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
new file mode 100644
index 0000000..836428b
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/StreamScopeRegistry.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * <P>
+ * The registry contains a collection of StreamScope instances
+ * that are registered by one or more names.
+ * </P><P>
+ * The names are: by a TStream {@link quarks.topology.TStream#alias(String) alias} or
+ * by a stream's (output port's) unique identifier.
+ * Static methods are provided for composing these names and extracting
+ * the alias/identifier from generated names.
+ * </P>
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScopeRegistry {
+  private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
+  private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
+
+  public StreamScopeRegistry() {
+    
+  }
+  
+  /** create a registration name for a stream alias */
+  public static String nameByStreamAlias(String alias) {
+    Objects.requireNonNull(alias, "alias");
+    return "alias."+alias;
+  }
+  
+  /** create a registration name for a stream id */
+  public static String nameByStreamId(String id) {
+    Objects.requireNonNull(id, "id");
+    return "id."+id;
+  }
+  
+  /** returns null if {@code name} is not a from nameByStreamAlias() */
+  public static String streamAliasFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("alias."))
+      return null;
+    return name.substring("alias.".length());
+  }
+  
+  /** returns null if {@code name} is not a from nameByStreamId() */
+  public static String streamIdFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("id."))
+      return null;
+    return name.substring("id.".length());
+  }
+  
+  /** A single StreamScope can be registered with multiple names.
+   * @throws IllegalStateException if a registration already exists for {@code name}
+   */
+  public synchronized void register(String name, StreamScope<?> streamScope) {
+    if (byNameMap.containsKey(name))
+      throw new IllegalStateException("StreamScope already registered by name "+name);
+    byNameMap.put(name, streamScope);
+    List<String> names = byStreamScopeMap.get(streamScope); 
+    if (names == null) {
+      names = new ArrayList<>(2);
+      byStreamScopeMap.put(streamScope, names);
+    }
+    names.add(name);
+  }
+  
+  /**
+   * @param name
+   * @return the StreamScope. null if name is not registered.
+   */
+  public synchronized StreamScope<?> lookup(String name) {
+    return byNameMap.get(name);
+  }
+  
+  /**
+   * Get the registered names.
+   * @return unmodifiable collection of the name.
+   * The set is backed by the registry so the set may change.
+   */
+  public synchronized Set<String> getNames() {
+    return Collections.unmodifiableSet(byNameMap.keySet());
+  }
+  
+  /** Get registered StreamScopes and the name(s) each is registered with.
+   * The map is backed by the registry so its contents may change.
+   */
+  public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
+    return Collections.unmodifiableMap(byStreamScopeMap);
+  }
+  
+  /** remove the specific name registration.  Other registration of the same StreamScope may still exist.
+   * no-op if name is not registered.
+   * @see #unregister(StreamScope)
+   */
+  public synchronized void unregister(String name) {
+    StreamScope<?> streamScope = byNameMap.get(name);
+    if (streamScope == null)
+      return;
+    byNameMap.remove(name);
+    List<String> names = byStreamScopeMap.get(streamScope);
+    names.remove(name);
+    if (names.isEmpty())
+      byStreamScopeMap.remove(streamScope);
+  }
+  
+  /** remove all name registrations of the StreamScope.
+   * no-op if no registrations for the StreamScope
+   */
+  public synchronized void unregister(StreamScope<?> streamScope) {
+    List<String> names = byStreamScopeMap.get(streamScope);
+    if (names == null)
+      return;
+    names = new ArrayList<>(names);
+    for (String name : names)
+      unregister(name);
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java b/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
new file mode 100644
index 0000000..7ecdfd5
--- /dev/null
+++ b/utils/streamscope/src/main/java/quarks/streamscope/oplets/StreamScope.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.streamscope.oplets;
+
+import quarks.function.Consumer;
+import quarks.oplet.functional.Peek;
+
+/**
+ * A Stream "oscilloscope" oplet.
+ * <P>
+ * TODO remove this?  Just added to help the Console specialize its presentation
+ * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class StreamScope<T> extends Peek<T> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Create a new instance.
+   * @param streamScope the consumer function
+   */
+  public StreamScope(Consumer<T> streamScope) {
+    super(streamScope);
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/4b36d473/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
----------------------------------------------------------------------
diff --git a/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java b/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
new file mode 100644
index 0000000..3b7afa7
--- /dev/null
+++ b/utils/streamscope/src/test/java/quarks/test/streamscope/StreamScopeTest.java
@@ -0,0 +1,325 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.test.streamscope;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import quarks.streamscope.StreamScope;
+import quarks.streamscope.StreamScopeRegistry;
+import quarks.streamscope.StreamScope.Sample;
+import quarks.test.topology.TopologyAbstractTest;
+
+@Ignore
+public abstract class StreamScopeTest extends TopologyAbstractTest {	
+    
+    @Test
+    public void testStreamScopeFn() throws Exception {
+
+        StreamScope<Integer> ss = new StreamScope<>();
+
+        List<Sample<Integer>> samples; 
+        Sample<Integer> sample;
+
+        assertFalse(ss.isEnabled());
+        assertNotNull(ss.bufferMgr());
+        assertNotNull(ss.triggerMgr());
+        assertEquals(0, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertNotNull(samples);
+        assertEquals(0, samples.size());
+        
+        // ---------------- no capture when not enabled
+        ss.accept(1);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        ss.setEnabled(true);
+        
+        // ---------------- single capture
+        // note: getSamples() removes captured tuples
+        ss.accept(100);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1, samples.size());
+        sample = samples.get(0);
+        assertEquals(100, sample.tuple().intValue());
+        assertTrue(sample.timestamp() != 0);
+        assertTrue(sample.nanoTime() != 0);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- next capture/get; different lists
+        List<Sample<Integer>> savedSamples = samples;
+        ss.accept(101);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertTrue(samples != savedSamples);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- multi capture
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ----------------  disable => clears capture buffer
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        ss.setEnabled(false);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+
+        ss.setEnabled(true);
+
+        // ---------------- buffer control at the limit (no eviction)
+        ss.bufferMgr().setMaxRetentionCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- buffer control with eviction
+        ss.bufferMgr().setMaxRetentionCount(2);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(2, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        ss.bufferMgr().setMaxRetentionCount(10);
+        
+        // ---------------- trigger byCount
+        ss.triggerMgr().setByCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byPredicate
+        ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byTime
+        ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        Thread.sleep(110);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        Thread.sleep(110);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger continuous
+        ss.triggerMgr().setContinuous();
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pause
+        ss.accept(100);
+        ss.accept(101);
+        ss.triggerMgr().setPaused(true);
+        assertTrue(ss.triggerMgr().isPaused());
+        ss.accept(102);
+        ss.accept(103);
+        ss.triggerMgr().setPaused(false);
+        assertFalse(ss.triggerMgr().isPaused());
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pauseOn
+        
+        ss.triggerMgr().setPauseOn(t -> t == 102);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        assertTrue(ss.triggerMgr().isPaused());
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        ss.triggerMgr().setPaused(false);
+        ss.accept(1000);
+        ss.accept(1010);
+        ss.accept(102);
+        ss.accept(1030);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1000, samples.get(0).tuple().intValue());
+        assertEquals(1010, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+    }
+
+    @Test
+    public void testStreamScopeRegistry() throws Exception {
+
+      StreamScope<Integer> ss1 = new StreamScope<>();
+      StreamScope<Integer> ss2 = new StreamScope<>();
+      
+      StreamScopeRegistry rgy = new StreamScopeRegistry();
+      
+      assertNotNull(rgy.getNames());
+      assertEquals(0, rgy.getNames().size());
+      assertNotNull(rgy.getStreamScopes());
+      assertEquals(0, rgy.getStreamScopes().size());
+      assertNull(rgy.lookup("xyzzy"));
+      rgy.unregister("xyzzy");
+      rgy.unregister(ss1);
+      
+      // ---------- name generation / parse functions
+      String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
+      assertNotNull(alias1Name);
+      String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
+      assertNotNull(alias2Name);
+      assertFalse(alias1Name.equals(alias2Name));
+      String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
+      assertEquals("alias1", alias1);
+      
+      String id1Name = StreamScopeRegistry.nameByStreamId("id1");
+      assertNotNull(id1Name);
+      String id2Name = StreamScopeRegistry.nameByStreamId("id2");
+      assertNotNull(id2Name);
+      assertFalse(id1Name.equals(id2Name));
+      String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
+      assertEquals("id1", id1);
+
+      assertFalse(StreamScopeRegistry.nameByStreamAlias("1")
+          .equals(StreamScopeRegistry.nameByStreamId("1")));
+      
+      // ---------- register
+      rgy.register(alias1Name, ss1);
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+
+      // ---------- lookup
+      assertSame(ss1, rgy.lookup(alias1Name));
+      assertSame(ss2, rgy.lookup(alias2Name));
+      assertSame(ss2, rgy.lookup(id2Name));
+     
+      // ----------- getNames
+      assertEquals(3, rgy.getNames().size());
+      assertTrue(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getNames().contains(id1Name));
+      assertTrue(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getNames().contains(id2Name));
+      
+      // ----------- getStreamScopes
+      assertEquals(2, rgy.getStreamScopes().keySet().size());
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertEquals(1, rgy.getStreamScopes().get(ss1).size());
+      assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
+      assertEquals(2, rgy.getStreamScopes().get(ss2).size());
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
+      
+      // ---------- unregister
+      rgy.unregister(alias1Name);
+      assertNull(rgy.lookup(alias1Name));
+      assertEquals(2, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      
+      rgy.unregister(alias2Name);
+      assertNull(rgy.lookup(alias2Name));
+      assertEquals(1, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertSame(ss2, rgy.lookup(id2Name));
+      rgy.unregister(id2Name);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+      
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+      rgy.unregister(ss2);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+    }
+
+}


[4/9] incubator-quarks git commit: rebase and fix ant build

Posted by dl...@apache.org.
rebase and fix ant build


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/e10447ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/e10447ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/e10447ce

Branch: refs/heads/master
Commit: e10447ce7fd5c4cb9bbf8187a1b21baf8c58ed75
Parents: a808a72
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 11 16:46:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/plumbing/StreamScope.java    |  3 ++-
 api/topology/build.xml                             |  1 +
 .../java/quarks/topology/plumbing/StreamScope.java |  2 +-
 .../java/quarks/test/topology/PlumbingTest.java    | 17 ++++++++---------
 4 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e10447ce/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
index fd88d55..af97a10 100644
--- a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
@@ -18,6 +18,7 @@ under the License.
 */
 package quarks.oplet.plumbing;
 
+import quarks.function.Consumer;
 import quarks.oplet.functional.Peek;
 
 /**
@@ -36,7 +37,7 @@ public class StreamScope<T> extends Peek<T> {
    * Create a new instance.
    * @param streamScope the consumer function
    */
-  public StreamScope(quarks.topology.plumbing.StreamScope<T> streamScope) {
+  public StreamScope(Consumer<T> streamScope) {
     super(streamScope);
   }
     

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e10447ce/api/topology/build.xml
----------------------------------------------------------------------
diff --git a/api/topology/build.xml b/api/topology/build.xml
index 938a9c6..8e9456f 100644
--- a/api/topology/build.xml
+++ b/api/topology/build.xml
@@ -26,6 +26,7 @@
   <path id="compile.classpath">
     <pathelement location="${lib}/quarks.api.execution.jar"/>
     <pathelement location="${lib}/quarks.api.graph.jar"/>
+    <pathelement location="${lib}/quarks.api.oplet.jar" />
     <path refid="quarks.ext.classpath"/>
   </path>
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e10447ce/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
index 4cf77f8..2252e2a 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
@@ -74,7 +74,7 @@ public class StreamScope<T> implements Consumer<T> {
     /**
      * Capture time in msec since the epoch.
      * @return the timestamp
-     * @see System#currentTimeMillis();
+     * @see System#currentTimeMillis()
      */
     public long timestamp() {
       return ts;

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/e10447ce/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f88da2d..0dfaa6c 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -1,4 +1,4 @@
-//*
+/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -20,7 +20,6 @@ package quarks.test.topology;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -782,8 +781,8 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         assertEquals(1, samples.size());
         sample = samples.get(0);
         assertEquals(100, sample.tuple().intValue());
-        assertNotEquals(0, sample.timestamp());
-        assertNotEquals(0, sample.nanoTime());
+        assertTrue(sample.timestamp() != 0);
+        assertTrue(sample.nanoTime() != 0);
         assertEquals(0, ss.getSampleCount());
         assertEquals(0, ss.getSamples().size());
         
@@ -793,7 +792,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         assertEquals(1, ss.getSampleCount());
         samples = ss.getSamples();
         assertEquals(101, samples.get(0).tuple().intValue());
-        assertNotEquals(samples, savedSamples);
+        assertTrue(samples != savedSamples);
         assertEquals(0, ss.getSampleCount());
         assertEquals(0, ss.getSamples().size());
         
@@ -967,7 +966,7 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
       assertNotNull(alias1Name);
       String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
       assertNotNull(alias2Name);
-      assertNotEquals(alias1Name, alias2Name);
+      assertFalse(alias1Name.equals(alias2Name));
       String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
       assertEquals("alias1", alias1);
       
@@ -975,12 +974,12 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
       assertNotNull(id1Name);
       String id2Name = StreamScopeRegistry.nameByStreamId("id2");
       assertNotNull(id2Name);
-      assertNotEquals(id1Name, id2Name);
+      assertFalse(id1Name.equals(id2Name));
       String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
       assertEquals("id1", id1);
 
-      assertNotEquals(StreamScopeRegistry.nameByStreamAlias("1"),
-                      StreamScopeRegistry.nameByStreamId("1"));
+      assertFalse(StreamScopeRegistry.nameByStreamAlias("1")
+          .equals(StreamScopeRegistry.nameByStreamId("1")));
       
       // ---------- register
       rgy.register(alias1Name, ss1);