You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC

[9/50] [abbrv] git commit: Functional fluent API but not tested.

Functional fluent API but not tested.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/21ff6dc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/21ff6dc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/21ff6dc4

Branch: refs/heads/piper
Commit: 21ff6dc45938608b184f2a62a336784127a48f97
Parents: 5903890
Author: Leo Neumeyer <le...@s4.io>
Authored: Fri Dec 16 09:52:54 2011 -0800
Committer: Leo Neumeyer <le...@s4.io>
Committed: Fri Dec 16 09:52:54 2011 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/s4/appmaker/AppMaker.java |   71 +++-
 .../main/java/org/apache/s4/appmaker/PEMaker.java  |   19 +
 .../java/org/apache/s4/appmaker/StreamMaker.java   |   21 +-
 .../src/main/java/org/apache/s4/core/App.java      |    4 +-
 .../java/org/apache/s4/core/ProcessingElement.java |   11 +-
 .../main/java/org/apache/s4/fluent/AppMaker.java   |  320 +++++++++++++++
 .../main/java/org/apache/s4/fluent/BaseApp.java    |   25 ++
 .../main/java/org/apache/s4/fluent/FluentApp.java  |   27 ++
 .../main/java/org/apache/s4/fluent/PEMaker.java    |  246 +++++++++++
 .../java/org/apache/s4/fluent/StreamMaker.java     |  157 +++++++
 .../test/java/org/apache/s4/appmaker/MyApp.java    |   15 +-
 .../src/test/java/org/apache/s4/appmaker/PEY.java  |   18 +
 .../src/test/java/org/apache/s4/appmaker/PEZ.java  |   34 ++
 .../java/org/apache/s4/fluent/AppMakerTest.java    |   33 ++
 .../org/apache/s4/fluent/DurationKeyFinder.java    |   19 +
 .../src/test/java/org/apache/s4/fluent/EventA.java |   39 ++
 .../src/test/java/org/apache/s4/fluent/EventB.java |   24 ++
 .../java/org/apache/s4/fluent/HeightKeyFinder.java |   19 +
 .../src/test/java/org/apache/s4/fluent/MyApp.java  |   48 +++
 .../src/test/java/org/apache/s4/fluent/PEX.java    |   56 +++
 .../src/test/java/org/apache/s4/fluent/PEY.java    |   74 ++++
 .../src/test/java/org/apache/s4/fluent/PEZ.java    |   58 +++
 .../java/org/apache/s4/fluent/QueryKeyFinder.java  |   19 +
 .../org/apache/s4/fluent/counter/AgeKeyFinder.java |   35 ++
 .../org/apache/s4/fluent/counter/CountEvent.java   |   58 +++
 .../apache/s4/fluent/counter/CountKeyFinder.java   |   34 ++
 .../org/apache/s4/fluent/counter/CounterPE.java    |   79 ++++
 .../apache/s4/fluent/counter/GenderKeyFinder.java  |   35 ++
 .../s4/fluent/counter/GenerateUserEventPE.java     |   71 ++++
 .../java/org/apache/s4/fluent/counter/Module.java  |  107 +++++
 .../java/org/apache/s4/fluent/counter/MyApp.java   |  147 +++++++
 .../java/org/apache/s4/fluent/counter/PrintPE.java |   44 ++
 .../java/org/apache/s4/fluent/counter/README.md    |   17 +
 .../org/apache/s4/fluent/counter/UserEvent.java    |   68 +++
 .../apache/s4/fluent/counter/UserIDKeyFinder.java  |   35 ++
 .../test/resources/s4-counter-example.properties   |    7 +
 36 files changed, 2074 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
index ead530d..938029f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/AppMaker.java
@@ -1,16 +1,20 @@
 package org.apache.s4.appmaker;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.fluent.FluentApp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+import com.google.inject.AbstractModule;
 
 /**
  * A fluent API to build S4 applications.
@@ -51,6 +55,7 @@ abstract public class AppMaker {
     /* Use multi-maps to save the graph. */
     private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
     private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+    private List<Element> order = Lists.newLinkedList();
 
     /**
      * Configure the application.
@@ -72,7 +77,9 @@ abstract public class AppMaker {
     }
 
     protected PEMaker addPE(Class<? extends ProcessingElement> type) {
-        return new PEMaker(this, type);
+        PEMaker pe = new PEMaker(this, type);
+        order.add(new Element(pe, null));
+        return pe;
     }
 
     /**
@@ -83,13 +90,45 @@ abstract public class AppMaker {
      * 
      * @return a stream maker.
      */
-    protected StreamMaker addStream(Class<? extends Event> type) {
-
-        return new StreamMaker(this, type);
-
+    protected StreamMaker addStream(String propName, Class<? extends Event> type) {
+        StreamMaker stream = new StreamMaker(this, propName, type);
+        order.add(new Element(null, stream));
+        return stream;
     }
 
     App make() {
+
+        App app = null;
+
+        /* Build the graph using the same order as configured in AppMaker. */
+        for (Element element : order) {
+
+            if (element.pe != null) {
+                /* Create a PE. */
+                ProcessingElement pe = app.createPE(element.pe.getType());
+
+            } else {
+                /* Create a stream. */
+
+            }
+        }
+
+        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+            // sb.append(entry.getKey() + ": ");
+            for (StreamMaker sm : entry.getValue()) {
+                // sb.append(sm + " ");
+            }
+        }
+
+        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+            // sb.append(entry.getKey() + ": ");
+            for (PEMaker pm : entry.getValue()) {
+                // sb.append(pm + " ");
+            }
+        }
+
         return null;
     }
 
@@ -123,4 +162,26 @@ abstract public class AppMaker {
         return sb.toString();
 
     }
