You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[9/50] [abbrv] git commit: Functional fluent API but not tested.
Functional fluent API but not tested.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/21ff6dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/21ff6dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/21ff6dc4
Branch: refs/heads/piper
Commit: 21ff6dc45938608b184f2a62a336784127a48f97
Parents: 5903890
Author: Leo Neumeyer <le...@s4.io>
Authored: Fri Dec 16 09:52:54 2011 -0800
Committer: Leo Neumeyer <le...@s4.io>
Committed: Fri Dec 16 09:52:54 2011 -0800
----------------------------------------------------------------------
.../main/java/org/apache/s4/appmaker/AppMaker.java | 71 +++-
.../main/java/org/apache/s4/appmaker/PEMaker.java | 19 +
.../java/org/apache/s4/appmaker/StreamMaker.java | 21 +-
.../src/main/java/org/apache/s4/core/App.java | 4 +-
.../java/org/apache/s4/core/ProcessingElement.java | 11 +-
.../main/java/org/apache/s4/fluent/AppMaker.java | 320 +++++++++++++++
.../main/java/org/apache/s4/fluent/BaseApp.java | 25 ++
.../main/java/org/apache/s4/fluent/FluentApp.java | 27 ++
.../main/java/org/apache/s4/fluent/PEMaker.java | 246 +++++++++++
.../java/org/apache/s4/fluent/StreamMaker.java | 157 +++++++
.../test/java/org/apache/s4/appmaker/MyApp.java | 15 +-
.../src/test/java/org/apache/s4/appmaker/PEY.java | 18 +
.../src/test/java/org/apache/s4/appmaker/PEZ.java | 34 ++
.../java/org/apache/s4/fluent/AppMakerTest.java | 33 ++
.../org/apache/s4/fluent/DurationKeyFinder.java | 19 +
.../src/test/java/org/apache/s4/fluent/EventA.java | 39 ++
.../src/test/java/org/apache/s4/fluent/EventB.java | 24 ++
.../java/org/apache/s4/fluent/HeightKeyFinder.java | 19 +
.../src/test/java/org/apache/s4/fluent/MyApp.java | 48 +++
.../src/test/java/org/apache/s4/fluent/PEX.java | 56 +++
.../src/test/java/org/apache/s4/fluent/PEY.java | 74 ++++
.../src/test/java/org/apache/s4/fluent/PEZ.java | 58 +++
.../java/org/apache/s4/fluent/QueryKeyFinder.java | 19 +
.../org/apache/s4/fluent/counter/AgeKeyFinder.java | 35 ++
.../org/apache/s4/fluent/counter/CountEvent.java | 58 +++
.../apache/s4/fluent/counter/CountKeyFinder.java | 34 ++
.../org/apache/s4/fluent/counter/CounterPE.java | 79 ++++
.../apache/s4/fluent/counter/GenderKeyFinder.java | 35 ++
.../s4/fluent/counter/GenerateUserEventPE.java | 71 ++++
.../java/org/apache/s4/fluent/counter/Module.java | 107 +++++
.../java/org/apache/s4/fluent/counter/MyApp.java | 147 +++++++
.../java/org/apache/s4/fluent/counter/PrintPE.java | 44 ++
.../java/org/apache/s4/fluent/counter/README.md | 17 +
.../org/apache/s4/fluent/counter/UserEvent.java | 68 +++
.../apache/s4/fluent/counter/UserIDKeyFinder.java | 35 ++
.../test/resources/s4-counter-example.properties | 7 +
36 files changed, 2074 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
index ead530d..938029f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
@@ -1,16 +1,20 @@
package org.apache.s4.appmaker;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fluent.FluentApp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import com.google.inject.AbstractModule;
/**
* A fluent API to build S4 applications.
@@ -51,6 +55,7 @@ abstract public class AppMaker {
/* Use multi-maps to save the graph. */
private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+ private List<Element> order = Lists.newLinkedList();
/**
* Configure the application.
@@ -72,7 +77,9 @@ abstract public class AppMaker {
}
protected PEMaker addPE(Class<? extends ProcessingElement> type) {
- return new PEMaker(this, type);
+ PEMaker pe = new PEMaker(this, type);
+ order.add(new Element(pe, null));
+ return pe;
}
/**
@@ -83,13 +90,45 @@ abstract public class AppMaker {
*
* @return a stream maker.
*/
- protected StreamMaker addStream(Class<? extends Event> type) {
-
- return new StreamMaker(this, type);
-
+ protected StreamMaker addStream(String propName, Class<? extends Event> type) {
+ StreamMaker stream = new StreamMaker(this, propName, type);
+ order.add(new Element(null, stream));
+ return stream;
}
App make() {
+
+ App app = null;
+
+ /* Build the graph using the same order as configured in AppMaker. */
+ for (Element element : order) {
+
+ if (element.pe != null) {
+ /* Create a PE. */
+ ProcessingElement pe = app.createPE(element.pe.getType());
+
+ } else {
+ /* Create a stream. */
+
+ }
+ }
+
+ Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+ for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+ // sb.append(entry.getKey() + ": ");
+ for (StreamMaker sm : entry.getValue()) {
+ // sb.append(sm + " ");
+ }
+ }
+
+ Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+ for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+ // sb.append(entry.getKey() + ": ");
+ for (PEMaker pm : entry.getValue()) {
+ // sb.append(pm + " ");
+ }
+ }
+
return null;
}
@@ -123,4 +162,26 @@ abstract public class AppMaker {
return sb.toString();
}
+
+ class Element {
+
+ PEMaker pe;
+ StreamMaker stream;
+
+ Element(PEMaker pe, StreamMaker stream) {
+ this.pe = pe;
+ this.stream = stream;
+ }
+
+ }
+
+ class Module extends AbstractModule {
+
+ @Override
+ protected void configure() {
+
+ bind(FluentApp.class);
+ bind(PEX.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
index c444fda..337bb50 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
@@ -2,9 +2,12 @@ package org.apache.s4.appmaker;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Event;
import org.apache.s4.core.ProcessingElement;
+import com.google.common.base.Preconditions;
+
/**
* Helper class to add a processing element to an S4 application.
*
@@ -25,7 +28,10 @@ public class PEMaker {
private int cacheMaximumSize = 0;
private long cacheDuration = 0;
+ private PropertiesConfiguration properties = new PropertiesConfiguration();
+
PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+ Preconditions.checkNotNull(type);
this.type = type;
this.app = app;
app.add(this, null);
@@ -112,6 +118,12 @@ public class PEMaker {
return this;
}
+ public PEMaker property(String key, Object value) {
+
+ properties.addProperty(key, value);
+ return this;
+ }
+
/**
* Send events from this PE to a stream.
*
@@ -173,4 +185,11 @@ public class PEMaker {
Class<? extends ProcessingElement> getType() {
return type;
}
+
+ /**
+ * @return the properties
+ */
+ PropertiesConfiguration getProperties() {
+ return properties;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
index 5fe2897..e4d89ab 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
@@ -3,6 +3,8 @@ package org.apache.s4.appmaker;
import org.apache.s4.base.Event;
import org.apache.s4.core.KeyFinder;
+import com.google.common.base.Preconditions;
+
/**
* Helper class to add a stream to an S4 application.
*
@@ -12,13 +14,19 @@ import org.apache.s4.core.KeyFinder;
public class StreamMaker {
final private AppMaker app;
- private Class<? extends Event> type;
- private String name = "";
+ final private Class<? extends Event> type;
+ final private String propName; // Must match a property name in a PE class that will receive this stream.
+ private String name;
private KeyFinder<? extends Event> keyFinder;
private String keyDescriptor = null;
- StreamMaker(AppMaker app, Class<? extends Event> type) {
+ StreamMaker(AppMaker app, String propName, Class<? extends Event> type) {
+
+ Preconditions.checkNotNull(propName);
+ Preconditions.checkNotNull(type);
this.app = app;
+ this.propName = propName;
+ this.name = propName; // Default name if one is not specified.
this.type = type;
app.add(null, this);
}
@@ -122,4 +130,11 @@ public class StreamMaker {
return keyDescriptor;
}
+ /**
+ * @return the propName
+ */
+ public String getPropName() {
+ return propName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0fea25f..aeffd71 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -287,7 +287,7 @@ public abstract class App {
* the target processing elements
* @return the stream
*/
- protected <T extends Event> Stream<T> createStream(Class<T> type) {
+ public <T extends Event> Stream<T> createStream(Class<T> type) {
return new Stream<T>(this);
}
@@ -299,7 +299,7 @@ public abstract class App {
* the processing element type.
* @return the processing element prototype.
*/
- protected <T extends ProcessingElement> T createPE(Class<T> type) {
+ public <T extends ProcessingElement> T createPE(Class<T> type) {
try {
// TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 0922c47..ef93750 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -274,12 +274,12 @@ abstract public class ProcessingElement implements Cloneable {
}
if (eventType == null) {
- logger.error("Argument null in setTrigger() method is not valid. Trigger not set.");
+ logger.debug("Argument null in setTrigger() method is not valid. Trigger not set.");
return this;
}
if (numEvents < 1) {
- logger.error("Argument numEvents in setTrigger() method must be greater than zero. Trigger not set.");
+ logger.debug("Argument numEvents in setTrigger() method must be greater than zero. Trigger not set.");
return this;
}
@@ -328,8 +328,13 @@ abstract public class ProcessingElement implements Cloneable {
return this;
}
- if (timer != null || interval == 0)
+ if (timer != null) {
timer.cancel();
+ return this;
+ }
+
+ if (interval == 0)
+ return this;
timer = new Timer();
timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
new file mode 100644
index 0000000..9182ee1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
@@ -0,0 +1,320 @@
+package org.apache.s4.fluent;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.KeyFinder;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * A fluent API to build S4 applications.
+ *
+ * *
+ * <p>
+ * Usage example:
+ *
+ * <pre>
+ *
+ * public class MyApp extends AppMaker {
+ *
+ * @Override
+ * protected void configure() {
+ *
+ * PEMaker pez, pey, pex;
+ *
+ * pez = addPE(PEZ.class);
+ * pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
+ * pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
+ *
+ * pey = addPE(PEY.class).with("duration", 4).with("height", 99);
+ * pey.addTimer().withDuration(2, TimeUnit.MINUTES);
+ *
+ * pex = addPE(PEX.class).with("query", "money");
+ * pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
+ *
+ * pey.emit(EventA.class).withKey(new DurationKeyFinder()).to(pez);
+ * pex.emit(EventB.class).withKey(new QueryKeyFinder()).to(pez);
+ * pex.emit(EventB.class).withKey(new QueryKeyFinder()).to(pey).to(pez);
+ * }
+ * }
+ *
+ * </pre>
+ */
+abstract public class AppMaker {
+
+ private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
+
+ /* Use multi-maps to save the graph. */
+ private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
+ private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+
+ final private App app;
+
+ AppMaker() {
+ this.app = new BaseApp();
+ }
+
+ /**
+ * Configure the application.
+ */
+ abstract protected void configure();
+
+    /* Used internally to build the graph: records a PE maker to stream maker edge. Either side may be null. */
+    void add(PEMaker pem, StreamMaker stream) {
+
+        pe2stream.put(pem, stream);
+
+        String peName = (pem == null) ? "null" : pem.getType().getName();
+        String streamName = (stream == null) ? "null" : stream.getName();
+        logger.debug("Adding pe [{}] to stream [{}].", peName, streamName);
+    }
+
+    /* Used internally to build the graph: records a stream maker to PE maker edge. Either side may be null. */
+    void add(StreamMaker stream, PEMaker pem) {
+
+        stream2pe.put(stream, pem);
+
+        String streamName = (stream == null) ? "null" : stream.getName();
+        String peName = (pem == null) ? "null" : pem.getType().getName();
+        logger.debug("Adding stream [{}] to pe [{}].", streamName, peName);
+    }
+
+    /**
+     * Add a processing element of the given class to the application graph.
+     *
+     * @param type
+     *            the PE class.
+     * @return a PE maker bound to this application maker.
+     */
+    protected PEMaker addPE(Class<? extends ProcessingElement> type) {
+        return new PEMaker(this, type);
+    }
+
+ /**
+ * Add a stream.
+ *
+ * @param type
+ * the event class handed to the new stream maker.
+ *
+ * @return a stream maker bound to this application maker.
+ */
+ protected StreamMaker addStream(Class<? extends Event> type) {
+ StreamMaker stream = new StreamMaker(this, type);
+ return stream;
+ }
+
+ /**
+ * Build the application from the recorded graph: instantiate PEs and streams, then wire them together.
+ *
+ * @return the App instance held by this maker.
+ * @throws Exception
+ * if a PE property or a stream field cannot be set.
+ */
+ App make() throws Exception {
+
+ /* Loop PEMaker objects to create PEs. The null key is skipped. */
+ for (PEMaker key : pe2stream.keySet()) {
+ if (key != null) {
+ key.setPe(makePE(key, key.getType()));
+ }
+
+ }
+ /* Loop StreamMaker objects to create Streams. Only stream makers that are keys of stream2pe get a Stream here. */
+ for (StreamMaker key : stream2pe.keySet()) {
+ if (key != null) {
+ key.setStream(makeStream(key, key.getType()));
+ }
+ }
+
+ /* PE to Stream wiring. */
+ /*
+ * NOTE(review): "done" limits each PE to its first non-null stream maker; further streams emitted by the same PE
+ * are not assigned to a field. Confirm this is intended.
+ */
+ Set<PEMaker> done = Sets.newHashSet();
+ Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+ for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+ PEMaker pm = entry.getKey();
+ for (StreamMaker sm : entry.getValue()) {
+ if (pm != null && sm != null && !done.contains(pm)) {
+ done.add(pm);
+ setStreamField(pm.getPe(), sm.getStream(), sm.getType());
+ }
+ }
+ }
+
+ /* Stream to PE wiring: register every target PE on the stream that feeds it. */
+ Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+ for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+ StreamMaker sm = entry.getKey();
+ for (PEMaker pm : entry.getValue()) {
+ if (pm != null && sm != null) {
+ sm.getStream().setPE(pm.getPe());
+ }
+ }
+ }
+
+ return app;
+ }
+
+ /* So the magic to create a Stream from a StreamMaker. */
+ @SuppressWarnings("unchecked")
+ private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
+
+ Stream<T> stream = app.createStream(type);
+ stream.setName(sm.getName());
+ stream.setKey((KeyFinder<T>) sm.getKeyFinder()); // TODO: how do we make this safe?
+ return stream;
+ }
+
+ /* Do the magic to create a PE from a PEMaker. */
+ private <T extends ProcessingElement> T makePE(PEMaker pem, Class<T> type) throws NoSuchFieldException,
+ IllegalAccessException {
+ T pe = app.createPE(type);
+ pe.setPECache(pem.getCacheMaximumSize(), pem.getCacheDuration(), TimeUnit.MILLISECONDS);
+ pe.setTimerInterval(pem.getTimerInterval(), TimeUnit.MILLISECONDS);
+ pe.setTrigger(pem.getTriggerEventType(), pem.getTriggerNumEvents(), pem.getTriggerInterval(),
+ TimeUnit.MILLISECONDS);
+
+ /* Use introspection to match properties to class fields. */
+ setPEAttributes(pe, pem, type);
+ return pe;
+ }
+
+ private <T extends ProcessingElement> void setPEAttributes(T pe, PEMaker pem, Class<T> type)
+ throws NoSuchFieldException, IllegalAccessException {
+
+ PropertiesConfiguration properties = pem.getProperties();
+ @SuppressWarnings("unchecked")
+ Iterator<String> iter = properties.getKeys();
+
+ while (iter.hasNext()) {
+ String property = iter.next();
+ logger.debug("Adding property [{}] to PE of type [{}].", property, type.getName());
+ setField(property, pe, pem, type);
+ }
+ }
+
+ private <T extends ProcessingElement> void setField(String fieldName, T pe, PEMaker pm, Class<T> type)
+ throws NoSuchFieldException, IllegalAccessException {
+ try {
+ Field f = type.getDeclaredField(fieldName);
+ f.setAccessible(true);
+ logger.trace("Type: {}.", f.getType());
+ logger.trace("GenericType: {}.", f.getGenericType());
+
+ /* Set the field. */
+ if (f.getType().getCanonicalName() == "long") {
+ f.setLong(pe, pm.getProperties().getLong(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "int") {
+ f.setInt(pe, pm.getProperties().getInt(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "float") {
+ f.setFloat(pe, pm.getProperties().getFloat(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "double") {
+ f.setDouble(pe, pm.getProperties().getDouble(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "short") {
+ f.setShort(pe, pm.getProperties().getShort(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "byte") {
+ f.setByte(pe, pm.getProperties().getByte(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "boolean") {
+ f.setBoolean(pe, pm.getProperties().getBoolean(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "char") {
+ f.setChar(pe, (char) pm.getProperties().getByte(fieldName));
+ return;
+ } else if (f.getType().getCanonicalName() == "java.lang.String") {
+ f.set(pe, pm.getProperties().getString(fieldName));
+ return;
+ }
+
+ logger.error("Unable to set field named [{}] in PE of type [{}].", fieldName, type);
+ throw new IllegalArgumentException();
+
+ // production code should handle these exceptions more gracefully
+ } catch (NoSuchFieldException e) {
+ logger.error("There is no field named [{}] in PE of type [{}].", fieldName, type);
+ throw e;
+ } catch (IllegalArgumentException e) {
+ logger.error("Couldn't set value for field [{}] in PE of type [{}].", fieldName, type);
+ throw e;
+ }
+ }
+
+ /* We need to set stream fields in PE classes. We will infer the field by checking the Event parameter type. */
+ private <P extends ProcessingElement> void setStreamField(P pe, Stream<? extends Event> stream,
+ Class<? extends Event> eventType) throws Exception {
+
+ Field[] fields = pe.getClass().getDeclaredFields();
+ String fieldName = "";
+ Set<String> eventTypes = Sets.newHashSet();
+ for (Field field : fields) {
+ if (field.getType() == Stream.class) {
+
+ fieldName = field.getName();
+ if (field.getGenericType().toString().endsWith("<" + eventType.getCanonicalName() + ">")) {
+
+ /* Sanity check. This AOI does not support more than one stream field with the same event type. */
+ if (eventTypes.contains(field.getGenericType().toString())) {
+ logger.error(
+ "There is more than one stream field in PE [{}] for event type [{}]. The fluent API only supports one stream field per event type.",
+ pe.getClass().getName(), eventType.getCanonicalName());
+ }
+
+ eventTypes.add(field.getGenericType().toString());
+ logger.debug("Stream field [" + fieldName + "] in PE [" + pe.getClass().getCanonicalName()
+ + "] matches event type: [" + eventType.getCanonicalName() + "].");
+
+ /* Assign stream field. */
+ field.setAccessible(true);
+ field.set(pe, stream);
+ }
+ }
+ }
+
+ }
+
+    /* Label used in the printed graph: the PE class name followed by a space, or "null ". */
+    static private String toString(PEMaker pm) {
+        if (pm == null) {
+            return "null ";
+        }
+        return pm.getType().getName() + " ";
+    }
+
+    /* Label used in the printed graph: the stream name followed by a space, or "null ". */
+    static private String toString(StreamMaker sm) {
+        if (sm == null) {
+            return "null ";
+        }
+        return sm.getName() + " ";
+    }
+
+    /**
+     * A printable representation of the application graph: one line per PE listing the streams it emits, followed
+     * by one line per stream listing its target PEs.
+     *
+     * @return the application graph.
+     */
+    public String toString() {
+
+        StringBuilder graph = new StringBuilder();
+        graph.append("\nApplication Graph for ").append(this.getClass().getCanonicalName()).append("\n");
+
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> peEntry : pe2stream.asMap().entrySet()) {
+            graph.append(toString(peEntry.getKey())).append("=> ");
+            for (StreamMaker emitted : peEntry.getValue()) {
+                graph.append(toString(emitted));
+            }
+            graph.append("\n");
+        }
+
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> streamEntry : stream2pe.asMap().entrySet()) {
+            graph.append(toString(streamEntry.getKey())).append("=> ");
+            for (PEMaker target : streamEntry.getValue()) {
+                graph.append(toString(target));
+            }
+            graph.append("\n");
+        }
+
+        return graph.toString();
+    }
+
+ /* Lifecycle hook implemented by subclasses; FluentApp.onStart() delegates to it. */
+ abstract protected void onStart();
+
+ /* Lifecycle hook implemented by subclasses; FluentApp.onInit() delegates to it. */
+ abstract protected void onInit();
+
+ /* Lifecycle hook implemented by subclasses; FluentApp.onClose() delegates to it. */
+ abstract protected void onClose();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
new file mode 100644
index 0000000..a6d559f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
@@ -0,0 +1,25 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+
+/**
+ * An {@link App} whose lifecycle callbacks do nothing.
+ */
+public class BaseApp extends App {
+
+ @Override
+ protected void onStart() {
+ // TODO Auto-generated method stub (currently a no-op).
+
+ }
+
+ @Override
+ protected void onInit() {
+ // TODO Auto-generated method stub (currently a no-op).
+
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub (currently a no-op).
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
new file mode 100644
index 0000000..7f801b6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+
+/**
+ * An {@link App} that forwards its lifecycle callbacks to an {@link AppMaker}.
+ */
+public class FluentApp extends App {
+
+ /* The maker that receives the forwarded callbacks. */
+ final private AppMaker appMaker;
+
+ /**
+ * @param appMaker
+ * the maker whose onStart/onInit/onClose methods are invoked by this app.
+ */
+ public FluentApp(AppMaker appMaker) {
+ this.appMaker = appMaker;
+ }
+
+ /* Delegates to AppMaker.onStart(). */
+ @Override
+ protected void onStart() {
+ appMaker.onStart();
+ }
+
+ /* Delegates to AppMaker.onInit(). */
+ @Override
+ protected void onInit() {
+ appMaker.onInit();
+ }
+
+ /* Delegates to AppMaker.onClose(). */
+ @Override
+ protected void onClose() {
+ appMaker.onClose();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
new file mode 100644
index 0000000..0f568ad
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
@@ -0,0 +1,246 @@
+package org.apache.s4.fluent;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to add a processing element to an S4 application.
+ *
+ * @see example {@link S4Maker}
+ *
+ */
+public class PEMaker {
+
+ private static final Logger logger = LoggerFactory.getLogger(PEMaker.class);
+
+ final private Class<? extends ProcessingElement> type;
+ final private AppMaker app;
+ private ProcessingElement pe = null;
+
+ private long timerInterval = 0;
+
+ private long triggerInterval = 0;
+ private Class<? extends Event> triggerEventType = null;
+ private int triggerNumEvents = 0;
+
+ private int cacheMaximumSize = 0;
+ private long cacheDuration = 0;
+
+ private PropertiesConfiguration properties = new PropertiesConfiguration();
+
+    /*
+     * Creates a maker for the given PE class and registers it in the application graph with no stream attached
+     * yet. The type must not be null.
+     */
+    PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+        this.type = Preconditions.checkNotNull(type);
+        this.app = app;
+        app.add(this, null);
+    }
+
+ /**
+ * Configure the PE expiration and cache size.
+ * <p>
+ * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
+ * creation, or last access.
+ * <p>
+ * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+ * <p>
+ * When this method is called all existing PE instances are destroyed.
+ * <p>
+ * This method takes no arguments: the maximum size and the duration are set on the returned object with
+ * {@link CacheMaker#ofSize(int)} and {@link CacheMaker#withDuration(long, TimeUnit)}.
+ *
+ * @return a new CacheMaker that writes into this PEMaker
+ */
+ public CacheMaker addCache() {
+
+ return new CacheMaker();
+ }
+
+ /**
+ * Configure a trigger that is fired when the following conditions occur:
+ *
+ * <ul>
+ * <li>An event of eventType arrived to the PE instance
+ * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
+ * interval.
+ * </ul>
+ *
+ * <p>
+ * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
+ * the argument eventType.
+ *
+ * <p>
+ * This method takes no arguments: the event type, the number of events and the interval are set on the returned
+ * object with {@link TriggerMaker#fireOn(Class)}, {@link TriggerMaker#ifNumEvents(int)} and
+ * {@link TriggerMaker#ifInterval(long, TimeUnit)}.
+ *
+ * @return a new TriggerMaker that writes into this PEMaker
+ */
+ public TriggerMaker addTrigger() {
+
+ return new TriggerMaker();
+ }
+
+ /**
+ * Set a timer that calls {@link ProcessingElement#onTime()}.
+ *
+ * If {@code interval==0} the timer is disabled.
+ *
+ * <p>
+ * This method takes no arguments: the interval is set on the returned object with
+ * {@link TimerMaker#withDuration(long, TimeUnit)}.
+ *
+ * @return a new TimerMaker that writes into this PEMaker
+ */
+ public TimerMaker addTimer() {
+ return new TimerMaker();
+ }
+
+    /**
+     * Declare that this PE emits events of the given type and register the resulting stream in the graph.
+     *
+     * @param type
+     *            the class of the emitted events.
+     * @return the maker of the new stream, to be configured further.
+     */
+    public StreamMaker emit(Class<? extends Event> type) {
+
+        final String peName = this.getType().getName();
+        final String eventName = type.getCanonicalName();
+        logger.debug("PE [{}] emits event of type [{}].", peName, eventName);
+
+        final StreamMaker emitted = new StreamMaker(app, type);
+        app.add(this, emitted);
+        return emitted;
+    }
+
+ /**
+ * Currently a no-op: the key argument is ignored and nothing is stored.
+ *
+ * @param key
+ * unused.
+ * @return this PEMaker
+ */
+ public PEMaker withKey(String key) {
+
+ return this;
+ }
+
+ /**
+ * Add a property to this maker's configuration.
+ *
+ * @param key
+ * the property name.
+ * @param value
+ * the property value.
+ * @return this PEMaker
+ */
+ public PEMaker with(String key, Object value) {
+
+ properties.addProperty(key, value);
+ return this;
+ }
+
+ /**
+ * @return the timerInterval; AppMaker passes it to the PE as milliseconds
+ */
+ long getTimerInterval() {
+ return timerInterval;
+ }
+
+ /**
+ * @return the triggerInterval; AppMaker passes it to the PE as milliseconds
+ */
+ long getTriggerInterval() {
+ return triggerInterval;
+ }
+
+ /**
+ * @return the triggerEventType, null unless set through TriggerMaker.fireOn
+ */
+ Class<? extends Event> getTriggerEventType() {
+ return triggerEventType;
+ }
+
+ /**
+ * @return the triggerNumEvents, zero unless set through TriggerMaker.ifNumEvents
+ */
+ int getTriggerNumEvents() {
+ return triggerNumEvents;
+ }
+
+ /**
+ * @return the cacheMaximumSize, zero unless set through CacheMaker.ofSize
+ */
+ int getCacheMaximumSize() {
+ return cacheMaximumSize;
+ }
+
+ /**
+ * @return the cacheDuration; AppMaker passes it to the PE as milliseconds
+ */
+ long getCacheDuration() {
+ return cacheDuration;
+ }
+
+ /**
+ * @return the PE class given to the constructor
+ */
+ Class<? extends ProcessingElement> getType() {
+ return type;
+ }
+
+ /**
+ * @return the properties added through with(String, Object)
+ */
+ PropertiesConfiguration getProperties() {
+ return properties;
+ }
+
+ /**
+ * @return the pe, null until setPe is called
+ */
+ public ProcessingElement getPe() {
+ return pe;
+ }
+
+ /**
+ * @param pe
+ * the pe to set
+ */
+ public void setPe(ProcessingElement pe) {
+ this.pe = pe;
+ }
+
+    /**
+     * Fluent helper that stores the trigger settings in the enclosing PEMaker.
+     */
+    public class TriggerMaker {
+
+        /**
+         * @param eventType
+         *            the type of event on which the trigger fires.
+         * @return this TriggerMaker
+         */
+        public TriggerMaker fireOn(Class<? extends Event> eventType) {
+
+            triggerEventType = eventType;
+            return this;
+        }
+
+        /**
+         * @param numEvents
+         *            number of events since the last trigger activation.
+         * @return this TriggerMaker
+         */
+        public TriggerMaker ifNumEvents(int numEvents) {
+
+            triggerNumEvents = numEvents;
+            return this;
+        }
+
+        /**
+         * Set the trigger interval. The value is stored in milliseconds.
+         *
+         * @param interval
+         *            the interval, expressed in timeUnit.
+         * @param timeUnit
+         *            the unit of interval; when null the interval is left unchanged.
+         * @return this TriggerMaker
+         */
+        public TriggerMaker ifInterval(long interval, TimeUnit timeUnit) {
+
+            if (timeUnit != null) {
+                /* convert(sourceDuration, sourceUnit) is called on the TARGET unit. */
+                triggerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
+            }
+            return this;
+        }
+    }
+
+    /**
+     * Fluent helper that stores the cache settings in the enclosing PEMaker.
+     */
+    public class CacheMaker {
+
+        /**
+         * @param maxSize
+         *            the maximum cache size.
+         * @return this CacheMaker
+         */
+        public CacheMaker ofSize(int maxSize) {
+            cacheMaximumSize = maxSize;
+            return this;
+        }
+
+        /**
+         * Set the cache duration. The value is stored in milliseconds.
+         *
+         * @param duration
+         *            the duration, expressed in timeUnit.
+         * @param timeUnit
+         *            the unit of duration.
+         * @return this CacheMaker
+         */
+        public CacheMaker withDuration(long duration, TimeUnit timeUnit) {
+            /* convert(sourceDuration, sourceUnit) is called on the TARGET unit. */
+            cacheDuration = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
+            return this;
+        }
+    }
+
+    /**
+     * Fluent helper that stores the timer settings in the enclosing PEMaker.
+     */
+    public class TimerMaker {
+
+        /**
+         * Set the timer interval. The value is stored in milliseconds.
+         *
+         * @param duration
+         *            the interval, expressed in timeUnit.
+         * @param timeUnit
+         *            the unit of duration.
+         * @return this TimerMaker
+         */
+        public TimerMaker withDuration(long duration, TimeUnit timeUnit) {
+            /* Keep the converted value; it must not be overwritten with the raw duration. */
+            timerInterval = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
+            return this;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
new file mode 100644
index 0000000..d90c493
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
@@ -0,0 +1,157 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.KeyFinder;
+import org.apache.s4.core.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to add a stream to an S4 application.
+ *
+ * @see AppMaker
+ *
+ */
+public class StreamMaker {
+
+ final private AppMaker app;
+ final private Class<? extends Event> type;
+ private String name = null;
+ private KeyFinder<? extends Event> keyFinder;
+ private String keyDescriptor = null;
+ private Stream<? extends Event> stream = null;
+
+ StreamMaker(AppMaker app, Class<? extends Event> type) {
+
+ Preconditions.checkNotNull(type);
+ this.app = app;
+ this.type = type;
+ app.add(null, this);
+ }
+
+ /**
+ * Name the stream.
+ *
+ * @param name
+ * the stream name; when unset, getName() derives one from the event type.
+ * @return the stream maker object
+ */
+ public StreamMaker withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Define the key finder for this stream.
+ *
+ * @param keyFinder
+ * a function to lookup the value of the key.
+ * @return the stream maker.
+ */
+ public <T extends Event> StreamMaker onKey(KeyFinder<T> keyFinder) {
+ this.keyFinder = keyFinder;
+ return this;
+ }
+
+ /**
+ * Define the key finder for this stream using a descriptor.
+ *
+ * @param keyDescriptor
+ * a descriptor to lookup the value of the key.
+ * @return the stream maker.
+ */
+ public StreamMaker withKey(String keyDescriptor) {
+
+ this.keyDescriptor = keyDescriptor;
+ return this;
+ }
+
+ /**
+ * Send events from this stream to a PE.
+ *
+ * @param pe
+ * a target PE.
+ *
+ * @return the stream maker.
+ */
+ public StreamMaker to(PEMaker pe) {
+ app.add(this, pe);
+ return this;
+ }
+
+ /**
+ * Send events from this stream to various PEs.
+ *
+ * @param pes
+ * a target PE array.
+ *
+ * @return the stream maker.
+ */
+ public StreamMaker to(PEMaker[] pes) {
+ for (int i = 0; i < pes.length; i++)
+ app.add(this, pes[i]);
+ return this;
+ }
+
+ /**
+ * @return the app
+ */
+ AppMaker getApp() {
+ return app;
+ }
+
+ /**
+ * @return the type
+ */
+ Class<? extends Event> getType() {
+ return type;
+ }
+
+ /**
+ * @return the name
+ */
+ String getName() {
+
+ if (name != null) {
+ return name;
+ } else {
+
+ String key;
+ if (keyDescriptor != null) {
+ key = keyDescriptor;
+ return type.getCanonicalName() + "-" + key;
+ } else {
+ return type.getCanonicalName();
+ }
+ }
+ }
+
+ /**
+ * @return the keyFinder
+ */
+ KeyFinder<? extends Event> getKeyFinder() {
+ return keyFinder;
+ }
+
+ /**
+ * @return the keyDescriptor
+ */
+ String getKeyDescriptor() {
+ return keyDescriptor;
+ }
+
+ /**
+ * @return the stream
+ */
+ public Stream<? extends Event> getStream() {
+ return stream;
+ }
+
+ /**
+ * @param stream
+ * the stream to set
+ */
+ public void setStream(Stream<? extends Event> stream) {
+ this.stream = stream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
index 5fbe2c7..778931e 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
@@ -5,20 +5,21 @@ public class MyApp extends AppMaker {
@Override
protected void configure() {
- PEMaker pe1, pe2;
+ PEMaker pez, pey;
StreamMaker s1;
StreamMaker s2, s3;
- pe1 = addPE(PEZ.class);
+ pez = addPE(PEZ.class);
- s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pe1);
+ s1 = addStream("stream1", EventA.class).withName("My first stream.").withKey("{gender}").to(pez);
- pe2 = addPE(PEY.class).to(s1);
+ pey = addPE(PEY.class).to(s1).property("duration", 4).property("height", 99);
- s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pe2);
+ s2 = addStream("stream2", EventB.class).withName("My second stream.").withKey("{age}").to(pey).to(pez);
- s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pe2);
+ s3 = addStream("stream3", EventB.class).withKey("{height}").to(pey);
- addPE(PEX.class).to(s2).to(s3);
+ addPE(PEX.class).to(s2).to(s3).property("keyword", "money");
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
index 16d951a..1db0525 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
@@ -1,9 +1,12 @@
package org.apache.s4.appmaker;
import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
public class PEY extends ProcessingElement {
+ private Stream<EventB> stream3;
+
@Override
protected void onCreate() {
// TODO Auto-generated method stub
@@ -16,4 +19,19 @@ public class PEY extends ProcessingElement {
}
+ /**
+ * @return the stream3
+ */
+ Stream<EventB> getStream3() {
+ return stream3;
+ }
+
+ /**
+ * @param stream3
+ * the stream3 to set
+ */
+ void setStream3(Stream<EventB> stream3) {
+ this.stream3 = stream3;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
index e893754..9ac3761 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
@@ -1,9 +1,43 @@
package org.apache.s4.appmaker;
import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
public class PEZ extends ProcessingElement {
+ private Stream<EventA> stream1;
+ private Stream<EventB> stream2;
+
+ /**
+ * @return the stream1
+ */
+ Stream<EventA> getStream1() {
+ return stream1;
+ }
+
+ /**
+ * @param stream1
+ * the stream1 to set
+ */
+ void setStream1(Stream<EventA> stream1) {
+ this.stream1 = stream1;
+ }
+
+ /**
+ * @return the stream2
+ */
+ Stream<EventB> getStream2() {
+ return stream2;
+ }
+
+ /**
+ * @param stream2
+ * the stream2 to set
+ */
+ void setStream2(Stream<EventB> stream2) {
+ this.stream2 = stream2;
+ }
+
@Override
protected void onCreate() {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
new file mode 100644
index 0000000..365f987
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
@@ -0,0 +1,33 @@
+package org.apache.s4.fluent;
+
+import java.lang.reflect.Field;
+
+import org.junit.Test;
+
+public class AppMakerTest {
+
+ @Test
+ public void test() throws Exception {
+
+ MyApp myApp = new MyApp();
+ myApp.configure();
+ System.out.println(myApp.toString());
+ myApp.make();
+ }
+
+ @Test
+ public void testReflection() {
+     // Looks up PEY's "duration" field reflectively and prints its raw and generic types.
+     try {
+         Class<?> c = PEY.class;
+         Field f = c.getDeclaredField("duration");
+         System.out.format("Type: %s%n", f.getType());
+         System.out.format("GenericType: %s%n", f.getGenericType());
+
+         // A missing field must fail the test instead of only printing a stack trace.
+     } catch (NoSuchFieldException x) {
+         throw new AssertionError(x);
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
new file mode 100644
index 0000000..a62a9f1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class DurationKeyFinder implements KeyFinder<EventA> {
+
+ public List<String> get(EventA event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the duration and add it to the list. */
+ results.add(Long.toString(event.getDuration()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
new file mode 100644
index 0000000..ccf37e2
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
@@ -0,0 +1,39 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+ private long duration;
+ private int height;
+
+ /**
+ * @return the duration
+ */
+ public long getDuration() {
+ return duration;
+ }
+
+ /**
+ * @param duration
+ * the duration to set
+ */
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * @return the height
+ */
+ public int getHeight() {
+ return height;
+ }
+
+ /**
+ * @param height the height to set
+ */
+ public void setHeight(int height) {
+ this.height = height;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
new file mode 100644
index 0000000..3e285e3
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+ private String query;
+
+ /**
+ * @return the query
+ */
+ public String getQuery() {
+ return query;
+ }
+
+ /**
+ * @param query
+ * the query to set
+ */
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
new file mode 100644
index 0000000..daad556
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class HeightKeyFinder implements KeyFinder<EventA> {
+
+ public List<String> get(EventA event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the height and add it to the list. */
+ results.add(Integer.toString(event.getHeight()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
new file mode 100644
index 0000000..bf97eb6
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
@@ -0,0 +1,48 @@
+package org.apache.s4.fluent;
+
+import java.util.concurrent.TimeUnit;
+
+public class MyApp extends AppMaker {
+
+ @Override
+ protected void configure() {
+
+ PEMaker pez, pey, pex;
+
+ /* Configure processing element pez. */
+ pez = addPE(PEZ.class);
+ pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
+ pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
+
+ /* Configure processing element pey. */
+ pey = addPE(PEY.class).with("duration", 4).with("height", 99);
+ pey.addTimer().withDuration(2, TimeUnit.MINUTES);
+
+ /* Configure processing element pex. */
+ pex = addPE(PEX.class).with("query", "money");
+ pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
+
+ /* Construct the graph. */
+ pey.emit(EventA.class).onKey(new DurationKeyFinder()).to(pez);
+ pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pez);
+ pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
+ }
+
+ @Override
+ protected void onStart() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onInit() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
new file mode 100644
index 0000000..891cd40
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
@@ -0,0 +1,56 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEX extends ProcessingElement {
+
+ private String query;
+ private Stream<EventB> someStream;
+
+ public PEX(App app) {
+ super(app);
+ }
+
+ @Override
+ protected void onCreate() {
+
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ /**
+ * @return the keyword
+ */
+ String getKeyword() {
+ return query;
+ }
+
+ /**
+ * @param query
+ * the keyword to set
+ */
+ void setKeyword(String query) {
+ this.query = query;
+ }
+
+ /**
+ * @return the someStream
+ */
+ public Stream<EventB> getSomeStream() {
+ return someStream;
+ }
+
+ /**
+ * @param someStream
+ * the someStream to set
+ */
+ public void setSomeStream(Stream<EventB> someStream) {
+ this.someStream = someStream;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
new file mode 100644
index 0000000..b175657
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
@@ -0,0 +1,74 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEY extends ProcessingElement {
+
+ private Stream<EventA> stream3;
+ private int height;
+ private long duration;
+
+ public PEY(App app) {
+ super(app);
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * @return the stream3
+ */
+ Stream<EventA> getStream3() {
+ return stream3;
+ }
+
+ /**
+ * @param stream3
+ * the stream3 to set
+ */
+ void setStream3(Stream<EventA> stream3) {
+ this.stream3 = stream3;
+ }
+
+ /**
+ * @return the height
+ */
+ int getHeight() {
+ return height;
+ }
+
+ /**
+ * @param height
+ * the height to set
+ */
+ void setHeight(int height) {
+ this.height = height;
+ }
+
+ /**
+ * @return the duration
+ */
+ long getDuration() {
+ return duration;
+ }
+
+ /**
+ * @param duration
+ * the duration to set
+ */
+ void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
new file mode 100644
index 0000000..e2f362c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
@@ -0,0 +1,58 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEZ extends ProcessingElement {
+
+ private Stream<EventA> stream1;
+ private Stream<EventB> stream2;
+
+ public PEZ(App app) {
+ super(app);
+ }
+
+ /**
+ * @return the stream1
+ */
+ Stream<EventA> getStream1() {
+ return stream1;
+ }
+
+ /**
+ * @param stream1
+ * the stream1 to set
+ */
+ void setStream1(Stream<EventA> stream1) {
+ this.stream1 = stream1;
+ }
+
+ /**
+ * @return the stream2
+ */
+ Stream<EventB> getStream2() {
+ return stream2;
+ }
+
+ /**
+ * @param stream2
+ * the stream2 to set
+ */
+ void setStream2(Stream<EventB> stream2) {
+ this.stream2 = stream2;
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
new file mode 100644
index 0000000..5979a23
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class QueryKeyFinder implements KeyFinder<EventB> {
+
+ public List<String> get(EventB event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the query and add it to the list. */
+ results.add(event.getQuery());
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..a45468d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the age and add it to the list. */
+ results.add(Integer.toString(event.getAge()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
new file mode 100644
index 0000000..3768875
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+
+public class CountEvent extends Event {
+
+ private String key;
+ private long count;
+
+ public CountEvent() {
+
+ }
+
+ CountEvent(String key, long count) {
+ this.key = key;
+ this.count = count;
+ }
+
+ CountEvent(String key, long count, long time) {
+ super(time);
+ this.key = key;
+ this.count = count;
+ }
+
+
+ /**
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * @return the count
+ */
+ public long getCount() {
+ return count;
+ }
+
+ public String toString() {
+     return String.format("Key: %s, Count: %08d", key, count); // key is data, never part of the pattern
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
new file mode 100644
index 0000000..6337fe1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+
+ public List<String> get(CountEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the user ID and add it to the list. */
+ results.add(event.getKey());
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
new file mode 100644
index 0000000..f9e939a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+
+public class CounterPE extends ProcessingElement {
+
+ private Stream<CountEvent> countStream = null;
+
+ public CounterPE(App app) {
+ super(app);
+ }
+
+ /**
+ * @return the countStream
+ */
+ public Stream<CountEvent> getCountStream() {
+ return countStream;
+ }
+
+ /**
+ * @param countStream
+ * the countStream to set
+ */
+ public void setCountStream(Stream<CountEvent> countStream) {
+ this.countStream = countStream;
+ }
+
+ private long counter = 0;
+
+ public void onEvent(Event event) {
+
+ counter += 1;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * Emits a CountEvent carrying this PE's id and the current counter value.
+ */
+ public void onTrigger(Event event) {
+
+ CountEvent countEvent = new CountEvent(getId(), counter);
+ countStream.put(countEvent);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.core.ProcessingElement#onCreate()
+ */
+ @Override
+ protected void onCreate() {
+
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..17111c7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the gender and add it to the list. */
+ results.add(Character.toString(event.getGender()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..b85c4e7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.SingletonPE;
+import org.apache.s4.core.Stream;
+
+
+public class GenerateUserEventPE extends SingletonPE {
+
+ static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+ static int[] ages = { 25, 2, 33, 6, 67 };
+ static char[] genders = { 'f', 'm' };
+ private Stream<UserEvent>[] targetStreams;
+ final private Random generator = new Random(22);
+
+ public GenerateUserEventPE(App app) {
+ super(app);
+ }
+
+ /**
+ * @param targetStreams the {@link UserEvent} streams.
+ */
+ public void setStreams(Stream<UserEvent>... targetStreams){
+ this.targetStreams = targetStreams;
+ }
+
+ public void onTrigger(Event event) {
+     List<String> favorites = new ArrayList<String>();
+     favorites.add("dulce de leche");
+     favorites.add("strawberry");
+     // Pick user id, age and gender pseudo-randomly; every bound follows its array's length.
+     int indexUserID = generator.nextInt(userIds.length);
+     int indexAge = generator.nextInt(ages.length);
+     int indexGender = generator.nextInt(genders.length);
+
+     UserEvent userEvent = new UserEvent(userIds[indexUserID],
+             ages[indexAge], favorites, genders[indexGender]);
+     // Put the same event on every configured target stream.
+     for (Stream<UserEvent> target : targetStreams) {
+         target.put(userEvent);
+     }
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+
+ static int pickRandom(int numElements) {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
new file mode 100644
index 0000000..a5acc06
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module for the counter example.
+ *
+ * Loads <code>/s4-counter-example.properties</code> from the classpath, makes every property injectable by name,
+ * and binds the application class together with the cluster, comm layer, hasher and serializer implementations.
+ *
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+
+    /**
+     * Reads the example properties file and binds each property as a named constant.
+     *
+     * Problems are recorded on the binder instead of being thrown. The input stream is always closed.
+     *
+     * @param binder the binder that receives the property bindings and any error.
+     */
+    private void loadProperties(Binder binder) {
+
+        InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+        if (is == null) {
+            /* getResourceAsStream() returns null when the resource cannot be found. */
+            binder.addError("Could not find /s4-counter-example.properties on the classpath.");
+            return;
+        }
+
+        try {
+            config = new PropertiesConfiguration();
+            config.load(is);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        } finally {
+            try {
+                is.close();
+            } catch (java.io.IOException ignored) {
+                /* Best effort: the properties were already read (or the failure already recorded). */
+            }
+        }
+    }
+
+    /**
+     * Loads the properties on first use and declares the bindings for the example.
+     */
+    @Override
+    protected void configure() {
+        if (config == null)
+            loadProperties(binder());
+
+        bind(MyApp.class);
+
+        bind(Cluster.class);
+
+        /* Configure static assignment using a configuration file. */
+        bind(Assignment.class).to(AssignmentFromFile.class);
+
+        /* Configure a static cluster topology using a configuration file. */
+        bind(Topology.class).to(TopologyFromFile.class);
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
+        /* Use the Netty comm layer implementation. */
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
+        /* Use a simple UDP comm layer implementation. */
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
new file mode 100644
index 0000000..9ea29cf
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.core.Sender;
+import org.apache.s4.core.Stream;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * This is a sample application to test a new S4 API. It wires a generator PE to three keyed {@link UserEvent}
+ * streams, counts events per key and prints the counts.
+ * See README file for details.
+ */
+final public class MyApp extends App {
+
+    final private int interval = 1;
+    private GenerateUserEventPE generateUserEventPE;
+
+    /*
+     * The application graph itself is created in this class. However, developers may provide tools for creating
+     * apps which will generate the objects.
+     *
+     * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype
+     * from which all PE instances will be created. The prototype itself is not used as an instance (except when the
+     * PE is of type Singleton PE). To create a data structure for each PE instance you must do it in the method
+     * ProcessingElement.onCreate().
+     */
+
+    /**
+     * Build the application graph using POJOs. Don't like it? Write a nice tool.
+     *
+     * @see App#init()
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void onInit() {
+
+        /* PE that prints counts to console. */
+        PrintPE printPE = createPE(PrintPE.class);
+
+        Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        Stream<CountEvent> genderCountStream = createStream(CountEvent.class).setName("Gender Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        Stream<CountEvent> ageCountStream = createStream(CountEvent.class).setName("Age Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        /* PEs that count events by user, gender, and age. */
+        CounterPE userCountPE = createPE(CounterPE.class);
+        userCountPE.setTrigger(Event.class, interval, 10L, TimeUnit.SECONDS);
+        userCountPE.setCountStream(userCountStream);
+
+        CounterPE genderCountPE = createPE(CounterPE.class);
+        genderCountPE.setTrigger(Event.class, interval, 10L, TimeUnit.SECONDS);
+        genderCountPE.setCountStream(genderCountStream);
+
+        CounterPE ageCountPE = createPE(CounterPE.class);
+        ageCountPE.setTrigger(Event.class, interval, 10L, TimeUnit.SECONDS);
+        ageCountPE.setCountStream(ageCountStream);
+
+        /* Streams that output user events keyed on user, gender, and age. */
+        Stream<UserEvent> userStream = createStream(UserEvent.class).setName("User Stream")
+                .setKey(new UserIDKeyFinder()).setPE(userCountPE);
+
+        Stream<UserEvent> genderStream = createStream(UserEvent.class).setName("Gender Stream")
+                .setKey(new GenderKeyFinder()).setPE(genderCountPE);
+
+        Stream<UserEvent> ageStream = createStream(UserEvent.class).setName("Age Stream").setKey(new AgeKeyFinder())
+                .setPE(ageCountPE);
+
+        generateUserEventPE = createPE(GenerateUserEventPE.class);
+        generateUserEventPE.setStreams(userStream, genderStream, ageStream);
+    }
+
+    /**
+     * Create and send 200 dummy events of type UserEvent, then pause for five seconds.
+     *
+     * @see App#start()
+     */
+    @Override
+    protected void onStart() {
+
+        for (int i = 0; i < 200; i++) {
+            generateUserEventPE.onTrigger(null);
+        }
+
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            /* Do not swallow the interrupt: restore the flag so the caller can react to it. */
+            Thread.currentThread().interrupt();
+        }
+
+        System.out.println("Done. Wait until the main app closes.");
+        // close();
+    }
+
+    @Override
+    protected void onClose() {
+        System.out.println("Bye.");
+
+    }
+
+    /**
+     * Creates the app through Guice, attaches the comm layer, runs it for about ten seconds and closes it.
+     *
+     * @param args not used.
+     */
+    public static void main(String[] args) {
+
+        Injector injector = Guice.createInjector(new Module());
+        MyApp myApp = injector.getInstance(MyApp.class);
+        Sender sender = injector.getInstance(Sender.class);
+        Receiver receiver = injector.getInstance(Receiver.class);
+        myApp.setCommLayer(sender, receiver);
+        myApp.init();
+        myApp.start();
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            /* Interrupted while waiting: keep the interrupt status and fall through to close the app. */
+            Thread.currentThread().interrupt();
+        }
+        myApp.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
new file mode 100644
index 0000000..dc3a25d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+
+
+/**
+ * Processing element that writes the string form of each event it is given to standard output.
+ */
+public class PrintPE extends ProcessingElement {
+
+    public PrintPE(App app) {
+        super(app);
+    }
+
+    /**
+     * Prints the event.
+     *
+     * @param event the event whose {@code toString()} value is written to the console.
+     */
+    public void onEvent(Event event) {
+        final String text = event.toString();
+        System.out.println(text);
+    }
+
+    /* Nothing to initialize. */
+    @Override
+    protected void onCreate() {
+    }
+
+    /* Nothing to clean up. */
+    @Override
+    protected void onRemove() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
new file mode 100644
index 0000000..72e0a3d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
@@ -0,0 +1,17 @@
+S4 Counter Example
+==================
+
+In this example we do the following:
+
+- Generate dummy events (UserEvent).
+- Key events by user, gender, age.
+- Count by user, gender, age.
+- Print partial counts.
+
+The following diagram shows the application graph:
+
+![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
+
+The following diagram shows how classes, PE prototypes, PE instances, and streams are related.
+
+![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
new file mode 100644
index 0000000..e5bf67d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+/**
+ * Event describing a user: an id, an age, a gender and a list of favorites.
+ */
+public class UserEvent extends Event {
+
+    private String userID;
+    private int age;
+    private char gender;
+    private List<String> favorites;
+
+    /** Creates an event with all fields left at their default values. */
+    public UserEvent() {
+    }
+
+    /**
+     * Creates a fully populated event.
+     *
+     * @param userID the user id.
+     * @param age the user's age.
+     * @param favorites the user's favorites; the list is stored as given, not copied.
+     * @param gender the user's gender.
+     */
+    UserEvent(String userID, int age, List<String> favorites, char gender) {
+        this.userID = userID;
+        this.age = age;
+        this.gender = gender;
+        this.favorites = favorites;
+    }
+
+    /** @return the user id. */
+    public String getUserID() {
+        return this.userID;
+    }
+
+    /** @return the user's age. */
+    public int getAge() {
+        return this.age;
+    }
+
+    /** @return the list of favorites held by this event. */
+    public List<String> getFavorites() {
+        return this.favorites;
+    }
+
+    /** @return the user's gender. */
+    public char getGender() {
+        return this.gender;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..b49739c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+/**
+ * Key finder that keys a {@link UserEvent} on its user id.
+ */
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+
+    /**
+     * @param event the event to extract the key from.
+     * @return a new list whose only element is the event's user id.
+     */
+    public List<String> get(UserEvent event) {
+        final String userId = event.getUserID();
+
+        List<String> key = new ArrayList<String>();
+        key.add(userId);
+        return key;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/resources/s4-counter-example.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/s4-counter-example.properties b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
new file mode 100644
index 0000000..b60f40a
--- /dev/null
+++ b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
@@ -0,0 +1,7 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.lock_dir = /tmp
+
+