You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/26 12:33:12 UTC
[5/9] incubator-quarks git commit: [WIP] [QUARKS-22] Initial
StreamScope implementation
[WIP] [QUARKS-22] Initial StreamScope implementation
- Add PlumbingStreams.StreamScope (Consumer<T>)
- Add PlumbingStreams.StreamScopeRegistry
- Add quarks.topology.plumbing.StreamScope oplet (just so console can
present it differently - remove?)
- Update DevelopmentProvider to add StreamScopes
- Add StreamScope and StreamScopeRegistry tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/a808a728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/a808a728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/a808a728
Branch: refs/heads/master
Commit: a808a72843f2c2b1de76ce79dcb4bbe8f442f41e
Parents: 10075c6
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Wed May 11 15:53:25 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Wed May 18 16:03:04 2016 -0400
----------------------------------------------------------------------
.../java/quarks/oplet/plumbing/StreamScope.java | 43 ++
.../quarks/topology/plumbing/StreamScope.java | 415 +++++++++++++++++++
.../topology/plumbing/StreamScopeRegistry.java | 145 +++++++
.../java/quarks/test/topology/PlumbingTest.java | 295 ++++++++++++-
.../development/DevelopmentProvider.java | 37 ++
5 files changed, 933 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
new file mode 100644
index 0000000..fd88d55
--- /dev/null
+++ b/api/oplet/src/main/java/quarks/oplet/plumbing/StreamScope.java
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.oplet.plumbing;
+
+import quarks.oplet.functional.Peek;
+
+/**
+ * A Stream "oscilloscope" oplet.
+ * <P>
+ * TODO remove this? Just added to help the Console specialize its presentation
+ * and/or so user can differentiate a StreamScope from any other random Peek oplet use.
+ * </P>
+ *
+ * @param <T> Type of the tuple.
+ */
+public class StreamScope<T> extends Peek<T> {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create a new instance.
+ * @param streamScope the consumer function
+ */
+ public StreamScope(quarks.topology.plumbing.StreamScope<T> streamScope) {
+ super(streamScope);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
new file mode 100644
index 0000000..4cf77f8
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScope.java
@@ -0,0 +1,415 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import quarks.function.Consumer;
+import quarks.function.Functions;
+import quarks.function.Predicate;
+import quarks.topology.TStream;
+
+/**
+ * A Stream "oscilloscope" for capturing stream tuples for analysis / debug.
+ * <P>
+ * A {@code StreamScope} is expected to be used as parameter to
+ * {@link TStream#peek(Consumer)}.
+ * </P><P>
+ * A {@link TriggerManager} controls which tuples are captured.
+ * A {@link BufferManager} controls the retention policy for captured tuples.
+ * </P>
+ * <P>
+ * StreamScope instances are typically registered in and located via
+ * a {@link StreamScopeRegistry}.
+ * </P>
+ * @see StreamScopeRegistry
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScope<T> implements Consumer<T> {
+ private static final long serialVersionUID = 1L;
+ private final BufferManager<T> buffer = new BufferManager<>();
+ private final TriggerManager<T> trigger = new TriggerManager<>();
+ private boolean isEnabled;
+
+ /**
+ * A captured tuple.
+ * <P>
+ * The Sample captures the tuple, and the system time and nanoTime
+ * that the tuple was received.
+ * </P>
+ *
+ * @param <T> Tuple type.
+ */
+ public static class Sample<T> {
+ private final long ts;
+ private final long nanoTime;
+ private final T tuple;
+
+ Sample(T tuple) {
+ this.ts = System.currentTimeMillis();
+ this.nanoTime = System.nanoTime();
+ this.tuple = tuple;
+ }
+
+ /**
+ * Capture time in msec since the epoch.
+ * @return the timestamp
+ * @see System#currentTimeMillis();
+ */
+ public long timestamp() {
+ return ts;
+ }
+
+ /**
+ * Capture time in nanoTime.
+ * @return the nanoTime
+ * @see System#nanoTime()
+ */
+ public long nanoTime() {
+ return nanoTime;
+ }
+
+ /**
+ * The captured tuple.
+ * @return the tuple
+ */
+ public T tuple() {
+ return tuple;
+ }
+
+ @Override
+ public String toString() {
+ return "ts="+ts+" nano="+nanoTime+" tuple="+tuple;
+ }
+ }
+
+ /**
+ * Control the retention of captured tuples.
+ * <P>
+ * Captured tuples are retained until either:
+ * <ul>
+ * <li>a maximum retention count is exceeded</li>
+ * <li>TODO a maximum retention time is exceeded</li>
+ * </ul>
+ * </P><P>
+ * The default configuration is a maxCount of 10.
+ * </P>
+ */
+ public static class BufferManager<T> {
+ private List<Sample<T>> buffer = Collections.emptyList();
+ private int maxCount = 10;
+ private long period;
+ private TimeUnit unit;
+
+ // TODO timer based eviction
+ // TODO look at using quarks Windows / WindowImpl for the buffer.
+ // Can window type/"size" be changed?
+ // Does it support simultaneous maxCount & maxAge?
+
+ List<Sample<T>> getSamples() {
+ return Collections.unmodifiableList(buffer);
+ }
+
+ /**
+ * Set the maximum number of tuples to retain.
+ * <P>
+ * The capture buffer is cleared.
+ * </P>
+ * @param maxCount the maximum number of tuples to retain.
+ * Specify 0 to disable count based retention.
+ */
+ public void setMaxRetentionCount(int maxCount) {
+ this.maxCount = maxCount;
+ allocate();
+ }
+
+ /**
+ * Set the maximum retention time of a tuple.
+ * <P>
+ * The capture buffer is cleared.
+ * </P>
+ * @param age the amount of time to retain a tuple.
+ * Specify 0 to disable age based retention.
+ * @param unit {@link TimeUnit}
+ */
+ public void setMaxRetentionTime(long age, TimeUnit unit) {
+ if (age < 0)
+ throw new IllegalArgumentException("age");
+ Objects.requireNonNull(unit, "unit");
+ this.period = age;
+ this.unit = unit;
+
+ throw new IllegalStateException("setMaxRetentionTime is NYI");
+ }
+
+ /**
+ * Get the number of tuples in the capture buffer.
+ * @return the count.
+ */
+ int getCount() {
+ return buffer.size();
+ }
+
+ void release() {
+ buffer = Collections.emptyList();
+ }
+
+ void allocate() {
+ buffer = new LinkedList<>();
+ }
+
+ void add(Sample<T> sample) {
+ if (maxCount > 0 && buffer.size() >= maxCount)
+ buffer.remove(0);
+ buffer.add(sample);
+ }
+
+ @Override
+ public String toString() {
+ return "size="+getCount()+" maxCount="+maxCount+" maxAge="+period+(unit==null ? "" : unit);
+ }
+
+ }
+
+ /**
+ * Control what triggers capturing of tuples.
+ * <P>
+ * The following modes are supported:
+ * <ul>
+ * <li>continuous - capture every tuple</li>
+ * <li>by-count - capture every nth tuple</li>
+ * <li>by-time - capture based on time elapsed since the last capture</li>
+ * <li>by-Predicate - capture based on evaluating a predicate</li>
+ * </ul>
+ * </P><P>
+ * Tuple capture can be temporarily paused via {@link TriggerManager#setPaused(boolean) setPaused}.
+ * Pausing capture does not clear the capture buffer.
+ * </P><P>
+ * Capture processing can be automatically paused when a particular event
+ * has occurred via {@link TriggerManager#setPauseOn(Predicate) setPauseOn}.
+ * The pause predicate is evaluated after processing each received tuple.
+ * Use {@link TriggerManager#setPaused(boolean)} setPaused} to re-enable capture
+ * following a triggered pause.
+ * </P><P>
+ * The default configuration is {@code continuous} and not paused.
+ * </P>
+ */
+ public static class TriggerManager<T> {
+ private Predicate<T> predicate = Functions.alwaysTrue();
+ private Predicate<T> pauseOnPredicate = Functions.alwaysFalse();
+ private boolean paused = false;
+
+ /**
+ * Test if the tuple should be captured.
+ * @param tuple the tuple
+ * @return true to capture the tuple.
+ */
+ boolean test(T tuple) {
+ if (paused)
+ return false;
+ boolean b = predicate.test(tuple);
+ paused = pauseOnPredicate.test(tuple);
+ return b;
+ }
+
+ /**
+ * Set capture paused control
+ * @param paused true to pause, false to clear pause.
+ */
+ public void setPaused(boolean paused) {
+ this.paused = paused;
+ }
+
+ /**
+ * Is capture paused?
+ * @return true if paused
+ */
+ public boolean isPaused() {
+ return paused;
+ }
+
+ /**
+ * Set a pause-on predicate. Capture is paused if the predicate
+ * returns true;
+ * @param predicate the predicate
+ * @see Functions#alwaysFalse()
+ */
+ public void setPauseOn(Predicate<T> predicate) {
+ Objects.requireNonNull(predicate, "predicate");
+ pauseOnPredicate = predicate;
+ }
+
+ /**
+ * Capture every tuple.
+ */
+ public void setContinuous() {
+ setByPredicate(Functions.alwaysTrue());
+ }
+
+ /**
+ * Capture the first and every nth tuple
+ * @param count
+ */
+ public void setByCount(int count) {
+ setByPredicate(newByCountPredicate(count));
+ }
+
+ /**
+ * Capture the 1st tuple and then the next tuple after {@code period}
+ * {@code unit} time has elapsed since the previous captured tuple.
+ *
+ * @param elapsed time to delay until next capture
+ * @param unit {@link TimeUnit}
+ */
+ public void setByTime(long elapsed, TimeUnit unit) {
+ setByPredicate(newByTimePredicate(elapsed, unit));
+ }
+
+ /**
+ * Capture a tuple if the {@code predicate} test of the tuple returns true.
+ * @param predicate
+ */
+ public void setByPredicate(Predicate<T> predicate) {
+ Objects.requireNonNull(predicate, "predicate");
+ this.predicate = predicate;
+ }
+
+ private static <T> Predicate<T> newByCountPredicate(int count) {
+ if (count < 1)
+ throw new IllegalArgumentException("count");
+ return new Predicate<T>() {
+ private static final long serialVersionUID = 1L;
+ int byCount = count;
+ int curCount = -1; // capture 1st and every byCount-nth
+
+ @Override
+ public boolean test(T value) {
+ return ++curCount % byCount == 0;
+ }
+ };
+ }
+
+ private static <T> Predicate<T> newByTimePredicate(long elapsed, TimeUnit unit) {
+ if (elapsed < 1)
+ throw new IllegalArgumentException("elapsed");
+ Objects.requireNonNull(unit, "unit");
+ return new Predicate<T>() {
+ private static final long serialVersionUID = 1L;
+ private long nextTime;
+
+ @Override
+ public boolean test(T value) {
+ long now = System.currentTimeMillis();
+ if (now > nextTime) {
+ nextTime = now + unit.toMillis(elapsed);
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ }
+
+ /**
+ * Create a new instance.
+ * <P>
+ * Sample capture is disabled.
+ * </P>
+ */
+ public StreamScope() {
+ }
+
+ /**
+ * Is tuple capture enabled?
+ * @return true if enabled.
+ */
+ public boolean isEnabled() {
+ return isEnabled;
+ }
+
+ /**
+ * Enable or disable tuple capture.
+ * <P>
+ * Disabling releases the capture buffer.
+ * </P>
+ * @param isEnabled true to enable, false to disable.
+ */
+ public synchronized void setEnabled(boolean isEnabled) {
+ if (!isEnabled)
+ buffer.release();
+ buffer.allocate();
+ this.isEnabled = isEnabled;
+ }
+
+ /**
+ * Get the {@link BufferManager}
+ * @return the manager
+ */
+ public BufferManager<T> bufferMgr() {
+ return buffer;
+ }
+
+ /**
+ * Get the {@link TriggerManager}
+ * @return the manager
+ */
+ public TriggerManager<T> triggerMgr() {
+ return trigger;
+ }
+
+ /**
+ * Get all captured tuples.
+ * <P>
+ * The returned samples are removed from the capture buffer.
+ * </P>
+ * @return unmodifiable list of captured samples
+ */
+ public synchronized List<Sample<T>> getSamples() {
+ List<Sample<T>> tmp = buffer.getSamples();
+ buffer.allocate();
+ return tmp;
+ }
+
+ /**
+ * Get the number of Samples currently captured
+ * @return the count
+ */
+ public synchronized int getSampleCount() {
+ return buffer.getCount();
+ }
+
+ @Override
+ public synchronized void accept(T tuple) {
+ if (!isEnabled())
+ return;
+ if (trigger.test(tuple))
+ buffer.add(new Sample<T>(tuple));
+ }
+
+ @Override
+ public String toString() {
+ return "isEnabled="+isEnabled+" bufferMgr="+bufferMgr();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
new file mode 100644
index 0000000..dbfdd80
--- /dev/null
+++ b/api/topology/src/main/java/quarks/topology/plumbing/StreamScopeRegistry.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package quarks.topology.plumbing;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A registry for Stream "oscilloscope" {@link StreamScope} instances.
+ * <P>
+ * The registry contains a collection of StreamScope instances
+ * that are registered by one or more names.
+ * </P><P>
+ * The names are: by a TStream {@link quarks.topology.TStream#alias(String) alias} or
+ * by a stream's (output port's) unique identifier.
+ * Static methods are provided for composing these names and extracting
+ * the alias/identifier from generated names.
+ * </P>
+ * @see quarks.providers.development.DevelopmentProvider DevelopmentProvider
+ */
+public class StreamScopeRegistry {
+ private final Map<String, StreamScope<?>> byNameMap = new HashMap<>();
+ private final Map<StreamScope<?>, List<String>> byStreamScopeMap = new HashMap<>();
+
+ public StreamScopeRegistry() {
+
+ }
+
+ /** create a registration name for a stream alias */
+ public static String nameByStreamAlias(String alias) {
+ Objects.requireNonNull(alias, "alias");
+ return "alias."+alias;
+ }
+
+ /** create a registration name for a stream id */
+ public static String nameByStreamId(String id) {
+ Objects.requireNonNull(id, "id");
+ return "id."+id;
+ }
+
+ /** returns null if {@code name} is not a from nameByStreamAlias() */
+ public static String streamAliasFromName(String name) {
+ Objects.requireNonNull(name, "name");
+ if (!name.startsWith("alias."))
+ return null;
+ return name.substring("alias.".length());
+ }
+
+ /** returns null if {@code name} is not a from nameByStreamId() */
+ public static String streamIdFromName(String name) {
+ Objects.requireNonNull(name, "name");
+ if (!name.startsWith("id."))
+ return null;
+ return name.substring("id.".length());
+ }
+
+ /** A single StreamScope can be registered with multiple names.
+ * @throws IllegalStateException if a registration already exists for {@code name}
+ */
+ public synchronized void register(String name, StreamScope<?> streamScope) {
+ if (byNameMap.containsKey(name))
+ throw new IllegalStateException("StreamScope already registered by name "+name);
+ byNameMap.put(name, streamScope);
+ List<String> names = byStreamScopeMap.get(streamScope);
+ if (names == null) {
+ names = new ArrayList<>(2);
+ byStreamScopeMap.put(streamScope, names);
+ }
+ names.add(name);
+ }
+
+ /**
+ * @param name
+ * @return the StreamScope. null if name is not registered.
+ */
+ public synchronized StreamScope<?> lookup(String name) {
+ return byNameMap.get(name);
+ }
+
+ /**
+ * Get the registered names.
+ * @return unmodifiable collection of the name.
+ * The set is backed by the registry so the set may change.
+ */
+ public synchronized Set<String> getNames() {
+ return Collections.unmodifiableSet(byNameMap.keySet());
+ }
+
+ /** Get registered StreamScopes and the name(s) each is registered with.
+ * The map is backed by the registry so its contents may change.
+ */
+ public synchronized Map<StreamScope<?>, List<String>> getStreamScopes() {
+ return Collections.unmodifiableMap(byStreamScopeMap);
+ }
+
+ /** remove the specific name registration. Other registration of the same StreamScope may still exist.
+ * no-op if name is not registered.
+ * @see #unregister(StreamScope)
+ */
+ public synchronized void unregister(String name) {
+ StreamScope<?> streamScope = byNameMap.get(name);
+ if (streamScope == null)
+ return;
+ byNameMap.remove(name);
+ List<String> names = byStreamScopeMap.get(streamScope);
+ names.remove(name);
+ if (names.isEmpty())
+ byStreamScopeMap.remove(streamScope);
+ }
+
+ /** remove all name registrations of the StreamScope.
+ * no-op if no registrations for the StreamScope
+ */
+ public synchronized void unregister(StreamScope<?> streamScope) {
+ List<String> names = byStreamScopeMap.get(streamScope);
+ if (names == null)
+ return;
+ names = new ArrayList<>(names);
+ for (String name : names)
+ unregister(name);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f2535ed..f88da2d 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -1,4 +1,4 @@
-/*
+//*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@@ -20,6 +20,10 @@ package quarks.test.topology;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@@ -44,6 +48,9 @@ import quarks.function.ToIntFunction;
import quarks.topology.TStream;
import quarks.topology.Topology;
import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScope.Sample;
+import quarks.topology.plumbing.StreamScopeRegistry;
import quarks.topology.plumbing.Valve;
import quarks.topology.tester.Condition;
@@ -743,4 +750,288 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
assertTrue("valid:" + contents.getResult(), contents.valid());
assertTrue("valid:" + resultAvailablePermits, resultAvailablePermits.equals(arrayResult));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testStreamScopeFn() throws Exception {
+
+ StreamScope<Integer> ss = new StreamScope<>();
+
+ List<Sample<Integer>> samples;
+ Sample<Integer> sample;
+
+ assertFalse(ss.isEnabled());
+ assertNotNull(ss.bufferMgr());
+ assertNotNull(ss.triggerMgr());
+ assertEquals(0, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertNotNull(samples);
+ assertEquals(0, samples.size());
+
+ // ---------------- no capture when not enabled
+ ss.accept(1);
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ ss.setEnabled(true);
+
+ // ---------------- single capture
+ // note: getSamples() removes captured tuples
+ ss.accept(100);
+ assertEquals(1, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(1, samples.size());
+ sample = samples.get(0);
+ assertEquals(100, sample.tuple().intValue());
+ assertNotEquals(0, sample.timestamp());
+ assertNotEquals(0, sample.nanoTime());
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ // ---------------- next capture/get; different lists
+ List<Sample<Integer>> savedSamples = samples;
+ ss.accept(101);
+ assertEquals(1, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(101, samples.get(0).tuple().intValue());
+ assertNotEquals(samples, savedSamples);
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ // ---------------- multi capture
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(101, samples.get(1).tuple().intValue());
+ assertEquals(102, samples.get(2).tuple().intValue());
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ // ---------------- disable => clears capture buffer
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ assertEquals(3, ss.getSampleCount());
+ ss.setEnabled(false);
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ ss.setEnabled(true);
+
+ // ---------------- buffer control at the limit (no eviction)
+ ss.bufferMgr().setMaxRetentionCount(3);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(101, samples.get(1).tuple().intValue());
+ assertEquals(102, samples.get(2).tuple().intValue());
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+
+ // ---------------- buffer control with eviction
+ ss.bufferMgr().setMaxRetentionCount(2);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ assertEquals(2, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(101, samples.get(0).tuple().intValue());
+ assertEquals(102, samples.get(1).tuple().intValue());
+ assertEquals(0, ss.getSampleCount());
+ assertEquals(0, ss.getSamples().size());
+ ss.bufferMgr().setMaxRetentionCount(10);
+
+ // ---------------- trigger byCount
+ ss.triggerMgr().setByCount(3);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ ss.accept(103);
+ ss.accept(104);
+ ss.accept(105);
+ ss.accept(106);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(103, samples.get(1).tuple().intValue());
+ assertEquals(106, samples.get(2).tuple().intValue());
+
+ // ---------------- trigger byPredicate
+ ss.triggerMgr().setByPredicate(t -> t % 2 == 0);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ ss.accept(103);
+ ss.accept(104);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(102, samples.get(1).tuple().intValue());
+ assertEquals(104, samples.get(2).tuple().intValue());
+
+ // ---------------- trigger byTime
+ ss.triggerMgr().setByTime(100, TimeUnit.MILLISECONDS);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ Thread.sleep(110);
+ ss.accept(103);
+ ss.accept(104);
+ ss.accept(105);
+ Thread.sleep(110);
+ ss.accept(106);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(103, samples.get(1).tuple().intValue());
+ assertEquals(106, samples.get(2).tuple().intValue());
+
+ // ---------------- trigger continuous
+ ss.triggerMgr().setContinuous();
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(101, samples.get(1).tuple().intValue());
+ assertEquals(102, samples.get(2).tuple().intValue());
+
+ // ---------------- trigger pause
+ ss.accept(100);
+ ss.accept(101);
+ ss.triggerMgr().setPaused(true);
+ assertTrue(ss.triggerMgr().isPaused());
+ ss.accept(102);
+ ss.accept(103);
+ ss.triggerMgr().setPaused(false);
+ assertFalse(ss.triggerMgr().isPaused());
+ ss.accept(104);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(101, samples.get(1).tuple().intValue());
+ assertEquals(104, samples.get(2).tuple().intValue());
+
+ // ---------------- trigger pauseOn
+
+ ss.triggerMgr().setPauseOn(t -> t == 102);
+ ss.accept(100);
+ ss.accept(101);
+ ss.accept(102);
+ ss.accept(103);
+ ss.accept(104);
+ ss.accept(105);
+ assertTrue(ss.triggerMgr().isPaused());
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(100, samples.get(0).tuple().intValue());
+ assertEquals(101, samples.get(1).tuple().intValue());
+ assertEquals(102, samples.get(2).tuple().intValue());
+ ss.triggerMgr().setPaused(false);
+ ss.accept(1000);
+ ss.accept(1010);
+ ss.accept(102);
+ ss.accept(1030);
+ assertEquals(3, ss.getSampleCount());
+ samples = ss.getSamples();
+ assertEquals(1000, samples.get(0).tuple().intValue());
+ assertEquals(1010, samples.get(1).tuple().intValue());
+ assertEquals(102, samples.get(2).tuple().intValue());
+
+ }
+
+ @Test
+ public void testStreamScopeRegistry() throws Exception {
+
+ StreamScope<Integer> ss1 = new StreamScope<>();
+ StreamScope<Integer> ss2 = new StreamScope<>();
+
+ StreamScopeRegistry rgy = new StreamScopeRegistry();
+
+ assertNotNull(rgy.getNames());
+ assertEquals(0, rgy.getNames().size());
+ assertNotNull(rgy.getStreamScopes());
+ assertEquals(0, rgy.getStreamScopes().size());
+ assertNull(rgy.lookup("xyzzy"));
+ rgy.unregister("xyzzy");
+ rgy.unregister(ss1);
+
+ // ---------- name generation / parse functions
+ String alias1Name = StreamScopeRegistry.nameByStreamAlias("alias1");
+ assertNotNull(alias1Name);
+ String alias2Name = StreamScopeRegistry.nameByStreamAlias("alias2");
+ assertNotNull(alias2Name);
+ assertNotEquals(alias1Name, alias2Name);
+ String alias1 = StreamScopeRegistry.streamAliasFromName(alias1Name);
+ assertEquals("alias1", alias1);
+
+ String id1Name = StreamScopeRegistry.nameByStreamId("id1");
+ assertNotNull(id1Name);
+ String id2Name = StreamScopeRegistry.nameByStreamId("id2");
+ assertNotNull(id2Name);
+ assertNotEquals(id1Name, id2Name);
+ String id1 = StreamScopeRegistry.streamIdFromName(id1Name);
+ assertEquals("id1", id1);
+
+ assertNotEquals(StreamScopeRegistry.nameByStreamAlias("1"),
+ StreamScopeRegistry.nameByStreamId("1"));
+
+ // ---------- register
+ rgy.register(alias1Name, ss1);
+ rgy.register(alias2Name, ss2);
+ rgy.register(id2Name, ss2);
+
+ // ---------- lookup
+ assertSame(ss1, rgy.lookup(alias1Name));
+ assertSame(ss2, rgy.lookup(alias2Name));
+ assertSame(ss2, rgy.lookup(id2Name));
+
+ // ----------- getNames
+ assertEquals(3, rgy.getNames().size());
+ assertTrue(rgy.getNames().contains(alias1Name));
+ assertFalse(rgy.getNames().contains(id1Name));
+ assertTrue(rgy.getNames().contains(alias2Name));
+ assertTrue(rgy.getNames().contains(id2Name));
+
+ // ----------- getStreamScopes
+ assertEquals(2, rgy.getStreamScopes().keySet().size());
+ assertTrue(rgy.getStreamScopes().keySet().contains(ss1));
+ assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+ assertEquals(1, rgy.getStreamScopes().get(ss1).size());
+ assertTrue(rgy.getStreamScopes().get(ss1).contains(alias1Name));
+ assertEquals(2, rgy.getStreamScopes().get(ss2).size());
+ assertTrue(rgy.getStreamScopes().get(ss2).contains(alias2Name));
+ assertTrue(rgy.getStreamScopes().get(ss2).contains(id2Name));
+
+ // ---------- unregister
+ rgy.unregister(alias1Name);
+ assertNull(rgy.lookup(alias1Name));
+ assertEquals(2, rgy.getNames().size());
+ assertFalse(rgy.getNames().contains(alias1Name));
+ assertFalse(rgy.getStreamScopes().keySet().contains(ss1));
+ assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+
+ rgy.unregister(alias2Name);
+ assertNull(rgy.lookup(alias2Name));
+ assertEquals(1, rgy.getNames().size());
+ assertFalse(rgy.getNames().contains(alias2Name));
+ assertTrue(rgy.getStreamScopes().keySet().contains(ss2));
+ assertSame(ss2, rgy.lookup(id2Name));
+ rgy.unregister(id2Name);
+ assertEquals(0, rgy.getNames().size());
+ assertEquals(0, rgy.getStreamScopes().keySet().size());
+
+ rgy.register(alias2Name, ss2);
+ rgy.register(id2Name, ss2);
+ rgy.unregister(ss2);
+ assertEquals(0, rgy.getNames().size());
+ assertEquals(0, rgy.getStreamScopes().keySet().size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a808a728/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
index 137b936..cd2ca9f 100644
--- a/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
+++ b/providers/development/src/main/java/quarks/providers/development/DevelopmentProvider.java
@@ -30,6 +30,7 @@ import quarks.execution.Job;
import quarks.execution.services.ControlService;
import quarks.graph.Connector;
import quarks.graph.Edge;
+import quarks.graph.Vertex;
import quarks.metrics.Metrics;
import quarks.metrics.MetricsSetup;
import quarks.metrics.oplets.CounterOp;
@@ -38,6 +39,8 @@ import quarks.oplet.core.Peek;
import quarks.providers.direct.DirectProvider;
import quarks.runtime.jmxcontrol.JMXControlService;
import quarks.topology.Topology;
+import quarks.topology.plumbing.StreamScope;
+import quarks.topology.plumbing.StreamScopeRegistry;
/**
* Provider intended for development.
@@ -78,6 +81,9 @@ public class DevelopmentProvider extends DirectProvider {
getServices().addService(ControlService.class,
new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
+
+ getServices().addService(StreamScopeRegistry.class,
+ new StreamScopeRegistry());
HttpServer server = HttpServer.getInstance();
getServices().addService(HttpServer.class, server);
@@ -87,6 +93,7 @@ public class DevelopmentProvider extends DirectProvider {
@Override
public Future<Job> submit(Topology topology, JsonObject config) {
Metrics.counter(topology);
+ addStreamScopes(topology);
duplicateTags(topology);
return super.submit(topology, config);
}
@@ -132,4 +139,34 @@ public class DevelopmentProvider extends DirectProvider {
c.tag(ta);
}
}
+
+ /**
+ * Add StreamScope instances to the topology
+ * @param t the Topology
+ */
+ private void addStreamScopes(Topology t) {
+ StreamScopeRegistry rgy = (StreamScopeRegistry)
+ t.getRuntimeServiceSupplier().get()
+ .getService(StreamScopeRegistry.class);
+ if (rgy == null)
+ return;
+
+ t.graph().peekAll(
+ () -> {
+ StreamScope<?> streamScope = new StreamScope<>();
+ Peek<?> peekOp = new quarks.oplet.plumbing.StreamScope<>(streamScope);
+ registerStreamScope(rgy, peekOp, streamScope);
+ return peekOp;
+ },
+ (Vertex<?, ?, ?> v) -> !(v.getInstance() instanceof quarks.oplet.core.FanOut));
+ }
+
+ private int hackCount = 0; // TODO temp hack to enable some test development
+ private void registerStreamScope(StreamScopeRegistry rgy, Peek<?> peekOp, StreamScope<?> streamScope) {
+ hackCount++;
+ String id = "oplet-"+ hackCount; // TODO get from peekOp's source oport <opletId>.oport.<n>
+ String alias = "streamAlias-"+ hackCount; // TODO get from peekOp's source oport context
+ rgy.register(StreamScopeRegistry.nameByStreamAlias(alias), streamScope);
+ rgy.register(StreamScopeRegistry.nameByStreamId(id), streamScope);
+ }
}