You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/26 12:33:12 UTC

[5/9] incubator-quarks git commit: [WIP] [QUARKS-22] Initial StreamScope implementation

[WIP] [QUARKS-22] Initial StreamScope implementation

- Add PlumbingStreams.StreamScope (Consumer<T>)
- Add PlumbingStreams.StreamScopeRegistry
- Add quarks.topology.plumbing.StreamScope oplet (just so console can
present it differently - remove?)
- Update DevelopmentProvider to add StreamScopes
- Add StreamScope and StreamScopeRegistry tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/a808a728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/a808a728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/a808a728

Branch: refs/heads/master
Commit: a808a72843f2c2b1de76ce79dcb4bbe8f442f41e
Parents: 10075c6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 11 15:53:25 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400

----------------------------------------------------------------------
 .../java/quarks/oplet/plumbing/StreamScope.java |  43 ++
 .../quarks/topology/plumbing/StreamScope.java   | 415 +++++++++++++++++++
 .../topology/plumbing/StreamScopeRegistry.java  | 145 +++++++
 .../java/quarks/test/topology/PlumbingTest.java | 295 ++++++++++++-
 .../development/DevelopmentProvider.java        |  37 ++
 5 files changed, 933 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
new file mode 100644
index 0000000..fd88d55
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.plumbing;
+
+import quarks.oplet.functional.Peek;
+
+/**
+ * A Stream "oscilloscope" oplet.
+ * <P>
+ * TODO remove this?  Just added to help the Console specialize its presentation
+ * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class StreamScope<T> extends Peek<T> {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Create a new instance.
+   * @param streamScope the consumer function
+   */
+  public StreamScope(quarks.topology.plumbing.StreamScope<T> streamScope) {
+    super(streamScope);
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
new file mode 100644
index 0000000..4cf77f8
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
@@ -0,0 +1,415 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import quarks.function.Consumer;
+import quarks.function.Functions;
+import quarks.function.Predicate;
+import quarks.topology.TStream;
+
+/**
+ * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
+ * <P>
+ * A {@code StreamScope} is expected to be used as parameter to
+ * {@link TStream#peek(Consumer)}.
+ * </P><P>
+ * A {@link TriggerManager} controls which tuples are captured.
+ * A {@link BufferManager} controls the retention policy for captured tuples.
+ * </P>
+ * <P>
+ * StreamScope instances are typically registered in and located via
+ * a {@link StreamScopeRegistry}.
+ * </P>
+ * @see StreamScopeRegistry
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScope<T> implements Consumer<T> {
+  private static final long serialVersionUID = 1L;
+  private final BufferManager<T> buffer = new BufferManager<>();
+  private final TriggerManager<T> trigger = new TriggerManager<>();
+  private boolean isEnabled;
+  
+  /**
+   * A captured tuple.
+   * <P>
+   * The Sample captures the tuple, and the system time and nanoTime
+   * that the tuple was received.
+   * </P>
+   *
+   * @param <T> Tuple type.
+   */
+  public static class Sample<T> {
+    private final long ts;
+    private final long nanoTime;
+    private final T tuple;
+    
+    Sample(T tuple) {
+      this.ts = System.currentTimeMillis();
+      this.nanoTime = System.nanoTime();
+      this.tuple = tuple;
+    }
+    
+    /**
+     * Capture time in msec since the epoch.
+     * @return the timestamp
+     * @see System#currentTimeMillis()
+     */
+    public long timestamp() {
+      return ts;
+    }
+    
+    /**
+     * Capture time in nanoTime.
+     * @return the nanoTime
+     * @see System#nanoTime()
+     */
+    public long nanoTime() {
+      return nanoTime;
+    }
+    
+    /**
+     * The captured tuple.
+     * @return the tuple
+     */
+    public T tuple() {
+      return tuple;
+    }
+    
+    @Override
+    public String toString() {
+      return "ts="+ts+" nano="+nanoTime+" tuple="+tuple;
+    }
+  }
+  
+  /**
+   * Control the retention of captured tuples.
+   * <P>
+   * Captured tuples are retained until either:
+   * <ul>
+   * <li>a maximum retention count is exceeded</li>
+   * <li>TODO a maximum retention time is exceeded</li>
+   * </ul> 
+   * </P><P>
+   * The default configuration is a maxCount of 10.
+   * </P>
+   */
+  public static class BufferManager<T> {
+    private List<Sample<T>> buffer = Collections.emptyList();
+    private int maxCount = 10;
+    private long period;
+    private TimeUnit unit;
+    
+    // TODO timer based eviction
+    // TODO look at using quarks Windows / WindowImpl for the buffer.
+    //      Can window type/"size" be changed?
+    //      Does it support simultaneous maxCount & maxAge?
+    
+    List<Sample<T>> getSamples() {
+      return Collections.unmodifiableList(buffer);
+    }
+    
+    /**
+     * Set the maximum number of tuples to retain.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param maxCount the maximum number of tuples to retain.
+     *        Specify 0 to disable count based retention.
+     */
+    public void setMaxRetentionCount(int maxCount) {
+      this.maxCount = maxCount;
+      allocate();
+    }
+    
+    /**
+     * Set the maximum retention time of a tuple.
+     * <P>
+     * The capture buffer is cleared.
+     * </P>
+     * @param age the amount of time to retain a tuple.
+     *        Specify 0 to disable age based retention.
+     * @param unit {@link TimeUnit}
+     */
+    public void setMaxRetentionTime(long age, TimeUnit unit) {
+      if (age < 0)
+        throw new IllegalArgumentException("age");
+      Objects.requireNonNull(unit, "unit");
+      this.period = age;
+      this.unit = unit;
+      
+      throw new IllegalStateException("setMaxRetentionTime is NYI");
+    }
+    
+    /**
+     * Get the number of tuples in the capture buffer.
+     * @return the count.
+     */
+    int getCount() {
+      return buffer.size();
+    }
+    
+    void release() {
+      buffer = Collections.emptyList();
+    }
+    
+    void allocate() {
+      buffer = new LinkedList<>();
+    }
+    
+    void add(Sample<T> sample) {
+      if (maxCount > 0 && buffer.size() >= maxCount)
+        buffer.remove(0);
+      buffer.add(sample);
+    }
+    
+    @Override
+    public String toString() {
+      return "size="+getCount()+" maxCount="+maxCount+" maxAge="+period+(unit==null ? "" : unit);
+    }
+    
+  }
+  
+  /**
+   * Control what triggers capturing of tuples.
+   * <P>
+   * The following modes are supported:
+   * <ul>
+   * <li>continuous - capture every tuple</li> 
+   * <li>by-count - capture every nth tuple</li>
+   * <li>by-time - capture based on time elapsed since the last capture</li>
+   * <li>by-Predicate - capture based on evaluating a predicate</li>
+   * </ul>
+   * </P><P>
+   * Tuple capture can be temporarily paused via {@link TriggerManager#setPaused(boolean) setPaused}.
+   * Pausing capture does not clear the capture buffer.
+   * </P><P>
+   * Capture processing can be automatically paused when a particular event
+   * has occurred via {@link TriggerManager#setPauseOn(Predicate) setPauseOn}.
+   * The pause predicate is evaluated after processing each received tuple.
+   * Use {@link TriggerManager#setPaused(boolean) setPaused} to re-enable capture
+   * following a triggered pause.
+   * </P><P>
+   * The default configuration is {@code continuous} and not paused.
+   * </P>
+   */
+  public static class TriggerManager<T> {
+    private Predicate<T> predicate = Functions.alwaysTrue();
+    private Predicate<T> pauseOnPredicate = Functions.alwaysFalse();
+    private boolean paused = false;
+    
+    /**
+     * Test if the tuple should be captured.
+     * @param tuple the tuple
+     * @return true to capture the tuple.
+     */
+    boolean test(T tuple) {
+      if (paused)
+        return false;
+      boolean b = predicate.test(tuple);
+      paused = pauseOnPredicate.test(tuple);
+      return b;
+    }
+    
+    /**
+     * Set capture paused control
+     * @param paused true to pause, false to clear pause.
+     */
+    public void setPaused(boolean paused) {
+      this.paused = paused;
+    }
+    
+    /**
+     * Is capture paused?
+     * @return true if paused
+     */
+    public boolean isPaused() {
+      return paused;
+    }
+    
+    /**
+     * Set a pause-on predicate.  Capture is paused if the predicate
+     * returns true.
+     * @param predicate the predicate
+     * @see Functions#alwaysFalse()
+     */
+    public void setPauseOn(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      pauseOnPredicate = predicate;
+    }
+    
+    /**
+     * Capture every tuple.
+     */
+    public void setContinuous() {
+      setByPredicate(Functions.alwaysTrue());
+    }
+    
+    /**
+     * Capture the first tuple and every {@code count}-th tuple thereafter.
+     * @param count the capture interval, in tuples; must be at least 1
+     */
+    public void setByCount(int count) {
+      setByPredicate(newByCountPredicate(count));
+    }
+    
+    /**
+     * Capture the 1st tuple and then the next tuple received after {@code elapsed}
+     * {@code unit} time has passed since the previously captured tuple.
+     * 
+     * @param elapsed time to delay until next capture
+     * @param unit {@link TimeUnit}
+     */
+    public void setByTime(long elapsed, TimeUnit unit) {
+      setByPredicate(newByTimePredicate(elapsed, unit));
+    }
+    
+    /**
+     * Capture a tuple if the {@code predicate} test of the tuple returns true.
+     * @param predicate the capture test; must not be null
+     */
+    public void setByPredicate(Predicate<T> predicate) {
+      Objects.requireNonNull(predicate, "predicate");
+      this.predicate = predicate;
+    }
+    
+    private static <T> Predicate<T> newByCountPredicate(int count) {
+      if (count < 1)
+        throw new IllegalArgumentException("count");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        int byCount = count;
+        int curCount = -1;  // capture 1st and every byCount-nth
+
+        @Override
+        public boolean test(T value) {
+          return ++curCount % byCount == 0;
+        }
+      };
+    }
+    
+    private static <T> Predicate<T> newByTimePredicate(long elapsed, TimeUnit unit) {
+      if (elapsed < 1)
+        throw new IllegalArgumentException("elapsed");
+      Objects.requireNonNull(unit, "unit");
+      return new Predicate<T>() {
+        private static final long serialVersionUID = 1L;
+        private long nextTime;
+
+        @Override
+        public boolean test(T value) {
+          long now = System.currentTimeMillis();
+          if (now > nextTime) {
+            nextTime = now + unit.toMillis(elapsed);
+            return true;
+          }
+          return false;
+        }
+      };
+    }
+
+  }
+  
+  /**
+   * Create a new instance.
+   * <P>
+   * Sample capture is disabled.
+   * </P>
+   */
+  public StreamScope() {
+  }
+  
+  /**
+   * Is tuple capture enabled?
+   * @return true if enabled.
+   */
+  public boolean isEnabled() {
+    return isEnabled;
+  }
+  
+  /**
+   * Enable or disable tuple capture.
+   * <P>Enabling allocates a new, empty capture buffer; disabling releases it.</P>
+   * @param isEnabled true to enable, false to disable.
+   */
+  public synchronized void setEnabled(boolean isEnabled) {
+    // Previously the buffer was re-allocated even when disabling, undoing the release.
+    if (isEnabled)
+      buffer.allocate();
+    else
+      buffer.release();
+    this.isEnabled = isEnabled;
+  }
+  
+  /**
+   * Get the {@link BufferManager}
+   * @return the manager
+   */
+  public BufferManager<T> bufferMgr() {
+    return buffer;
+  }
+  
+  /**
+   * Get the {@link TriggerManager}
+   * @return the manager
+   */
+  public TriggerManager<T> triggerMgr() {
+    return trigger;
+  }
+  
+  /**
+   * Get all captured tuples.
+   * <P>
+   * The returned samples are removed from the capture buffer.
+   * </P>
+   * @return unmodifiable list of captured samples
+   */
+  public synchronized List<Sample<T>> getSamples() {
+    List<Sample<T>> tmp = buffer.getSamples();
+    buffer.allocate();
+    return tmp;
+  }
+
+  /**
+   * Get the number of Samples currently captured
+   * @return the count
+   */
+  public synchronized int getSampleCount() {
+    return buffer.getCount();
+  }
+
+  @Override
+  public synchronized void accept(T tuple) {
+    if (!isEnabled())
+      return;
+    if (trigger.test(tuple))
+      buffer.add(new Sample<T>(tuple));
+  }
+
+  @Override
+  public String toString() {
+    return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
new file mode 100644
index 0000000..dbfdd80
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * <P>
+ * The registry contains a collection of StreamScope instances
+ * that are registered by one or more names.
+ * </P><P>
+ * The names are: by a TStream {@link quarks.topology.TStream#alias(String) alias} or
+ * by a stream's (output port's) unique identifier.
+ * Static methods are provided for composing these names and extracting
+ * the alias/identifier from generated names.
+ * </P>
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScopeRegistry {
+  private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
+  private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
+
+  public StreamScopeRegistry() {
+    
+  }
+  
+  /** create a registration name for a stream alias */
+  public static String nameByStreamAlias(String alias) {
+    Objects.requireNonNull(alias, "alias");
+    return "alias."+alias;
+  }
+  
+  /** create a registration name for a stream id */
+  public static String nameByStreamId(String id) {
+    Objects.requireNonNull(id, "id");
+    return "id."+id;
+  }
+  
+  /** returns null if {@code name} is not a name from nameByStreamAlias() */
+  public static String streamAliasFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("alias."))
+      return null;
+    return name.substring("alias.".length());
+  }
+  
+  /** returns null if {@code name} is not a name from nameByStreamId() */
+  public static String streamIdFromName(String name) {
+    Objects.requireNonNull(name, "name");
+    if (!name.startsWith("id."))
+      return null;
+    return name.substring("id.".length());
+  }
+  
+  /** A single StreamScope can be registered with multiple names.
+   * @throws IllegalStateException if a registration already exists for {@code name}
+   */
+  public synchronized void register(String name, StreamScope<?> streamScope) {
+    if (byNameMap.containsKey(name))
+      throw new IllegalStateException("StreamScope already registered by name "+name);
+    byNameMap.put(name, streamScope);
+    List<String> names = byStreamScopeMap.get(streamScope); 
+    if (names == null) {
+      names = new ArrayList<>(2);
+      byStreamScopeMap.put(streamScope, names);
+    }
+    names.add(name);
+  }
+  
+  /**
+   * @param name the registration name to look up
+   * @return the StreamScope. null if name is not registered.
+   */
+  public synchronized StreamScope<?> lookup(String name) {
+    return byNameMap.get(name);
+  }
+  
+  /**
+   * Get the registered names.
+   * @return unmodifiable set of the registered names.
+   * The set is backed by the registry so the set may change.
+   */
+  public synchronized Set<String> getNames() {
+    return Collections.unmodifiableSet(byNameMap.keySet());
+  }
+  
+  /** Get registered StreamScopes and the name(s) each is registered with.
+   * The map is backed by the registry so its contents may change.
+   */
+  public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
+    return Collections.unmodifiableMap(byStreamScopeMap);
+  }
+  
+  /** remove the specific name registration.  Other registration of the same StreamScope may still exist.
+   * no-op if name is not registered.
+   * @see #unregister(StreamScope)
+   */
+  public synchronized void unregister(String name) {
+    StreamScope<?> streamScope = byNameMap.get(name);
+    if (streamScope == null)
+      return;
+    byNameMap.remove(name);
+    List<String> names = byStreamScopeMap.get(streamScope);
+    names.remove(name);
+    if (names.isEmpty())
+      byStreamScopeMap.remove(streamScope);
+  }
+  
+  /** remove all name registrations of the StreamScope.
+   * no-op if no registrations for the StreamScope
+   */
+  public synchronized void unregister(StreamScope<?> streamScope) {
+    List<String> names = byStreamScopeMap.get(streamScope);
+    if (names == null)
+      return;
+    names = new ArrayList<>(names);
+    for (String name : names)
+      unregister(name);
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f2535ed..f88da2d 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -1,4 +1,4 @@
-/*
+/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -20,6 +20,10 @@ package quarks.test.topology;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -44,6 +48,9 @@ import quarks.function.ToIntFunction;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScope.Sample;
+import quarks.topology.plumbing.StreamScopeRegistry;
 import quarks.topology.plumbing.Valve;
 import quarks.topology.tester.Condition;
 
@@ -743,4 +750,288 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         assertTrue("valid:" + contents.getResult(), contents.valid());
         assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
     }
-}
\ No newline at end of file
+    
+    @Test
+    public void testStreamScopeFn() throws Exception {
+
+        StreamScope<Integer> ss = new StreamScope<>();
+
+        List<Sample<Integer>> samples; 
+        Sample<Integer> sample;
+
+        assertFalse(ss.isEnabled());
+        assertNotNull(ss.bufferMgr());
+        assertNotNull(ss.triggerMgr());
+        assertEquals(0, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertNotNull(samples);
+        assertEquals(0, samples.size());
+        
+        // ---------------- no capture when not enabled
+        ss.accept(1);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        ss.setEnabled(true);
+        
+        // ---------------- single capture
+        // note: getSamples() removes captured tuples
+        ss.accept(100);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1, samples.size());
+        sample = samples.get(0);
+        assertEquals(100, sample.tuple().intValue());
+        assertNotEquals(0, sample.timestamp());
+        assertNotEquals(0, sample.nanoTime());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- next capture/get; different lists
+        List<Sample<Integer>> savedSamples = samples;
+        ss.accept(101);
+        assertEquals(1, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertNotEquals(samples, savedSamples);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- multi capture
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ----------------  disable => clears capture buffer
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        ss.setEnabled(false);
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+
+        ss.setEnabled(true);
+
+        // ---------------- buffer control at the limit (no eviction)
+        ss.bufferMgr().setMaxRetentionCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        
+        // ---------------- buffer control with eviction
+        ss.bufferMgr().setMaxRetentionCount(2);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(2, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(101, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(0, ss.getSampleCount());
+        assertEquals(0, ss.getSamples().size());
+        ss.bufferMgr().setMaxRetentionCount(10);
+        
+        // ---------------- trigger byCount
+        ss.triggerMgr().setByCount(3);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byPredicate
+        ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(102, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger byTime
+        ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        Thread.sleep(110);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        Thread.sleep(110);
+        ss.accept(106);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(103, samples.get(1).tuple().intValue());
+        assertEquals(106, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger continuous
+        ss.triggerMgr().setContinuous();
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pause
+        ss.accept(100);
+        ss.accept(101);
+        ss.triggerMgr().setPaused(true);
+        assertTrue(ss.triggerMgr().isPaused());
+        ss.accept(102);
+        ss.accept(103);
+        ss.triggerMgr().setPaused(false);
+        assertFalse(ss.triggerMgr().isPaused());
+        ss.accept(104);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(104, samples.get(2).tuple().intValue());
+        
+        // ---------------- trigger pauseOn
+        
+        ss.triggerMgr().setPauseOn(t -> t == 102);
+        ss.accept(100);
+        ss.accept(101);
+        ss.accept(102);
+        ss.accept(103);
+        ss.accept(104);
+        ss.accept(105);
+        assertTrue(ss.triggerMgr().isPaused());
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(100, samples.get(0).tuple().intValue());
+        assertEquals(101, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        ss.triggerMgr().setPaused(false);
+        ss.accept(1000);
+        ss.accept(1010);
+        ss.accept(102);
+        ss.accept(1030);
+        assertEquals(3, ss.getSampleCount());
+        samples = ss.getSamples();
+        assertEquals(1000, samples.get(0).tuple().intValue());
+        assertEquals(1010, samples.get(1).tuple().intValue());
+        assertEquals(102, samples.get(2).tuple().intValue());
+        
+    }
+
+    @Test
+    public void testStreamScopeRegistry() throws Exception {
+
+      StreamScope<Integer> ss1 = new StreamScope<>();
+      StreamScope<Integer> ss2 = new StreamScope<>();
+      
+      StreamScopeRegistry rgy = new StreamScopeRegistry();
+      
+      assertNotNull(rgy.getNames());
+      assertEquals(0, rgy.getNames().size());
+      assertNotNull(rgy.getStreamScopes());
+      assertEquals(0, rgy.getStreamScopes().size());
+      assertNull(rgy.lookup("xyzzy"));
+      rgy.unregister("xyzzy");
+      rgy.unregister(ss1);
+      
+      // ---------- name generation / parse functions
+      String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
+      assertNotNull(alias1Name);
+      String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
+      assertNotNull(alias2Name);
+      assertNotEquals(alias1Name, alias2Name);
+      String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
+      assertEquals("alias1", alias1);
+      
+      String id1Name = StreamScopeRegistry.nameByStreamId("id1");
+      assertNotNull(id1Name);
+      String id2Name = StreamScopeRegistry.nameByStreamId("id2");
+      assertNotNull(id2Name);
+      assertNotEquals(id1Name, id2Name);
+      String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
+      assertEquals("id1", id1);
+
+      assertNotEquals(StreamScopeRegistry.nameByStreamAlias("1"),
+                      StreamScopeRegistry.nameByStreamId("1"));
+      
+      // ---------- register
+      rgy.register(alias1Name, ss1);
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+
+      // ---------- lookup
+      assertSame(ss1, rgy.lookup(alias1Name));
+      assertSame(ss2, rgy.lookup(alias2Name));
+      assertSame(ss2, rgy.lookup(id2Name));
+     
+      // ----------- getNames
+      assertEquals(3, rgy.getNames().size());
+      assertTrue(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getNames().contains(id1Name));
+      assertTrue(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getNames().contains(id2Name));
+      
+      // ----------- getStreamScopes
+      assertEquals(2, rgy.getStreamScopes().keySet().size());
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertEquals(1, rgy.getStreamScopes().get(ss1).size());
+      assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
+      assertEquals(2, rgy.getStreamScopes().get(ss2).size());
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
+      
+      // ---------- unregister
+      rgy.unregister(alias1Name);
+      assertNull(rgy.lookup(alias1Name));
+      assertEquals(2, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias1Name));
+      assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      
+      rgy.unregister(alias2Name);
+      assertNull(rgy.lookup(alias2Name));
+      assertEquals(1, rgy.getNames().size());
+      assertFalse(rgy.getNames().contains(alias2Name));
+      assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+      assertSame(ss2, rgy.lookup(id2Name));
+      rgy.unregister(id2Name);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+      
+      rgy.register(alias2Name, ss2);
+      rgy.register(id2Name, ss2);
+      rgy.unregister(ss2);
+      assertEquals(0, rgy.getNames().size());
+      assertEquals(0, rgy.getStreamScopes().keySet().size());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 137b936..cd2ca9f 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -30,6 +30,7 @@ import quarks.execution.Job;
 import quarks.execution.services.ControlService;
 import quarks.graph.Connector;
 import quarks.graph.Edge;
+import quarks.graph.Vertex;
 import quarks.metrics.Metrics;
 import quarks.metrics.MetricsSetup;
 import quarks.metrics.oplets.CounterOp;
@@ -38,6 +39,8 @@ import quarks.oplet.core.Peek;
 import quarks.providers.direct.DirectProvider;
 import quarks.runtime.jmxcontrol.JMXControlService;
 import quarks.topology.Topology;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScopeRegistry;
 
 /**
  * Provider intended for development.
@@ -78,6 +81,9 @@ public class DevelopmentProvider extends DirectProvider {
         
         getServices().addService(ControlService.class,
                 new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
+        
+        getServices().addService(StreamScopeRegistry.class,
+                new StreamScopeRegistry());
 
         HttpServer server = HttpServer.getInstance();
         getServices().addService(HttpServer.class, server);   
@@ -87,6 +93,7 @@ public class DevelopmentProvider extends DirectProvider {
     @Override
     public Future<Job> submit(Topology topology, JsonObject config) {
         Metrics.counter(topology);
+        addStreamScopes(topology);
         duplicateTags(topology);
         return super.submit(topology, config);
     }
@@ -132,4 +139,34 @@ public class DevelopmentProvider extends DirectProvider {
         c.tag(ta);
       }
     }
+
+    /**
+     * Add StreamScope instances to the topology; does nothing when no StreamScopeRegistry service is available.
+     * @param t the Topology
+     */
+    private void addStreamScopes(Topology t) {
+      StreamScopeRegistry rgy = (StreamScopeRegistry) 
+          t.getRuntimeServiceSupplier().get()
+            .getService(StreamScopeRegistry.class);
+      if (rgy == null)  // no StreamScopeRegistry service found: nothing to add
+        return;
+
+      t.graph().peekAll( 
+          () -> {  // supplier: builds a fresh StreamScope and its wrapping peek oplet on each call
+              StreamScope<?> streamScope = new StreamScope<>();
+              Peek<?> peekOp = new quarks.oplet.plumbing.StreamScope<>(streamScope);
+              registerStreamScope(rgy, peekOp, streamScope);  // register the new scope in the registry by name
+              return peekOp;
+            },
+          (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));  // selector: skip FanOut vertices
+    }
+    
+    private int hackCount = 0;  // TODO temp hack to enable some test development; counter used to build placeholder names
+    private void registerStreamScope(StreamScopeRegistry rgy, Peek<?> peekOp, StreamScope<?> streamScope) {  // peekOp is not used yet (see TODOs below)
+      hackCount++;  // NOTE(review): unsynchronized increment -- confirm this is never reached from concurrent submits
+      String id = "oplet-"+ hackCount;  // TODO get from peekOp's source oport  <opletId>.oport.<n>
+      String alias = "streamAlias-"+ hackCount;  //  TODO get from peekOp's source oport context
+      rgy.register(StreamScopeRegistry.nameByStreamAlias(alias), streamScope);  // register under the alias-derived name...
+      rgy.register(StreamScopeRegistry.nameByStreamId(id), streamScope);  // ...and the same scope under the id-derived name
+    }
 }