You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[10/50] [abbrv] git commit: Fluent API to build S4 apps.
Fluent API to build S4 apps.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/59038903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/59038903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/59038903
Branch: refs/heads/piper
Commit: 59038903d158b183aa3a1d145aed9807f090fcee
Parents: fb61d6d
Author: Leo Neumeyer <le...@s4.io>
Authored: Tue Dec 13 15:45:35 2011 -0800
Committer: Leo Neumeyer <le...@s4.io>
Committed: Tue Dec 13 15:45:35 2011 -0800
----------------------------------------------------------------------
.../java/org/apache/s4/appbuilder/AppMaker.java | 94 --------
.../main/java/org/apache/s4/appbuilder/EventA.java | 7 -
.../main/java/org/apache/s4/appbuilder/EventB.java | 7 -
.../main/java/org/apache/s4/appbuilder/MyApp.java | 48 ----
.../java/org/apache/s4/appbuilder/PEMaker.java | 37 ---
.../main/java/org/apache/s4/appbuilder/PEX.java | 19 --
.../main/java/org/apache/s4/appbuilder/PEY.java | 19 --
.../main/java/org/apache/s4/appbuilder/PEZ.java | 19 --
.../java/org/apache/s4/appbuilder/StreamMaker.java | 88 -------
.../main/java/org/apache/s4/appmaker/AppMaker.java | 126 +++++++++++
.../main/java/org/apache/s4/appmaker/PEMaker.java | 176 +++++++++++++++
.../java/org/apache/s4/appmaker/StreamMaker.java | 125 ++++++++++
.../src/main/java/org/apache/s4/core/App.java | 15 +-
.../java/org/apache/s4/core/ProcessingElement.java | 8 +-
.../src/main/java/org/apache/s4/core/Stream.java | 10 +-
subprojects/s4-core/src/main/resources/logback.xml | 2 +-
.../java/org/apache/s4/appmaker/AppMakerTest.java | 14 ++
.../test/java/org/apache/s4/appmaker/EventA.java | 7 +
.../test/java/org/apache/s4/appmaker/EventB.java | 7 +
.../test/java/org/apache/s4/appmaker/MyApp.java | 24 ++
.../src/test/java/org/apache/s4/appmaker/PEX.java | 19 ++
.../src/test/java/org/apache/s4/appmaker/PEY.java | 19 ++
.../src/test/java/org/apache/s4/appmaker/PEZ.java | 19 ++
.../test/java/org/apache/s4/core/TriggeredApp.java | 4 +-
.../java/org/apache/s4/example/counter/MyApp.java | 30 ++--
.../java/org/apache/s4/example/model/MyApp.java | 2 +-
26 files changed, 568 insertions(+), 377 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
deleted file mode 100644
index dcee431..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/AppMaker.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-
-abstract public class AppMaker extends App {
-
- /**
- * NOTES: reflection+guice:
- * <code>http://groups.google.com/group/google-guice/browse_thread/thread/23f4bf986a999e00/73f83a98c288a3e1?lnk=gst&q=binding+api#73f83a98c288a3e1</code>
- */
-
- /**
- * The app graph is stored as follows:
- * <p>
- * PE to Stream
- * <p>
- * PE[1]: S[1,1], S[1,2], ...
- * <p>
- * PE[2]: S[2,1], S[2,2], ...
- * <p>
- * Stream to PE
- * <p>
- * S[1]: PE[1]
- * <p>
- * S[2] : PE[2]
- *
- */
-
- private Multimap<PEMaker, StreamMaker> psGraph = LinkedListMultimap.create();
- private Map<StreamMaker, PEMaker> spGraph = Maps.newHashMap();
-
- void add(PEMaker pem, StreamMaker stream) {
-
- psGraph.put(pem, stream);
- }
-
- void add(StreamMaker stream, PEMaker pem) {
-
- spGraph.put(stream, pem);
- }
-
- public PEMaker addPE(Class<? extends ProcessingElement> type) {
- return new PEMaker(this, type);
- }
-
- /**
- * Add a stream.
- *
- * @param eventType
- * the type of events emitted by this PE.
- *
- * @return a stream maker.
- */
- public StreamMaker addStream(Class<? extends Event> type) {
-
- return new StreamMaker(this, type);
-
- }
-
- public App make() {
- return null;
- }
-
- public String toString() {
-
- StringBuilder sb = new StringBuilder();
- Map<PEMaker, Collection<StreamMaker>> psMap = psGraph.asMap();
-
- for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : psMap.entrySet()) {
- sb.append(entry.getKey() + ": ");
- for (StreamMaker sm : entry.getValue()) {
- sb.append(sm + " ");
- }
- sb.append("\n");
- }
-
- for (Map.Entry<StreamMaker, PEMaker> entry : spGraph.entrySet()) {
- sb.append(entry.getKey() + ": " + entry.getValue());
- sb.append("\n");
- }
-
- return sb.toString();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
deleted file mode 100644
index c5c7414..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventA.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-
-public class EventA extends Event {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
deleted file mode 100644
index 3feb90f..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/EventB.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-
-public class EventB extends Event {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
deleted file mode 100644
index 5f3af49..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/MyApp.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.s4.appbuilder;
-
-public class MyApp extends AppMaker {
-
- public static void main(String[] args) {
-
- MyApp myApp = new MyApp();
- myApp.init();
- System.out.println(myApp.toString());
- }
-
- @Override
- protected void start() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onInit() {
- PEMaker pem1, pem2;
- StreamMaker s1;
- StreamMaker s2, s3;
-
- pem1 = addPE(PEZ.class);
-
- s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pem1);
-
- pem2 = addPE(PEY.class).to(s1);
-
- s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pem2);
-
- s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pem2);
-
- addPE(PEX.class).to(s2).to(s3);
- }
-
- @Override
- protected void onStart() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onClose() {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
deleted file mode 100644
index 58a8c6c..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEMaker.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-
-public class PEMaker {
-
- private Class<? extends ProcessingElement> type;
- private AppMaker appMaker;
-
- /* Only package classes can instantiate this class. */
- PEMaker(AppMaker appMaker, Class<? extends ProcessingElement> type) {
- this.type = type;
- this.appMaker = appMaker;
- }
-
- public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
-
- return this;
- }
-
- public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
-
- return this;
- }
-
- public <T extends Event> PEMaker to(StreamMaker stream) {
- appMaker.add(this, stream);
- return this;
- }
-
- public ProcessingElement make() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
deleted file mode 100644
index 7df2d29..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEX.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEX extends ProcessingElement {
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
deleted file mode 100644
index 35e4fbe..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEY.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEY extends ProcessingElement {
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
deleted file mode 100644
index cbd0dd9..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/PEZ.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.core.ProcessingElement;
-
-public class PEZ extends ProcessingElement {
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
deleted file mode 100644
index 596634b..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appbuilder/StreamMaker.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.s4.appbuilder;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.KeyFinder;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class StreamMaker {
-
- private String name = "";
- private KeyFinder<?> keyFinder = null;
- private String keyFinderString;
- private PEMaker pem;
- private Class<? extends Event> type;
- private AppMaker appMaker;
-
- /* Only package classes can instantiate this class. */
- StreamMaker(AppMaker appMaker, Class<? extends Event> type) {
- this.type = type;
- this.appMaker = appMaker;
- }
-
- /**
- * Name the stream.
- *
- * @param name
- * the stream name, default is an empty string.
- * @return the stream maker object
- */
- public StreamMaker withName(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Define the key finder for this stream.
- *
- * @param keyFinder
- * a function to lookup the value of the key.
- * @return the stream maker object
- */
- public StreamMaker withKey(KeyFinder<?> keyFinder) {
- this.keyFinder = keyFinder;
- this.keyFinderString = null;
- return this;
- }
-
- /**
- * Define the key finder for this stream using a descriptor.
- *
- * @param keyFinderString
- * a descriptor to lookup the value of the key.
- * @return the stream maker object
- */
- public StreamMaker withKey(String keyFinderString) {
- this.keyFinder = null;
- this.keyFinderString = keyFinderString;
- return this;
- }
-
- /**
- * Define the key finder for this stream using a descriptor.
- *
- * @param keyFinderString
- * a descriptor to lookup the value of the key.
- * @return the stream maker object
- */
- public StreamMaker to(PEMaker pem) {
- appMaker.add(this, pem);
- this.pem = pem;
- return this;
- }
-
- private Stream<? extends Event> getStream() {
-
- // Stream stream = new Stream(appMaker, name, getKeyFinder(), getProcessingElements());
- return null;// stream;
- }
-
- private KeyFinder<?> getKeyFinder() {
-
- return null;
- }
-
- private ProcessingElement[] getProcessingElements() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
new file mode 100644
index 0000000..ead530d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
@@ -0,0 +1,126 @@
+package org.apache.s4.appmaker;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * A fluent API to build S4 applications.
+ *
+ *
+ * <p>
+ * Usage example:
+ *
+ * <pre>
+ * public class MyApp extends AppMaker {
+ *
+ * {@literal @}Override
+ * protected void configure() {
+ *
+ * PEMaker pe1, pe2;
+ * StreamMaker s1;
+ * StreamMaker s2, s3;
+ *
+ * pe1 = addPE(PEZ.class);
+ *
+ * s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pe1);
+ *
+ * pe2 = addPE(PEY.class).to(s1);
+ *
+ * s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pe2);
+ *
+ * s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pe2);
+ *
+ * addPE(PEX.class).to(s2).to(s3);
+ * }
+ * }
+ * </pre>
+ */
+abstract public class AppMaker {
+
+ private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
+
+ /* Use multi-maps to save the graph. */
+ private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
+ private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+
+ /**
+ * Configure the application.
+ */
+ abstract protected void configure();
+
+ /* Used internally to build the graph. */
+ void add(PEMaker pem, StreamMaker stream) {
+
+ pe2stream.put(pem, stream);
+ logger.trace("Adding pe [{}] to stream [{}].", pem, stream);
+ }
+
+ /* Used internally to build the graph. */
+ void add(StreamMaker stream, PEMaker pem) {
+
+ stream2pe.put(stream, pem);
+ logger.trace("Adding stream [{}] to pe [{}].", stream, pem);
+ }
+
+ protected PEMaker addPE(Class<? extends ProcessingElement> type) {
+ return new PEMaker(this, type);
+ }
+
+ /**
+ * Add a stream.
+ *
+ * @param type
+ * the event type associated with the new stream.
+ *
+ * @return a stream maker.
+ */
+ protected StreamMaker addStream(Class<? extends Event> type) {
+
+ return new StreamMaker(this, type);
+
+ }
+
+ App make() {
+ return null;
+ }
+
+    /**
+     * Render the application graph as text: one line per entry of the PE-to-stream map, then one line per entry of
+     * the stream-to-PE map. Each line is the key, a colon, and the space-separated values.
+     *
+     * @return the application graph.
+     */
+    @Override
+    public String toString() {
+        StringBuilder out = new StringBuilder();
+
+        /* PE to streams. */
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> peEntry : pe2stream.asMap().entrySet()) {
+            out.append(peEntry.getKey()).append(": ");
+            for (StreamMaker target : peEntry.getValue()) {
+                out.append(target).append(" ");
+            }
+            out.append("\n");
+        }
+
+        /* Stream to PEs. */
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> streamEntry : stream2pe.asMap().entrySet()) {
+            out.append(streamEntry.getKey()).append(": ");
+            for (PEMaker target : streamEntry.getValue()) {
+                out.append(target).append(" ");
+            }
+            out.append("\n");
+        }
+
+        return out.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
new file mode 100644
index 0000000..c444fda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
@@ -0,0 +1,176 @@
+package org.apache.s4.appmaker;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Helper class to add a processing element to an S4 application.
+ *
+ * @see AppMaker
+ *
+ */
+public class PEMaker {
+
+ final private Class<? extends ProcessingElement> type;
+ final private AppMaker app;
+
+ private long timerInterval = 0;
+
+ private long triggerInterval = 0;
+ private Class<? extends Event> triggerEventType = null;
+ private int triggerNumEvents = 0;
+
+ private int cacheMaximumSize = 0;
+ private long cacheDuration = 0;
+
+ PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+ this.type = type;
+ this.app = app;
+ app.add(this, null);
+ }
+
+    /**
+     * Configure the PE expiration and cache size.
+     * <p>
+     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
+     * creation, or last access.
+     * <p>
+     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+     * <p>
+     * This maker only records the settings; the duration is stored in milliseconds.
+     *
+     * @param maximumSize
+     *            the approximate maximum number of PEs in the cache.
+     * @param duration
+     *            the PE duration, expressed in {@code timeUnit}.
+     * @param timeUnit
+     *            the time unit of {@code duration}; must not be null.
+     * @return the PEMaker
+     */
+    public PEMaker withPECache(int maximumSize, long duration, TimeUnit timeUnit) {
+
+        cacheMaximumSize = maximumSize;
+
+        /* TimeUnit#convert(d, u) converts FROM u TO the receiver, so the receiver must be MILLISECONDS. */
+        cacheDuration = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
+        return this;
+    }
+
+    /**
+     * Configure a trigger that is fired when the following conditions occur:
+     *
+     * <ul>
+     * <li>An event of eventType arrived to the PE instance
+     * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
+     * interval.
+     * </ul>
+     *
+     * <p>
+     * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
+     * the argument eventType.
+     *
+     * @param eventType
+     *            the type of event on which this trigger will fire.
+     * @param numEvents
+     *            number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on
+     *            every input event.)
+     * @param interval
+     *            minimum time between triggers, in timeUnit. Stored in milliseconds. Set to zero if not needed.
+     * @param timeUnit
+     *            the TimeUnit for the argument interval. If null, the stored trigger interval is left unchanged.
+     * @return the PEMaker
+     */
+    public PEMaker withTrigger(Class<? extends Event> eventType, int numEvents, long interval, TimeUnit timeUnit) {
+        triggerEventType = eventType;
+        triggerNumEvents = numEvents;
+
+        if (timeUnit != null) {
+            /* Convert FROM timeUnit TO milliseconds; the receiver of convert() is the target unit. */
+            triggerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
+        }
+        return this;
+    }
+
+    /**
+     * Set a timer that calls {@link ProcessingElement#onTime()}.
+     *
+     * If {@code interval==0} the timer is disabled.
+     *
+     * @param interval
+     *            in timeUnit
+     * @param timeUnit
+     *            the timeUnit of interval
+     * @return the PEMaker
+     */
+    public PEMaker withTimerInterval(long interval, TimeUnit timeUnit) {
+        /*
+         * Keep the value in milliseconds. Do not overwrite it with the raw interval, which would drop the unit.
+         */
+        timerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
+        return this;
+    }
+
+ /**
+ * Send events from this PE to a stream.
+ *
+ * @param stream
+ * the target stream.
+ *
+ * @return the PE maker.
+ */
+ public PEMaker to(StreamMaker stream) {
+ app.add(this, stream);
+ return this;
+ }
+
+ /**
+ * @return the timerInterval
+ */
+ long getTimerInterval() {
+ return timerInterval;
+ }
+
+ /**
+ * @return the triggerInterval
+ */
+ long getTriggerInterval() {
+ return triggerInterval;
+ }
+
+ /**
+ * @return the triggerEventType
+ */
+ Class<? extends Event> getTriggerEventType() {
+ return triggerEventType;
+ }
+
+ /**
+ * @return the triggerNumEvents
+ */
+ int getTriggerNumEvents() {
+ return triggerNumEvents;
+ }
+
+ /**
+ * @return the cacheMaximumSize
+ */
+ int getCacheMaximumSize() {
+ return cacheMaximumSize;
+ }
+
+ /**
+ * @return the cacheDuration
+ */
+ long getCacheDuration() {
+ return cacheDuration;
+ }
+
+ /**
+ * @return the type
+ */
+ Class<? extends ProcessingElement> getType() {
+ return type;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
new file mode 100644
index 0000000..5fe2897
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
@@ -0,0 +1,125 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.KeyFinder;
+
+/**
+ * Helper class to add a stream to an S4 application.
+ *
+ * @see AppMaker
+ *
+ */
+public class StreamMaker {
+
+ final private AppMaker app;
+ private Class<? extends Event> type;
+ private String name = "";
+ private KeyFinder<? extends Event> keyFinder;
+ private String keyDescriptor = null;
+
+ StreamMaker(AppMaker app, Class<? extends Event> type) {
+ this.app = app;
+ this.type = type;
+ app.add(null, this);
+ }
+
+ /**
+ * Name the stream.
+ *
+ * @param name
+ * the stream name, default is an empty string.
+ * @return the stream maker object
+ */
+ public StreamMaker withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Define the key finder for this stream.
+ *
+ * @param keyFinder
+ * a function to lookup the value of the key.
+ * @return the stream maker.
+ */
+ public <T extends Event> StreamMaker withKey(KeyFinder<T> keyFinder) {
+ this.keyFinder = keyFinder;
+ return this;
+ }
+
+ /**
+ * Define the key finder for this stream using a descriptor.
+ *
+ * @param keyDescriptor
+ * a descriptor to lookup the value of the key.
+ * @return the stream maker.
+ */
+ public StreamMaker withKey(String keyDescriptor) {
+
+ this.keyDescriptor = keyDescriptor;
+ return this;
+ }
+
+ /**
+ * Send events from this stream to a PE.
+ *
+ * @param pe
+ * a target PE.
+ *
+ * @return the stream maker.
+ */
+ public StreamMaker to(PEMaker pe) {
+ app.add(this, pe);
+ return this;
+ }
+
+    /**
+     * Send events from this stream to several PEs.
+     *
+     * @param pes
+     *            the target PEs; each one is registered with the application in array order.
+     * @return the stream maker.
+     */
+    public StreamMaker to(PEMaker[] pes) {
+        for (PEMaker target : pes) {
+            app.add(this, target);
+        }
+        return this;
+    }
+
+ /**
+ * @return the app
+ */
+ AppMaker getApp() {
+ return app;
+ }
+
+ /**
+ * @return the type
+ */
+ Class<? extends Event> getType() {
+ return type;
+ }
+
+ /**
+ * @return the name
+ */
+ String getName() {
+ return name;
+ }
+
+ /**
+ * @return the keyFinder
+ */
+ KeyFinder<? extends Event> getKeyFinder() {
+ return keyFinder;
+ }
+
+ /**
+ * @return the keyDescriptor
+ */
+ String getKeyDescriptor() {
+ return keyDescriptor;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 9c4ec81..0fea25f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
+import org.apache.s4.core.App.ClockType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +35,7 @@ import com.google.inject.Injector;
/*
* Container base class to hold all processing elements. We will implement administrative methods here.
*/
-public abstract class App extends AbstractModule {
+public abstract class App {
private static final Logger logger = LoggerFactory.getLogger(App.class);
@@ -250,7 +251,7 @@ public abstract class App extends AbstractModule {
protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
ProcessingElement... processingElements) {
- return new Stream<T>(this).withName(name).withKey(finder).to(processingElements);
+ return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
}
/**
@@ -267,7 +268,7 @@ public abstract class App extends AbstractModule {
*/
protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
- return new Stream<T>(this).withName(name).to(processingElements);
+ return new Stream<T>(this).setName(name).setPEs(processingElements);
}
/**
@@ -355,12 +356,4 @@ public abstract class App extends AbstractModule {
+ " \nUsage: java <classpath+params> org.apache.s4.core.App <appClassName> <moduleClassName>");
System.exit(-1);
}
-
- /* Implement Guice abstract method. */
- @Override
- protected void configure() {
- // TODO Auto-generated method stub
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 5314824..0922c47 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -97,7 +97,7 @@ import com.google.common.collect.Maps;
*
*
*/
-public abstract class ProcessingElement implements Cloneable {
+abstract public class ProcessingElement implements Cloneable {
private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
@@ -223,7 +223,7 @@ public abstract class ProcessingElement implements Cloneable {
* the time unit
* @return the PE prototype
*/
- public ProcessingElement withPECache(int maximumSize, long duration, TimeUnit timeUnit) {
+ public ProcessingElement setPECache(int maximumSize, long duration, TimeUnit timeUnit) {
if (!isPrototype) {
logger.error("This method can only be used on the PE prototype. Cache not configured.");
@@ -265,7 +265,7 @@ public abstract class ProcessingElement implements Cloneable {
* the TimeUnit for the argument interval. Can set to null if no time interval needed.
* @return the PE prototype
*/
- public ProcessingElement withTrigger(Class<? extends Event> eventType, int numEvents, long interval,
+ public ProcessingElement setTrigger(Class<? extends Event> eventType, int numEvents, long interval,
TimeUnit timeUnit) {
if (!isPrototype) {
@@ -319,7 +319,7 @@ public abstract class ProcessingElement implements Cloneable {
* the timeUnit of interval
* @return the PE prototype
*/
- public ProcessingElement withTimerInterval(long interval, TimeUnit timeUnit) {
+ public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {
timerIntervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
/* We only allow timers in the PE prototype, not in the instances. */
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 2d0a3a3..f8cd81c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -103,7 +103,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
* the stream name, default is an empty string.
* @return the stream maker object
*/
- public Stream<T> withName(String name) {
+ public Stream<T> setName(String name) {
this.name = name;
return this;
}
@@ -115,7 +115,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
* a function to lookup the value of the key.
* @return the stream maker object
*/
- public Stream<T> withKey(KeyFinder<T> keyFinder) {
+ public Stream<T> setKey(KeyFinder<T> keyFinder) {
this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
return this;
}
@@ -127,7 +127,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
* a descriptor to lookup the value of the key.
* @return the stream maker object
*/
- public Stream<T> withKey(String keyFinderString) {
+ public Stream<T> setKey(String keyFinderString) {
return this;
}
@@ -140,7 +140,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
*
* @return the stream maker object
*/
- public Stream<T> to(ProcessingElement pe) {
+ public Stream<T> setPE(ProcessingElement pe) {
app.addStream(this, pe);
return this;
}
@@ -153,7 +153,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
*
* @return the stream maker object
*/
- public Stream<T> to(ProcessingElement[] pes) {
+ public Stream<T> setPEs(ProcessingElement[] pes) {
for (int i = 0; i < pes.length; i++)
app.addStream(this, pes[i]);
return this;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/resources/logback.xml b/subprojects/s4-core/src/main/resources/logback.xml
index 6b246ee..ea8c85a 100644
--- a/subprojects/s4-core/src/main/resources/logback.xml
+++ b/subprojects/s4-core/src/main/resources/logback.xml
@@ -8,7 +8,7 @@
</encoder>
</appender>
- <root level="info">
+ <root level="debug">
<appender-ref ref="STDOUT" />
</root>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
new file mode 100644
index 0000000..43b4cfb
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/AppMakerTest.java
@@ -0,0 +1,14 @@
+package org.apache.s4.appmaker;
+
+import org.junit.Test;
+
+public class AppMakerTest {
+
+ @Test
+ public void test() {
+
+ MyApp myApp = new MyApp();
+ myApp.configure();
+ System.out.println(myApp.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
new file mode 100644
index 0000000..c6a6325
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventA.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
new file mode 100644
index 0000000..04ae5e0
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/EventB.java
@@ -0,0 +1,7 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
new file mode 100644
index 0000000..5fbe2c7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
@@ -0,0 +1,24 @@
+package org.apache.s4.appmaker;
+
+public class MyApp extends AppMaker {
+
+ @Override
+ protected void configure() {
+
+ PEMaker pe1, pe2;
+ StreamMaker s1;
+ StreamMaker s2, s3;
+
+ pe1 = addPE(PEZ.class);
+
+ s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pe1);
+
+ pe2 = addPE(PEY.class).to(s1);
+
+ s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pe2);
+
+ s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pe2);
+
+ addPE(PEX.class).to(s2).to(s3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
new file mode 100644
index 0000000..c2e5532
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEX.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEX extends ProcessingElement {
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
new file mode 100644
index 0000000..16d951a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEY extends ProcessingElement {
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
new file mode 100644
index 0000000..e893754
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
@@ -0,0 +1,19 @@
+package org.apache.s4.appmaker;
+
+import org.apache.s4.core.ProcessingElement;
+
+public class PEZ extends ProcessingElement {
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
index 80207f9..9e6d1e3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggeredApp.java
@@ -32,10 +32,10 @@ public class TriggeredApp extends App {
Stream<StringEvent> stream = createStream("stream", new SentenceKeyFinder(), prototype);
switch (TriggerTest.triggerType) {
case COUNT_BASED:
- prototype.withTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
+ prototype.setTrigger(Event.class, 1, 0, TimeUnit.SECONDS);
break;
case TIME_BASED:
- prototype.withTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
+ prototype.setTrigger(Event.class, 1, 1, TimeUnit.MILLISECONDS);
default:
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index c8586a4..cae9a3b 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -61,37 +61,37 @@ final public class MyApp extends App {
/* PE that prints counts to console. */
PrintPE printPE = createPE(PrintPE.class);
- Stream<CountEvent> userCountStream = createStream(CountEvent.class).withName("User Count Stream")
- .withKey(new CountKeyFinder()).to(printPE);
+ Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
+ .setKey(new CountKeyFinder()).setPE(printPE);
- Stream<CountEvent> genderCountStream = createStream(CountEvent.class).withName("Gender Count Stream")
- .withKey(new CountKeyFinder()).to(printPE);
+ Stream<CountEvent> genderCountStream = createStream(CountEvent.class).setName("Gender Count Stream")
+ .setKey(new CountKeyFinder()).setPE(printPE);
- Stream<CountEvent> ageCountStream = createStream(CountEvent.class).withName("Age Count Stream")
- .withKey(new CountKeyFinder()).to(printPE);
+ Stream<CountEvent> ageCountStream = createStream(CountEvent.class).setName("Age Count Stream")
+ .setKey(new CountKeyFinder()).setPE(printPE);
/* PEs that count events by user, gender, and age. */
CounterPE userCountPE = createPE(CounterPE.class);// .withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
- userCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+ userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
userCountPE.setCountStream(userCountStream);
CounterPE genderCountPE = createPE(CounterPE.class);
- genderCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+ genderCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
genderCountPE.setCountStream(genderCountStream);
CounterPE ageCountPE = createPE(CounterPE.class);
- ageCountPE.withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+ ageCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
ageCountPE.setCountStream(ageCountStream);
/* Streams that output user events keyed on user, gender, and age. */
- Stream<UserEvent> userStream = createStream(UserEvent.class).withName("User Stream")
- .withKey(new UserIDKeyFinder()).to(userCountPE);
+ Stream<UserEvent> userStream = createStream(UserEvent.class).setName("User Stream")
+ .setKey(new UserIDKeyFinder()).setPE(userCountPE);
- Stream<UserEvent> genderStream = createStream(UserEvent.class).withName("Gender Stream")
- .withKey(new GenderKeyFinder()).to(genderCountPE);
+ Stream<UserEvent> genderStream = createStream(UserEvent.class).setName("Gender Stream")
+ .setKey(new GenderKeyFinder()).setPE(genderCountPE);
- Stream<UserEvent> ageStream = createStream(UserEvent.class).withName("Age Stream").withKey(new AgeKeyFinder())
- .to(ageCountPE);
+ Stream<UserEvent> ageStream = createStream(UserEvent.class).setName("Age Stream").setKey(new AgeKeyFinder())
+ .setPE(ageCountPE);
generateUserEventPE = createPE(GenerateUserEventPE.class);
generateUserEventPE.setStreams(userStream, genderStream, ageStream);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/59038903/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
index a069513..ee649cb 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/model/MyApp.java
@@ -94,7 +94,7 @@ public class MyApp extends App {
*/
modelPE.setStream(distanceStream, resultStream);
// modelPE.setOutputIntervalInEvents(10); // output every 10 events
- metricsPE.withTimerInterval(outputInterval, timeUnit); // output every 5
+ metricsPE.setTimerInterval(outputInterval, timeUnit); // output every 5
// seconds
// obsStream = new Stream<ObsEvent>(this, "Observation Stream", new
// ClassIDKeyFinder(), modelPE);