+
+    class Element {
+
+        PEMaker pe;
+        StreamMaker stream;
+
+        Element(PEMaker pe, StreamMaker stream) {
+            this.pe = pe;
+            this.stream = stream;
+        }
+
+    }
+
+    class Module extends AbstractModule {
+
+        @Override
+        protected void configure() {
+
+            bind(FluentApp.class);
+            bind(PEX.class);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
index c444fda..337bb50 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/PEMaker.java
@@ -2,9 +2,12 @@ package org.apache.s4.appmaker;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Event;
 import org.apache.s4.core.ProcessingElement;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Helper class to add a processing element to an S4 application.
  * 
@@ -25,7 +28,10 @@ public class PEMaker {
     private int cacheMaximumSize = 0;
     private long cacheDuration = 0;
 
+    private PropertiesConfiguration properties = new PropertiesConfiguration();
+
     PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+        Preconditions.checkNotNull(type);
         this.type = type;
         this.app = app;
         app.add(this, null);
@@ -112,6 +118,12 @@ public class PEMaker {
         return this;
     }
 
+    public PEMaker property(String key, Object value) {
+
+        properties.addProperty(key, value);
+        return this;
+    }
+
     /**
      * Send events from this PE to a stream.
      * 
@@ -173,4 +185,11 @@ public class PEMaker {
     Class<? extends ProcessingElement> getType() {
         return type;
     }
+
+    /**
+     * @return the properties
+     */
+    PropertiesConfiguration getProperties() {
+        return properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
index 5fe2897..e4d89ab 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/appmaker/StreamMaker.java
@@ -3,6 +3,8 @@ package org.apache.s4.appmaker;
 import org.apache.s4.base.Event;
 import org.apache.s4.core.KeyFinder;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Helper class to add a stream to an S4 application.
  * 
@@ -12,13 +14,19 @@ import org.apache.s4.core.KeyFinder;
 public class StreamMaker {
 
     final private AppMaker app;
-    private Class<? extends Event> type;
-    private String name = "";
+    final private Class<? extends Event> type;
+    final private String propName; // Must match a property name in a PE class that will receive this stream.
+    private String name;
     private KeyFinder<? extends Event> keyFinder;
     private String keyDescriptor = null;
 
-    StreamMaker(AppMaker app, Class<? extends Event> type) {
+    StreamMaker(AppMaker app, String propName, Class<? extends Event> type) {
+
+        Preconditions.checkNotNull(propName);
+        Preconditions.checkNotNull(type);
         this.app = app;
+        this.propName = propName;
+        this.name = propName; // Default name if one is not specified.
         this.type = type;
         app.add(null, this);
     }
@@ -122,4 +130,11 @@ public class StreamMaker {
         return keyDescriptor;
     }
 
+    /**
+     * @return the propName
+     */
+    public String getPropName() {
+        return propName;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0fea25f..aeffd71 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -287,7 +287,7 @@ public abstract class App {
      *            the target processing elements
      * @return the stream
      */
-    protected <T extends Event> Stream<T> createStream(Class<T> type) {
+    public <T extends Event> Stream<T> createStream(Class<T> type) {
 
         return new Stream<T>(this);
     }
@@ -299,7 +299,7 @@ public abstract class App {
      *            the processing element type.
      * @return the processing element prototype.
      */
-    protected <T extends ProcessingElement> T createPE(Class<T> type) {
+    public <T extends ProcessingElement> T createPE(Class<T> type) {
 
         try {
             // TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 0922c47..ef93750 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -274,12 +274,12 @@ abstract public class ProcessingElement implements Cloneable {
         }
 
         if (eventType == null) {
-            logger.error("Argument null in setTrigger() method is not valid. Trigger not set.");
+            logger.debug("Argument null in setTrigger() method is not valid. Trigger not set.");
             return this;
         }
 
         if (numEvents < 1) {
-            logger.error("Argument numEvents in setTrigger() method must be greater than zero. Trigger not set.");
+            logger.debug("Argument numEvents in setTrigger() method must be greater than zero. Trigger not set.");
             return this;
         }
 
@@ -328,8 +328,13 @@ abstract public class ProcessingElement implements Cloneable {
             return this;
         }
 
-        if (timer != null || interval == 0)
+        if (timer != null) {
             timer.cancel();
+            return this;
+        }
+
+        if (interval == 0)
+            return this;
 
         timer = new Timer();
         timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
new file mode 100644
index 0000000..9182ee1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
@@ -0,0 +1,320 @@
+package org.apache.s4.fluent;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.KeyFinder;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * A fluent API to build S4 applications.
+ * 
+ *
+ * <p>
+ * Usage example:
+ * 
+ * <pre>
+ * 
+ * public class MyApp extends AppMaker {
+ * 
+ *     &#064;Override
+ *     protected void configure() {
+ * 
+ *         PEMaker pez, pey, pex;
+ * 
+ *         pez = addPE(PEZ.class);
+ *         pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
+ *         pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
+ * 
+ *         pey = addPE(PEY.class).property(&quot;duration&quot;, 4).property(&quot;height&quot;, 99);
+ *         pey.addTimer().withDuration(2, TimeUnit.MINUTES);
+ * 
+ *         pex = addPE(PEX.class).property(&quot;query&quot;, &quot;money&quot;);
+ *         pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
+ * 
+ *         pey.emit(EventA.class).withKey(new DurationKeyFinder()).to(pez);
+ *         pex.emit(EventB.class).withKey(new QueryKeyFinder()).to(pez);
+ *         pex.emit(EventB.class).withKey(new QueryKeyFinder()).to(pey).to(pez);
+ *     }
+ * }
+ * 
+ * </pre>
+ */
+abstract public class AppMaker {
+
+    private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
+
+    /* Use multi-maps to save the graph. */
+    private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
+    private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
+
+    final private App app;
+
+    AppMaker() {
+        this.app = new BaseApp();
+    }
+
+    /**
+     * Configure the application.
+     */
+    abstract protected void configure();
+
+    /* Used internally to build the graph. */
+    void add(PEMaker pem, StreamMaker stream) {
+
+        pe2stream.put(pem, stream);
+        logger.debug("Adding pe [{}] to stream [{}].", pem != null ? pem.getType().getName() : "null",
+                stream != null ? stream.getName() : "null");
+    }
+
+    /* Used internally to build the graph. */
+    void add(StreamMaker stream, PEMaker pem) {
+
+        stream2pe.put(stream, pem);
+        logger.debug("Adding stream [{}] to pe [{}].", stream != null ? stream.getName() : "null", pem != null ? pem
+                .getType().getName() : "null");
+    }
+
+    protected PEMaker addPE(Class<? extends ProcessingElement> type) {
+        PEMaker pe = new PEMaker(this, type);
+        return pe;
+    }
+
+    /**
+     * Add a stream.
+     * 
+     * @param type
+     *            the type of events carried by this stream.
+     * 
+     * @return a stream maker.
+     */
+    protected StreamMaker addStream(Class<? extends Event> type) {
+        StreamMaker stream = new StreamMaker(this, type);
+        return stream;
+    }
+
+    App make() throws Exception {
+
+        /* Loop PEMaker objects to create PEs. */
+        for (PEMaker key : pe2stream.keySet()) {
+            if (key != null) {
+                key.setPe(makePE(key, key.getType()));
+            }
+
+        }
+        /* Loop StreamMaker objects to create Streams. */
+        for (StreamMaker key : stream2pe.keySet()) {
+            if (key != null) {
+                key.setStream(makeStream(key, key.getType()));
+            }
+        }
+
+        /* PE to Stream wiring. */
+        Set<PEMaker> done = Sets.newHashSet();
+        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+            PEMaker pm = entry.getKey();
+            for (StreamMaker sm : entry.getValue()) {
+                if (pm != null && sm != null && !done.contains(pm)) {
+                    done.add(pm);
+                    setStreamField(pm.getPe(), sm.getStream(), sm.getType());
+                }
+            }
+        }
+
+        /* Stream to PE wiring. */
+        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+            StreamMaker sm = entry.getKey();
+            for (PEMaker pm : entry.getValue()) {
+                if (pm != null && sm != null) {
+                    sm.getStream().setPE(pm.getPe());
+                }
+            }
+        }
+
+        return app;
+    }
+
+    /* Do the magic to create a Stream from a StreamMaker. */
+    @SuppressWarnings("unchecked")
+    private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
+
+        Stream<T> stream = app.createStream(type);
+        stream.setName(sm.getName());
+        stream.setKey((KeyFinder<T>) sm.getKeyFinder()); // TODO: how do we make this safe?
+        return stream;
+    }
+
+    /* Do the magic to create a PE from a PEMaker. */
+    private <T extends ProcessingElement> T makePE(PEMaker pem, Class<T> type) throws NoSuchFieldException,
+            IllegalAccessException {
+        T pe = app.createPE(type);
+        pe.setPECache(pem.getCacheMaximumSize(), pem.getCacheDuration(), TimeUnit.MILLISECONDS);
+        pe.setTimerInterval(pem.getTimerInterval(), TimeUnit.MILLISECONDS);
+        pe.setTrigger(pem.getTriggerEventType(), pem.getTriggerNumEvents(), pem.getTriggerInterval(),
+                TimeUnit.MILLISECONDS);
+
+        /* Use introspection to match properties to class fields. */
+        setPEAttributes(pe, pem, type);
+        return pe;
+    }
+
+    private <T extends ProcessingElement> void setPEAttributes(T pe, PEMaker pem, Class<T> type)
+            throws NoSuchFieldException, IllegalAccessException {
+
+        PropertiesConfiguration properties = pem.getProperties();
+        @SuppressWarnings("unchecked")
+        Iterator<String> iter = properties.getKeys();
+
+        while (iter.hasNext()) {
+            String property = iter.next();
+            logger.debug("Adding property [{}] to PE of type [{}].", property, type.getName());
+            setField(property, pe, pem, type);
+        }
+    }
+
+    private <T extends ProcessingElement> void setField(String fieldName, T pe, PEMaker pm, Class<T> type)
+            throws NoSuchFieldException, IllegalAccessException {
+        try {
+            Field f = type.getDeclaredField(fieldName);
+            f.setAccessible(true);
+            logger.trace("Type: {}.", f.getType());
+            logger.trace("GenericType: {}.", f.getGenericType());
+
+            /* Set the field. */
+            if (f.getType().getCanonicalName() == "long") {
+                f.setLong(pe, pm.getProperties().getLong(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "int") {
+                f.setInt(pe, pm.getProperties().getInt(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "float") {
+                f.setFloat(pe, pm.getProperties().getFloat(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "double") {
+                f.setDouble(pe, pm.getProperties().getDouble(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "short") {
+                f.setShort(pe, pm.getProperties().getShort(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "byte") {
+                f.setByte(pe, pm.getProperties().getByte(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "boolean") {
+                f.setBoolean(pe, pm.getProperties().getBoolean(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "char") {
+                f.setChar(pe, (char) pm.getProperties().getByte(fieldName));
+                return;
+            } else if (f.getType().getCanonicalName() == "java.lang.String") {
+                f.set(pe, pm.getProperties().getString(fieldName));
+                return;
+            }
+
+            logger.error("Unable to set field named [{}] in PE of type [{}].", fieldName, type);
+            throw new IllegalArgumentException();
+
+            // production code should handle these exceptions more gracefully
+        } catch (NoSuchFieldException e) {
+            logger.error("There is no field named [{}] in PE of type [{}].", fieldName, type);
+            throw e;
+        } catch (IllegalArgumentException e) {
+            logger.error("Couldn't set value for field [{}] in PE of type [{}].", fieldName, type);
+            throw e;
+        }
+    }
+
+    /* We need to set stream fields in PE classes. We will infer the field by checking the Event parameter type. */
+    private <P extends ProcessingElement> void setStreamField(P pe, Stream<? extends Event> stream,
+            Class<? extends Event> eventType) throws Exception {
+
+        Field[] fields = pe.getClass().getDeclaredFields();
+        String fieldName = "";
+        Set<String> eventTypes = Sets.newHashSet();
+        for (Field field : fields) {
+            if (field.getType() == Stream.class) {
+
+                fieldName = field.getName();
+                if (field.getGenericType().toString().endsWith("<" + eventType.getCanonicalName() + ">")) {
+
+                    /* Sanity check. This API does not support more than one stream field with the same event type. */
+                    if (eventTypes.contains(field.getGenericType().toString())) {
+                        logger.error(
+                                "There is more than one stream field in PE [{}] for event type [{}]. The fluent API only supports one stream field per event type.",
+                                pe.getClass().getName(), eventType.getCanonicalName());
+                    }
+
+                    eventTypes.add(field.getGenericType().toString());
+                    logger.debug("Stream field [" + fieldName + "] in PE [" + pe.getClass().getCanonicalName()
+                            + "] matches event type: [" + eventType.getCanonicalName() + "].");
+
+                    /* Assign stream field. */
+                    field.setAccessible(true);
+                    field.set(pe, stream);
+                }
+            }
+        }
+
+    }
+
+    static private String toString(PEMaker pm) {
+        return pm != null ? pm.getType().getName() + " " : "null ";
+    }
+
+    static private String toString(StreamMaker sm) {
+        return sm != null ? sm.getName() + " " : "null ";
+    }
+
+    /**
+     * A printable representation of the application graph.
+     * 
+     * @return the application graph.
+     */
+    public String toString() {
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("\nApplication Graph for " + this.getClass().getCanonicalName() + "\n");
+        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
+        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
+            sb.append(toString(entry.getKey()) + "=> ");
+            for (StreamMaker sm : entry.getValue()) {
+                sb.append(toString(sm));
+            }
+            sb.append("\n");
+        }
+
+        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
+        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
+            sb.append(toString(entry.getKey()) + "=> ");
+            for (PEMaker pm : entry.getValue()) {
+                sb.append(toString(pm));
+            }
+            sb.append("\n");
+        }
+
+        return sb.toString();
+
+    }
+
+    abstract protected void onStart();
+
+    abstract protected void onInit();
+
+    abstract protected void onClose();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
new file mode 100644
index 0000000..a6d559f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/BaseApp.java
@@ -0,0 +1,25 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+
+public class BaseApp extends App {
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
new file mode 100644
index 0000000..7f801b6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
@@ -0,0 +1,27 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+
+public class FluentApp extends App {
+
+    final private AppMaker appMaker;
+
+    public FluentApp(AppMaker appMaker) {
+        this.appMaker = appMaker;
+    }
+
+    @Override
+    protected void onStart() {
+        appMaker.onStart();
+    }
+
+    @Override
+    protected void onInit() {
+        appMaker.onInit();
+    }
+
+    @Override
+    protected void onClose() {
+        appMaker.onClose();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
new file mode 100644
index 0000000..0f568ad
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
@@ -0,0 +1,246 @@
+package org.apache.s4.fluent;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to add a processing element to an S4 application.
+ * 
+ * @see example {@link S4Maker}
+ * 
+ */
+public class PEMaker {
+
+    private static final Logger logger = LoggerFactory.getLogger(PEMaker.class);
+
+    final private Class<? extends ProcessingElement> type;
+    final private AppMaker app;
+    private ProcessingElement pe = null;
+
+    private long timerInterval = 0;
+
+    private long triggerInterval = 0;
+    private Class<? extends Event> triggerEventType = null;
+    private int triggerNumEvents = 0;
+
+    private int cacheMaximumSize = 0;
+    private long cacheDuration = 0;
+
+    private PropertiesConfiguration properties = new PropertiesConfiguration();
+
+    PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
+        Preconditions.checkNotNull(type);
+        this.type = type;
+        this.app = app;
+        app.add(this, null);
+    }
+
+    /**
+     * Configure the PE expiration and cache size.
+     * <p>
+     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
+     * creation, or last access.
+     * <p>
+     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+     * <p>
+     * When this method is called all existing PE instances are destroyed.
+     * <p>
+     * This method takes no arguments; the values are supplied through the returned helper:
+     * <ul>
+     * <li>{@link CacheMaker#ofSize(int)} - the approximate maximum number of PEs in the cache.
+     * <li>{@link CacheMaker#withDuration(long, TimeUnit)} - the PE duration and its time unit.
+     * </ul>
+     * 
+     * 
+     * @return a new CacheMaker that writes its settings to this PEMaker
+     */
+    public CacheMaker addCache() {
+
+        return new CacheMaker();
+    }
+
+    /**
+     * Configure a trigger that is fired when the following conditions occur:
+     * 
+     * <ul>
+     * <li>An event of eventType arrived to the PE instance
+     * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
+     * interval.
+     * </ul>
+     * 
+     * <p>
+     * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
+     * the argument eventType.
+     * <p>
+     * This method takes no arguments; the values are supplied through the returned helper:
+     * <ul>
+     * <li>{@link TriggerMaker#fireOn(Class)} - the type of event on which this trigger will fire.
+     * <li>{@link TriggerMaker#ifNumEvents(int)} - number of events since last trigger activation. Must be greater
+     * than zero. (Set to one to trigger on every input event.)
+     * <li>{@link TriggerMaker#ifInterval(long, TimeUnit)} - minimum time between triggers. Set to zero if no time
+     * interval is needed; a null time unit leaves the interval unset.
+     * </ul>
+     * 
+     * @return a new TriggerMaker that writes its settings to this PEMaker
+     */
+    public TriggerMaker addTrigger() {
+
+        return new TriggerMaker();
+    }
+
+    /**
+     * Set a timer that calls {@link ProcessingElement#onTime()}.
+     * 
+     * If {@code interval==0} the timer is disabled.
+     * 
+     * <p>
+     * This method takes no arguments; the interval and its time unit are supplied through
+     * {@link TimerMaker#withDuration(long, TimeUnit)} on the returned helper.
+     * 
+     * @return a new TimerMaker that writes its settings to this PEMaker
+     */
+    public TimerMaker addTimer() {
+        return new TimerMaker();
+    }
+
+    public StreamMaker emit(Class<? extends Event> type) {
+
+        logger.debug("PE [{}] emits event of type [{}].", this.getType().getName(), type.getCanonicalName());
+        StreamMaker stream = new StreamMaker(app, type);
+        app.add(this, stream);
+        return stream;
+    }
+
+    public PEMaker withKey(String key) {
+        // NOTE(review): the key argument is currently ignored; nothing is stored before returning this maker.
+        return this;
+    }
+
+    public PEMaker with(String key, Object value) {
+
+        properties.addProperty(key, value);
+        return this;
+    }
+
+    /**
+     * @return the timerInterval
+     */
+    long getTimerInterval() {
+        return timerInterval;
+    }
+
+    /**
+     * @return the triggerInterval
+     */
+    long getTriggerInterval() {
+        return triggerInterval;
+    }
+
+    /**
+     * @return the triggerEventType
+     */
+    Class<? extends Event> getTriggerEventType() {
+        return triggerEventType;
+    }
+
+    /**
+     * @return the triggerNumEvents
+     */
+    int getTriggerNumEvents() {
+        return triggerNumEvents;
+    }
+
+    /**
+     * @return the cacheMaximumSize
+     */
+    int getCacheMaximumSize() {
+        return cacheMaximumSize;
+    }
+
+    /**
+     * @return the cacheDuration
+     */
+    long getCacheDuration() {
+        return cacheDuration;
+    }
+
+    /**
+     * @return the type
+     */
+    Class<? extends ProcessingElement> getType() {
+        return type;
+    }
+
+    /**
+     * @return the properties
+     */
+    PropertiesConfiguration getProperties() {
+        return properties;
+    }
+
+    /**
+     * @return the pe
+     */
+    public ProcessingElement getPe() {
+        return pe;
+    }
+
+    /**
+     * @param pe
+     *            the pe to set
+     */
+    public void setPe(ProcessingElement pe) {
+        this.pe = pe;
+    }
+
+    public class TriggerMaker {
+        // Each setter stores its value on the enclosing PEMaker and returns this helper for chaining.
+        public TriggerMaker fireOn(Class<? extends Event> eventType) {
+            // Event type on which the trigger fires.
+            triggerEventType = eventType;
+            return this;
+        }
+
+        public TriggerMaker ifNumEvents(int numEvents) {
+            // Number of events since the last trigger activation.
+            triggerNumEvents = numEvents;
+            return this;
+        }
+        // The interval is stored in milliseconds; a null time unit leaves the stored interval unchanged.
+        public TriggerMaker ifInterval(long interval, TimeUnit timeUnit) {
+            if (timeUnit != null) {
+                triggerInterval = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
+            }
+            return this;
+        }
+    }
+
+    public class CacheMaker {
+        // Each setter stores its value on the enclosing PEMaker and returns this helper for chaining.
+        public CacheMaker ofSize(int maxSize) {
+            cacheMaximumSize = maxSize;
+            return this;
+        }
+        // The duration is stored in milliseconds, converted from the caller's time unit (must not be null).
+        public CacheMaker withDuration(long duration, TimeUnit timeUnit) {
+            cacheDuration = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
+            return this;
+        }
+    }
+
+    public class TimerMaker {
+
+        public TimerMaker withDuration(long duration, TimeUnit timeUnit) {
+            // Keep the interval in milliseconds; the raw duration must not overwrite the converted value.
+            timerInterval = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
new file mode 100644
index 0000000..d90c493
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
@@ -0,0 +1,157 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.KeyFinder;
+import org.apache.s4.core.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to add a stream to an S4 application.
+ * 
+ * @see example {@link S4Maker}
+ * 
+ */
+public class StreamMaker {
+
+    final private AppMaker app;
+    final private Class<? extends Event> type;
+    private String name = null;
+    private KeyFinder<? extends Event> keyFinder;
+    private String keyDescriptor = null;
+    private Stream<? extends Event> stream = null;
+
+    StreamMaker(AppMaker app, Class<? extends Event> type) {
+
+        Preconditions.checkNotNull(type);
+        this.app = app;
+        this.type = type;
+        app.add(null, this);
+    }
+
+    /**
+     * Name the stream.
+     * 
+     * @param name
+     *            the stream name. When unset, {@link #getName()} derives a name from the event type.
+     * @return the stream maker object
+     */
+    public StreamMaker withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream.
+     * 
+     * @param keyFinder
+     *            a function to lookup the value of the key.
+     * @return the stream maker.
+     */
+    public <T extends Event> StreamMaker onKey(KeyFinder<T> keyFinder) {
+        this.keyFinder = keyFinder;
+        return this;
+    }
+
+    /**
+     * Define the key finder for this stream using a descriptor.
+     * 
+     * @param keyDescriptor
+     *            a descriptor to lookup the value of the key.
+     * @return the stream maker.
+     */
+    public StreamMaker withKey(String keyDescriptor) {
+
+        this.keyDescriptor = keyDescriptor;
+        return this;
+    }
+
+    /**
+     * Send events from this stream to a PE.
+     * 
+     * @param pe
+     *            a target PE.
+     * 
+     * @return the stream maker.
+     */
+    public StreamMaker to(PEMaker pe) {
+        app.add(this, pe);
+        return this;
+    }
+
+    /**
+     * Send events from this stream to each of the given PEs, in array order.
+     * 
+     * @param pes
+     *            the target PEs.
+     * @return the stream maker.
+     */
+    public StreamMaker to(PEMaker[] pes) {
+        for (PEMaker target : pes) {
+            app.add(this, target);
+        }
+        return this;
+    }
+
+    /**
+     * @return the app
+     */
+    AppMaker getApp() {
+        return app;
+    }
+
+    /**
+     * @return the type
+     */
+    Class<? extends Event> getType() {
+        return type;
+    }
+
+    /**
+     * @return the name
+     */
+    String getName() {
+
+        if (name != null) {
+            return name;
+        } else {
+
+            String key;
+            if (keyDescriptor != null) {
+                key = keyDescriptor;
+                return type.getCanonicalName() + "-" + key;
+            } else {
+                return type.getCanonicalName();
+            }
+        }
+    }
+
+    /**
+     * @return the keyFinder
+     */
+    KeyFinder<? extends Event> getKeyFinder() {
+        return keyFinder;
+    }
+
+    /**
+     * @return the keyDescriptor
+     */
+    String getKeyDescriptor() {
+        return keyDescriptor;
+    }
+
+    /**
+     * @return the stream
+     */
+    public Stream<? extends Event> getStream() {
+        return stream;
+    }
+
+    /**
+     * @param stream
+     *            the stream to set
+     */
+    public void setStream(Stream<? extends Event> stream) {
+        this.stream = stream;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
index 5fbe2c7..778931e 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/MyApp.java
@@ -5,20 +5,21 @@ public class MyApp extends AppMaker {
     @Override
     protected void configure() {
 
-        PEMaker pe1, pe2;
+        PEMaker pez, pey;
         StreamMaker s1;
         StreamMaker s2, s3;
 
-        pe1 = addPE(PEZ.class);
+        pez = addPE(PEZ.class);
 
-        s1 = addStream(EventA.class).withName("My first stream.").withKey("{gender}").to(pe1);
+        s1 = addStream("stream1", EventA.class).withName("My first stream.").withKey("{gender}").to(pez);
 
-        pe2 = addPE(PEY.class).to(s1);
+        pey = addPE(PEY.class).to(s1).property("duration", 4).property("height", 99);
 
-        s2 = addStream(EventB.class).withName("My second stream.").withKey("{age}").to(pe2);
+        s2 = addStream("stream2", EventB.class).withName("My second stream.").withKey("{age}").to(pey).to(pez);
 
-        s3 = addStream(EventB.class).withName("My third stream.").withKey("{height}").to(pe2);
+        s3 = addStream("stream3", EventB.class).withKey("{height}").to(pey);
 
-        addPE(PEX.class).to(s2).to(s3);
+        addPE(PEX.class).to(s2).to(s3).property("keyword", "money");
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
index 16d951a..1db0525 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEY.java
@@ -1,9 +1,12 @@
 package org.apache.s4.appmaker;
 
 import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
 
 public class PEY extends ProcessingElement {
 
+    private Stream<EventB> stream3;
+
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
@@ -16,4 +19,19 @@ public class PEY extends ProcessingElement {
 
     }
 
+    /**
+     * @return the stream3
+     */
+    Stream<EventB> getStream3() {
+        return stream3;
+    }
+
+    /**
+     * @param stream3
+     *            the stream3 to set
+     */
+    void setStream3(Stream<EventB> stream3) {
+        this.stream3 = stream3;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
index e893754..9ac3761 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/appmaker/PEZ.java
@@ -1,9 +1,43 @@
 package org.apache.s4.appmaker;
 
 import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
 
 public class PEZ extends ProcessingElement {
 
+    private Stream<EventA> stream1;
+    private Stream<EventB> stream2;
+
+    /**
+     * @return the stream1
+     */
+    Stream<EventA> getStream1() {
+        return stream1;
+    }
+
+    /**
+     * @param stream1
+     *            the stream1 to set
+     */
+    void setStream1(Stream<EventA> stream1) {
+        this.stream1 = stream1;
+    }
+
+    /**
+     * @return the stream2
+     */
+    Stream<EventB> getStream2() {
+        return stream2;
+    }
+
+    /**
+     * @param stream2
+     *            the stream2 to set
+     */
+    void setStream2(Stream<EventB> stream2) {
+        this.stream2 = stream2;
+    }
+
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
new file mode 100644
index 0000000..365f987
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
@@ -0,0 +1,33 @@
+package org.apache.s4.fluent;
+
+import java.lang.reflect.Field;
+
+import org.junit.Test;
+
+public class AppMakerTest {
+
+    @Test
+    public void test() throws Exception {
+        // Configures and makes the sample app; any exception thrown along the way fails the test.
+        MyApp myApp = new MyApp();
+        myApp.configure();
+        System.out.println(myApp.toString());
+        myApp.make();
+    }
+
+    @Test
+    public void testReflection() {
+
+        try {
+            Class<?> c = PEY.class;
+            Field f = c.getDeclaredField("duration");
+            System.out.format("Type: %s%n", f.getType());
+            System.out.format("GenericType: %s%n", f.getGenericType());
+
+            // fail the test when the field is missing instead of printing the stack trace and passing
+        } catch (NoSuchFieldException x) {
+            throw new IllegalStateException("PEY must declare a field named 'duration'", x);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
new file mode 100644
index 0000000..a62a9f1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class DurationKeyFinder implements KeyFinder<EventA> {
+
+    public List<String> get(EventA event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the duration and add it to the list as a decimal string. */
+        results.add(Long.toString(event.getDuration()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
new file mode 100644
index 0000000..ccf37e2
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
@@ -0,0 +1,39 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+    private long duration;
+    private int height;
+
+    /**
+     * @return the duration
+     */
+    public long getDuration() {
+        return duration;
+    }
+
+    /**
+     * @param duration
+     *            the duration to set
+     */
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+    /**
+     * @return the height
+     */
+    public int getHeight() {
+        return height;
+    }
+
+    /**
+     * @param height the height to set
+     */
+    public void setHeight(int height) {
+        this.height = height;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
new file mode 100644
index 0000000..3e285e3
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+    private String query;
+
+    /**
+     * @return the query
+     */
+    public String getQuery() {
+        return query;
+    }
+
+    /**
+     * @param query
+     *            the query to set
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
new file mode 100644
index 0000000..daad556
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class HeightKeyFinder implements KeyFinder<EventA> {
+
+    public List<String> get(EventA event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the height and add it to the list as a decimal string. */
+        results.add(Integer.toString(event.getHeight()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
new file mode 100644
index 0000000..bf97eb6
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
@@ -0,0 +1,48 @@
+package org.apache.s4.fluent;
+
+import java.util.concurrent.TimeUnit;
+
+public class MyApp extends AppMaker {
+
+    @Override
+    protected void configure() {
+
+        PEMaker pez, pey, pex;
+
+        /* Configure processing element pez. */
+        pez = addPE(PEZ.class);
+        pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
+        pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
+
+        /* Configure processing element pey. */
+        pey = addPE(PEY.class).with("duration", 4).with("height", 99);
+        pey.addTimer().withDuration(2, TimeUnit.MINUTES);
+
+        /* Configure processing element pex. */
+        pex = addPE(PEX.class).with("query", "money");
+        pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
+
+        /* Construct the graph. */
+        pey.emit(EventA.class).onKey(new DurationKeyFinder()).to(pez);
+        pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pez);
+        pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
+    }
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
new file mode 100644
index 0000000..891cd40
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
@@ -0,0 +1,56 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEX extends ProcessingElement {
+
+    private String query;
+    private Stream<EventB> someStream;
+
+    public PEX(App app) {
+        super(app);
+    }
+
+    @Override
+    protected void onCreate() {
+
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+
+    /**
+     * @return the keyword
+     */
+    String getKeyword() {
+        return query;
+    }
+
+    /**
+     * @param query
+     *            the keyword to set
+     */
+    void setKeyword(String query) {
+        this.query = query;
+    }
+
+    /**
+     * @return the someStream
+     */
+    public Stream<EventB> getSomeStream() {
+        return someStream;
+    }
+
+    /**
+     * @param someStream
+     *            the someStream to set
+     */
+    public void setSomeStream(Stream<EventB> someStream) {
+        this.someStream = someStream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
new file mode 100644
index 0000000..b175657
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
@@ -0,0 +1,74 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEY extends ProcessingElement {
+
+    private Stream<EventA> stream3;
+    private int height;
+    private long duration;
+
+    public PEY(App app) {
+        super(app);
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * @return the stream3
+     */
+    Stream<EventA> getStream3() {
+        return stream3;
+    }
+
+    /**
+     * @param stream3
+     *            the stream3 to set
+     */
+    void setStream3(Stream<EventA> stream3) {
+        this.stream3 = stream3;
+    }
+
+    /**
+     * @return the height
+     */
+    int getHeight() {
+        return height;
+    }
+
+    /**
+     * @param height
+     *            the height to set
+     */
+    void setHeight(int height) {
+        this.height = height;
+    }
+
+    /**
+     * @return the duration
+     */
+    long getDuration() {
+        return duration;
+    }
+
+    /**
+     * @param duration
+     *            the duration to set
+     */
+    void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
new file mode 100644
index 0000000..e2f362c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
@@ -0,0 +1,58 @@
+package org.apache.s4.fluent;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEZ extends ProcessingElement {
+
+    private Stream<EventA> stream1;
+    private Stream<EventB> stream2;
+
+    public PEZ(App app) {
+        super(app);
+    }
+
+    /**
+     * @return the stream1
+     */
+    Stream<EventA> getStream1() {
+        return stream1;
+    }
+
+    /**
+     * @param stream1
+     *            the stream1 to set
+     */
+    void setStream1(Stream<EventA> stream1) {
+        this.stream1 = stream1;
+    }
+
+    /**
+     * @return the stream2
+     */
+    Stream<EventB> getStream2() {
+        return stream2;
+    }
+
+    /**
+     * @param stream2
+     *            the stream2 to set
+     */
+    void setStream2(Stream<EventB> stream2) {
+        this.stream2 = stream2;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
new file mode 100644
index 0000000..5979a23
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.fluent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class QueryKeyFinder implements KeyFinder<EventB> {
+
+    public List<String> get(EventB event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the query and add it to the list. */
+        results.add(event.getQuery());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..a45468d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/AgeKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the age and add it to the list. */
+        results.add(Integer.toString(event.getAge()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
new file mode 100644
index 0000000..3768875
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+
+public class CountEvent extends Event {
+
+    private String key;
+    private long count;
+
+    public CountEvent() {
+    }
+
+    CountEvent(String key, long count) {
+        this.key = key;
+        this.count = count;
+    }
+
+    CountEvent(String key, long count, long time) {
+        super(time);
+        this.key = key;
+        this.count = count;
+    }
+
+    /**
+     * @return the key
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * @return the count
+     */
+    public long getCount() {
+        return count;
+    }
+
+    // The key is passed as a format argument so that a '%' inside it is printed literally.
+    @Override
+    public String toString() {
+        return String.format("Key: %s, Count: %08d", key, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
new file mode 100644
index 0000000..6337fe1
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+    // Key finder for CountEvent: the returned list holds only the event's own key.
+    public List<String> get(CountEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the event's key and add it to the list. */
+        results.add(event.getKey());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
new file mode 100644
index 0000000..f9e939a
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/CounterPE.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+
+public class CounterPE extends ProcessingElement {
+    // PE that counts calls to onEvent and, on trigger, puts the running total on countStream.
+    private Stream<CountEvent> countStream = null;
+
+    public CounterPE(App app) {
+        super(app);
+    }
+    
+    /**
+     * @return the stream that count events are put on (null until set)
+     */
+    public Stream<CountEvent> getCountStream() {
+        return countStream;
+    }
+
+    /**
+     * @param countStream
+     *            the stream that onTrigger puts count events on
+     */
+    public void setCountStream(Stream<CountEvent> countStream) {
+        this.countStream = countStream;
+    }
+    // Number of onEvent calls so far; this class never resets it.
+    private long counter = 0;
+
+    public void onEvent(Event event) {
+        // The event's content is not read; only its arrival is counted.
+        counter += 1;
+    }
+
+    /*
+     * Puts a CountEvent built from getId() and the current counter on countStream.
+     * 
+     * NOTE(review): throws NullPointerException if setCountStream was never called.
+     */
+    public void onTrigger(Event event) {
+
+        CountEvent countEvent = new CountEvent(getId(), counter);
+        countStream.put(countEvent);
+    }
+
+    /*
+     * Empty implementation.
+     * 
+     * This PE does no work in onCreate.
+     */
+    @Override
+    protected void onCreate() {
+
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..17111c7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenderKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+    // Key finder for UserEvent: the returned list holds the gender as a one-character string.
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Convert the gender character to a String and add it to the list. */
+        results.add(Character.toString(event.getGender()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..b85c4e7
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/GenerateUserEventPE.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.SingletonPE;
+import org.apache.s4.core.Stream;
+
+
+public class GenerateUserEventPE extends SingletonPE {
+    // Singleton PE that builds pseudo-random UserEvents and puts each one on every target stream.
+    static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+    static int[] ages = { 25, 2, 33, 6, 67 };
+    static char[] genders = { 'f', 'm' };
+    private Stream<UserEvent>[] targetStreams;
+    final private Random generator = new Random(22); // fixed seed: same sequence on every run
+
+    public GenerateUserEventPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @param targetStreams the {@link UserEvent} streams that every generated event is put on.
+     */
+    public void setStreams(Stream<UserEvent>... targetStreams){
+        this.targetStreams = targetStreams;
+    }
+    // Builds one UserEvent from randomly indexed sample data; the event argument is not used.
+    public void onTrigger(Event event) {
+        List<String> favorites = new ArrayList<String>();
+        favorites.add("dulce de leche");
+        favorites.add("strawberry");
+
+        int indexUserID = generator.nextInt(userIds.length);
+        int indexAge = generator.nextInt(ages.length);
+        int indexGender = generator.nextInt(genders.length);
+
+        UserEvent userEvent = new UserEvent(userIds[indexUserID],
+                ages[indexAge], favorites, genders[indexGender]);
+        // NOTE(review): targetStreams is null until setStreams is called.
+        for (int i = 0; i < targetStreams.length; i++) {
+            targetStreams[i].put(userEvent);
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+    // Stub: ignores numElements, always returns 0, and is not called within this class.
+    static int pickRandom(int numElements) {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
new file mode 100644
index 0000000..a5acc06
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/Module.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module for the counter example.
+ * 
+ * Loads /s4-counter-example.properties from the classpath, binds its entries as named properties, and configures Guice bindings.
+ * 
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+
+    private void loadProperties(Binder binder) {
+        // NOTE(review): 'is' is never closed in this method, and getResourceAsStream returns null if the resource is missing.
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null)
+            loadProperties(binder());
+
+        bind(MyApp.class);
+
+        bind(Cluster.class);
+
+        /* Configure static assignment using a configuration file. */
+        bind(Assignment.class).to(AssignmentFromFile.class);
+
+        /* Configure a static cluster topology using a configuration file. */
+        bind(Topology.class).to(TopologyFromFile.class);
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
+        /* Use the Netty comm layer implementation. */
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
+        /* Use a simple UDP comm layer implementation. */
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
new file mode 100644
index 0000000..9ea29cf
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/MyApp.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.core.Sender;
+import org.apache.s4.core.Stream;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/*
+ * This is a sample application to test a new S4 API. 
+ * See README file for details.
+ * 
+ * */
+
+final public class MyApp extends App {
+
+    final private int interval = 1;
+    private GenerateUserEventPE generateUserEventPE;
+
+    /*
+     * Sample app: a generator PE feeds three keyed UserEvent streams into CounterPEs, whose counts go to a PrintPE.
+     * 
+     * The application graph itself is created in this class. However, developers may provide tools for creating apps
+     * which will generate the objects.
+     * 
+     * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype from
+     * which all PE instances will be created. The prototype itself is not used as an instance (except when the PE is of
+     * type Singleton PE). To create a data structure for each PE instance you must do it in the method
+     * ProcessingElement.onCreate().
+     */
+
+    /*
+     * Build the application graph using POJOs. Don't like it? Write a nice tool.
+     * 
+     * @see org.apache.s4.core.App#onInit()
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void onInit() {
+
+        /* PE that prints counts to console. */
+        PrintPE printPE = createPE(PrintPE.class);
+
+        Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        Stream<CountEvent> genderCountStream = createStream(CountEvent.class).setName("Gender Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        Stream<CountEvent> ageCountStream = createStream(CountEvent.class).setName("Age Count Stream")
+                .setKey(new CountKeyFinder()).setPE(printPE);
+
+        /* PEs that count events by user, gender, and age; each gets the same trigger settings and its own count stream. */
+        CounterPE userCountPE = createPE(CounterPE.class);// .withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        userCountPE.setCountStream(userCountStream);
+
+        CounterPE genderCountPE = createPE(CounterPE.class);
+        genderCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        genderCountPE.setCountStream(genderCountStream);
+
+        CounterPE ageCountPE = createPE(CounterPE.class);
+        ageCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        ageCountPE.setCountStream(ageCountStream);
+
+        /* Streams that output user events keyed on user, gender, and age. */
+        Stream<UserEvent> userStream = createStream(UserEvent.class).setName("User Stream")
+                .setKey(new UserIDKeyFinder()).setPE(userCountPE);
+
+        Stream<UserEvent> genderStream = createStream(UserEvent.class).setName("Gender Stream")
+                .setKey(new GenderKeyFinder()).setPE(genderCountPE);
+
+        Stream<UserEvent> ageStream = createStream(UserEvent.class).setName("Age Stream").setKey(new AgeKeyFinder())
+                .setPE(ageCountPE);
+
+        generateUserEventPE = createPE(GenerateUserEventPE.class);
+        generateUserEventPE.setStreams(userStream, genderStream, ageStream);
+    }
+
+    /*
+     * Trigger the generator PE 200 times, sleep for 5 seconds, then print a message.
+     * 
+     * @see org.apache.s4.core.App#onStart()
+     */
+    @Override
+    protected void onStart() {
+
+        for (int i = 0; i < 200; i++) {
+            generateUserEventPE.onTrigger(null);
+        }
+
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            // NOTE(review): the interruption is only printed; the thread's interrupt flag is not restored.
+            e.printStackTrace();
+        }
+
+        System.out.println("Done. Wait until the main app closes.");
+        // close();
+    }
+
+    @Override
+    protected void onClose() {
+        System.out.println("Bye.");
+
+    }
+
+    public static void main(String[] args) {
+        // Builds the app from the Guice Module, sets the comm layer, runs init/start, waits 10 seconds, then closes.
+        Injector injector = Guice.createInjector(new Module());
+        MyApp myApp = injector.getInstance(MyApp.class);
+        Sender sender = injector.getInstance(Sender.class);
+        Receiver receiver = injector.getInstance(Receiver.class);
+        myApp.setCommLayer(sender, receiver);
+        myApp.init();
+        myApp.start();
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            // NOTE(review): the interruption is only printed; close() still runs afterwards.
+            e.printStackTrace();
+        }
+        myApp.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
new file mode 100644
index 0000000..dc3a25d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+
+
+public class PrintPE extends ProcessingElement {
+    // PE that prints each received event's toString() to standard output.
+    public PrintPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(Event event) {
+
+        System.out.println(event.toString());
+    }
+
+    @Override
+    protected void onCreate() {
+        // Empty: this PE does no work on creation.
+
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
new file mode 100644
index 0000000..72e0a3d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/README.md
@@ -0,0 +1,17 @@
+S4 Counter Example
+==================
+
+In this example we do the following:
+
+- Generate dummy events (UserEvent).
+- Key events by user, gender, age.
+- Count by user, gender, age.
+- Print partial counts.
+
+The following diagram shows the application graph:
+
+![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
+
+The following diagram shows how classes, PE prototypes, PE instances, and streams are related.
+
+![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
new file mode 100644
index 0000000..e5bf67d
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+public class UserEvent extends Event {
+    // Event describing a user: ID, age, gender, and a list of favorites.
+    private String userID;
+    private int age;
+    private char gender;
+    private List<String> favorites;
+    
+    public UserEvent() {
+        // No-arg constructor: all fields keep their Java default values.
+    }
+
+    UserEvent(String userID, int age, List<String> favorites, char gender) {
+        this.userID = userID;
+        this.age = age;
+        this.favorites = favorites;
+        this.gender = gender;
+    }
+
+    /**
+     * @return the user ID
+     */
+    public String getUserID() {
+        return userID;
+    }
+
+    /**
+     * @return the user's age
+     */
+    public int getAge() {
+        return age;
+    }
+
+    /**
+     * @return the favorites list (the same list instance given to the constructor, not a copy)
+     */
+    public List<String> getFavorites() {
+        return favorites;
+    }
+
+    /**
+     * @return the gender character
+     */
+    public char getGender() {
+        return gender;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..b49739c
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/counter/UserIDKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+    // Key finder for UserEvent: the returned list holds only the event's user ID.
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the user ID and add it to the list. */
+        results.add(event.getUserID());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/21ff6dc4/subprojects/s4-core/src/test/resources/s4-counter-example.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/s4-counter-example.properties b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
new file mode 100644
index 0000000..b60f40a
--- /dev/null
+++ b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
@@ -0,0 +1,7 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.lock_dir = /tmp
+
+