You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by le...@apache.org on 2012/02/14 19:36:13 UTC

[1/4] git commit: Committing the first working version of the S4 embedded domain-specific language.

Updated Branches:
  refs/heads/s4-5 [created] 6e9736673


Committing the first working version of the S4 embedded domain-specific language.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6e973667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6e973667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6e973667

Branch: refs/heads/s4-5
Commit: 6e97366735b3363a2d56dbf6b316681d4d0a928a
Parents: 7c2688f
Author: Leo Neumeyer <le...@apache.org>
Authored: Tue Feb 14 11:35:35 2012 -0800
Committer: Leo Neumeyer <le...@apache.org>
Committed: Tue Feb 14 11:35:35 2012 -0800

----------------------------------------------------------------------
 build.gradle                                       |    3 +-
 .../src/main/java/org/apache/s4/core/App.java      |   38 ++-
 .../java/org/apache/s4/core/ProcessingElement.java |   58 ++-
 .../src/main/java/org/apache/s4/core/Stream.java   |    6 +
 .../main/java/org/apache/s4/fluent/AppMaker.java   |  418 ---------------
 .../main/java/org/apache/s4/fluent/FluentApp.java  |   49 --
 .../main/java/org/apache/s4/fluent/PEMaker.java    |  257 ---------
 .../java/org/apache/s4/fluent/StreamMaker.java     |  168 ------
 .../java/org/apache/s4/fluent/AppMakerTest.java    |   34 --
 .../org/apache/s4/fluent/DurationKeyFinder.java    |   19 -
 .../src/test/java/org/apache/s4/fluent/EventA.java |   39 --
 .../src/test/java/org/apache/s4/fluent/EventB.java |   24 -
 .../java/org/apache/s4/fluent/HeightKeyFinder.java |   19 -
 .../src/test/java/org/apache/s4/fluent/MyApp.java  |   41 --
 .../src/test/java/org/apache/s4/fluent/PEX.java    |   58 --
 .../src/test/java/org/apache/s4/fluent/PEY.java    |   77 ---
 .../src/test/java/org/apache/s4/fluent/PEZ.java    |   58 --
 .../java/org/apache/s4/fluent/QueryKeyFinder.java  |   19 -
 subprojects/s4-edsl/s4-edsl.gradle                 |    9 +
 subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml |   72 ++-
 subprojects/s4-edsl/src/main/diezel/s4/s4.xml      |   46 +-
 subprojects/s4-edsl/src/main/java/Main.java        |   15 -
 .../main/java/org/apache/s4/edsl/AppBuilder.java   |  330 ++++++++++++
 .../java/org/apache/s4/edsl/StreamBuilder.java     |   75 +++
 .../java/org/apache/s4/edsl/DurationKeyFinder.java |   19 +
 .../src/test/java/org/apache/s4/edsl/EventA.java   |   40 ++
 .../src/test/java/org/apache/s4/edsl/EventB.java   |   24 +
 .../java/org/apache/s4/edsl/HeightKeyFinder.java   |   19 +
 .../src/test/java/org/apache/s4/edsl/Module.java   |  105 ++++
 .../src/test/java/org/apache/s4/edsl/MyApp.java    |   37 ++
 .../src/test/java/org/apache/s4/edsl/PEX.java      |   58 ++
 .../src/test/java/org/apache/s4/edsl/PEY.java      |   77 +++
 .../src/test/java/org/apache/s4/edsl/PEZ.java      |   58 ++
 .../java/org/apache/s4/edsl/QueryKeyFinder.java    |   19 +
 .../src/test/java/org/apache/s4/edsl/TestEDSL.java |   38 ++
 subprojects/s4-example/s4-example.gradle           |    1 +
 .../s4/example/edsl/counter/AgeKeyFinder.java      |   34 ++
 .../apache/s4/example/edsl/counter/CountEvent.java |   58 ++
 .../s4/example/edsl/counter/CountKeyFinder.java    |   34 ++
 .../apache/s4/example/edsl/counter/CounterApp.java |   80 +++
 .../apache/s4/example/edsl/counter/CounterPE.java  |   84 +++
 .../s4/example/edsl/counter/GenderKeyFinder.java   |   34 ++
 .../example/edsl/counter/GenerateUserEventPE.java  |   77 +++
 .../org/apache/s4/example/edsl/counter/Module.java |  105 ++++
 .../apache/s4/example/edsl/counter/PrintPE.java    |   44 ++
 .../s4/example/edsl/counter/TestCounterApp.java    |   26 +
 .../apache/s4/example/edsl/counter/UserEvent.java  |   68 +++
 .../s4/example/edsl/counter/UserIDKeyFinder.java   |   34 ++
 .../s4/example/fluent/counter/AgeKeyFinder.java    |   34 --
 .../s4/example/fluent/counter/CountEvent.java      |   58 --
 .../s4/example/fluent/counter/CountKeyFinder.java  |   34 --
 .../s4/example/fluent/counter/CounterPE.java       |   84 ---
 .../s4/example/fluent/counter/GenderKeyFinder.java |   34 --
 .../fluent/counter/GenerateUserEventPE.java        |   77 ---
 .../org/apache/s4/example/fluent/counter/Main.java |  127 -----
 .../apache/s4/example/fluent/counter/Module.java   |  108 ----
 .../apache/s4/example/fluent/counter/PrintPE.java  |   44 --
 .../org/apache/s4/example/fluent/counter/README.md |   17 -
 .../s4/example/fluent/counter/UserEvent.java       |   68 ---
 .../s4/example/fluent/counter/UserIDKeyFinder.java |   34 --
 60 files changed, 1758 insertions(+), 2066 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 106dd2b..d67e54d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,7 +73,7 @@ libraries = [
     jcip:               'net.jcip:jcip-annotations:1.0',
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
-    diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-3'
+    diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4-SNAPSHOT'
 ]
 
 subprojects {
@@ -129,6 +129,7 @@ subprojects {
     )
 }
 
+
 dependsOnChildren()
 
 platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-edsl')]

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 4a562f9..5822b89 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -42,9 +43,12 @@ public abstract class App {
     /* PE prototype to streams relations. */
     final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
 
-    /* Stream prototype to PE relations. */
+    /* Stream to PE prototype relations. */
     final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
 
+    /* Pes indexed by name. */
+    Map<String, ProcessingElement> peByName = Maps.newHashMap();
+
     private ClockType clockType = ClockType.WALL_CLOCK;
     private int id = -1;
     @Inject
@@ -104,6 +108,10 @@ public abstract class App {
 
     protected abstract void onStart();
 
+    /**
+     * This method is called by the container after initialization. Once this method is called, threads get started and
+     * events start flowing.
+     */
     protected void start() {
 
         logger.info("Prepare to start App [{}].", getClass().getName());
@@ -122,6 +130,9 @@ public abstract class App {
         onStart();
     }
 
+    /**
+     * This method is called by the container to initialize applications.
+     */
     protected abstract void onInit();
 
     protected void init() {
@@ -129,6 +140,9 @@ public abstract class App {
         onInit();
     }
 
+    /**
+     * This method is called by the container before unloading the application.
+     */
     protected abstract void onClose();
 
     protected void close() {
@@ -164,7 +178,11 @@ public abstract class App {
 
         logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
         pe2stream.put(pePrototype, stream);
+    }
+
+    public ProcessingElement getPE(String name) {
 
+        return peByName.get(name);
     }
 
     void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
@@ -312,14 +330,17 @@ public abstract class App {
      * 
      * @param type
      *            the processing element type.
+     * @param name
+     *            a name for this PE prototype.
      * @return the processing element prototype.
      */
-    public <T extends ProcessingElement> T createPE(Class<T> type) {
+    public <T extends ProcessingElement> T createPE(Class<T> type, String name) {
 
         try {
             // TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
             Class<?>[] types = new Class<?>[] { App.class };
             T pe = type.getDeclaredConstructor(types).newInstance(this);
+            pe.setName(name);
             return pe;
 
         } catch (Exception e) {
@@ -328,6 +349,19 @@ public abstract class App {
         }
     }
 
+    /**
+     * Creates a {@link ProcessingElement} prototype.
+     * 
+     * @param type
+     *            the processing element type.
+     * @return the processing element prototype.
+     */
+    public <T extends ProcessingElement> T createPE(Class<T> type) {
+
+        return createPE(type, null);
+
+    }
+
     static private String toString(ProcessingElement pe) {
         return pe != null ? pe.getClass().getName() + " " : "null ";
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index befe94f..152f24e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -76,7 +76,6 @@ import com.google.common.collect.Maps;
  *         objects in the {@link #onCreate()} method. For example, if each instance requires a
  *         <tt>List<tt/> object the PE should implement the following:
  *         <pre>
- *         {@code
  *         public class MyPE extends ProcessingElement {
  * 
  *           private Map<String, Integer> wordCount;
@@ -88,7 +87,6 @@ import com.google.common.collect.Maps;
  *           logger.trace("Created a map for instance PE with id {}, getId());
  *           }
  *         }
- *         }
  *         </pre>
  * 
  * 
@@ -124,7 +122,7 @@ abstract public class ProcessingElement implements Cloneable {
     private Timer timer;
     private boolean isPrototype = true;
     private boolean isThreadSafe = false;
-    private boolean isFirst = true;
+    private String name = null;
     private boolean isSingleton = false;
 
     private transient OverloadDispatcher overloadDispatcher;
@@ -246,6 +244,34 @@ abstract public class ProcessingElement implements Cloneable {
     }
 
     /**
+     * Sets the max size of the PE cache.
+     * 
+     * <p>
+     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+     * <p>
+     * When this method is called all existing PE instances are destroyed.
+     * 
+     * 
+     * @param maximumSize
+     *            the approximate maximum number of PEs in the cache.
+     * @return the PE prototype
+     */
+    public ProcessingElement setPECache(int maximumSize) {
+
+        Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
+
+        peInstances = CacheBuilder.newBuilder().maximumSize(maximumSize)
+                .build(new CacheLoader<String, ProcessingElement>() {
+                    @Override
+                    public ProcessingElement load(String key) throws Exception {
+                        return createPE(key);
+                    }
+                });
+
+        return this;
+    }
+
+    /**
      * This trigger is fired when the following conditions occur:
      * 
      * <ul>
@@ -502,9 +528,6 @@ abstract public class ProcessingElement implements Cloneable {
         /* Check if instance for key exists, otherwise create one. */
         try {
             if (isSingleton) {
-                logger.trace(
-                        "Requested a PE instance with key [{}]. The instance is a singleton and will ignore the key. The key should be set to null when requesting a singleton.",
-                        id);
                 return peInstances.get(SINGLETON);
             }
             return peInstances.get(id);
@@ -623,6 +646,29 @@ abstract public class ProcessingElement implements Cloneable {
         }
     }
 
+    /**
+     * @return the PE name
+     */
+    protected String getName() {
+        return name;
+    }
+
+    /**
+     * @param name
+     *            PE name
+     */
+    protected void setName(String name) {
+
+        if (name == null)
+            return;
+
+        this.name = name;
+        if (app.peByName.containsKey(name)) {
+            logger.warn("Using a duplicate PE name: [{}]. This is probbaly not what you wanted.", name);
+        }
+        app.peByName.put(name, this);
+    }
+
     class Trigger {
         final long intervalInMilliseconds;
         final int intervalInEvents;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index c05d81f..1790e44 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -88,6 +88,12 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
         targetPEs = new ProcessingElement[pes.size()];
         pes.toArray(targetPEs);
 
+        if (logger.isTraceEnabled()) {
+            for (ProcessingElement pe : pes) {
+                logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
+            }
+        }
+
         /* Start streaming. */
         thread = new Thread(this, name);
         thread.start();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
deleted file mode 100644
index bfeabd1..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
+++ /dev/null
@@ -1,418 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * A fluent API to build S4 applications.
- * 
- * *
- * <p>
- * Usage example:
- * 
- * <pre>
- * 
- * &#064;Override
- * public void configure() {
- * 
- *     PEMaker pez, pey, pex;
- * 
- *     pez = addPE(PEZ.class);
- *     pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
- *     pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
- * 
- *     pey = addPE(PEY.class).with(&quot;duration&quot;, 4).with(&quot;height&quot;, 99);
- *     pey.addTimer().withDuration(2, TimeUnit.MINUTES);
- * 
- *     pex = addPE(PEX.class).with(&quot;query&quot;, &quot;money&quot;).asSingleton();
- *     pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
- * 
- *     pey.emit(EventA.class).withField(&quot;stream3&quot;).onKey(new DurationKeyFinder()).to(pez);
- *     pey.emit(EventA.class).withField(&quot;heightpez&quot;).onKey(new HeightKeyFinder()).to(pez);
- *     pez.emit(EventB.class).to(pex);
- *     pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
- * }
- * 
- * 
- * </pre>
- */
-abstract public class AppMaker {
-
-    private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
-
-    /* Use multi-maps to save the graph. */
-    private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
-    private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
-
-    private FluentApp app;
-
-    public AppMaker() {
-
-    }
-
-    public void setApp(FluentApp app) {
-        this.app = app;
-    }
-
-    /**
-     * Configure the application.
-     */
-    protected abstract void start();
-
-    protected abstract void configure();
-
-    protected abstract void close();
-
-    /**
-     * @return the app
-     */
-    public FluentApp getApp() {
-        return app;
-    }
-
-    /* Used internally to build the graph. */
-    void add(PEMaker pem, StreamMaker stream) {
-
-        pe2stream.put(pem, stream);
-        logger.debug("Adding pe [{}] to stream [{}].", pem != null ? pem.getType().getName() : "null",
-                stream != null ? stream.getName() : "null");
-    }
-
-    /* Used internally to build the graph. */
-    void add(StreamMaker stream, PEMaker pem) {
-
-        stream2pe.put(stream, pem);
-        logger.debug("Adding stream [{}] to pe [{}].", stream != null ? stream.getName() : "null", pem != null ? pem
-                .getType().getName() : "null");
-    }
-
-    protected PEMaker addPE(Class<? extends ProcessingElement> type) {
-        PEMaker pe = new PEMaker(this, type);
-        return pe;
-    }
-
-    App make() {
-
-        logger.debug("Start MAKE.");
-
-        /* Loop PEMaker objects to create PEs. */
-        for (PEMaker key : pe2stream.keySet()) {
-            if (key != null) {
-                try {
-                    key.setPe(makePE(key, key.getType()));
-                } catch (NoSuchFieldException e) {
-                    logger.error("Couldn't make PE.", e);
-                } catch (IllegalAccessException e) {
-                    logger.error("Couldn't make PE.", e);
-                }
-            }
-
-        }
-        /* Loop StreamMaker objects to create Streams. */
-        for (StreamMaker key : stream2pe.keySet()) {
-            if (key != null) {
-                key.setStream(makeStream(key, key.getType()));
-            }
-        }
-
-        /* PE to Stream wiring. */
-        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
-        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
-            PEMaker pm = entry.getKey();
-            Collection<StreamMaker> streams = entry.getValue();
-
-            if (pm != null && streams != null) {
-                try {
-                    setStreamField(pm, streams);
-                } catch (Exception e) {
-                    logger.error("Couldn't make Stream.", e);
-                }
-            }
-        }
-
-        /* Stream to PE wiring. */
-        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
-        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
-            StreamMaker sm = entry.getKey();
-            for (PEMaker pm : entry.getValue()) {
-                if (pm != null && sm != null) {
-                    sm.getStream().setPE(pm.getPe());
-                }
-            }
-        }
-
-        return app;
-    }
-
-    /* Do the magic to create a Stream from a StreamMaker. */
-    @SuppressWarnings("unchecked")
-    private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
-
-        Stream<T> stream = app.createStream(type);
-        stream.setName(sm.getName());
-
-        if (sm.getKeyFinder() != null)
-            stream.setKey((KeyFinder<T>) sm.getKeyFinder());
-        else if (sm.getKeyDescriptor() != null)
-            stream.setKey(sm.getKeyDescriptor());
-
-        return stream;
-    }
-
-    /* Do the magic to create a PE from a PEMaker. */
-    private <T extends ProcessingElement> T makePE(PEMaker pem, Class<T> type) throws NoSuchFieldException,
-            IllegalAccessException {
-        T pe = app.createPE(type);
-        pe.setSingleton(pem.isSingleton());
-
-        if (pem.getCacheMaximumSize() > 0)
-            pe.setPECache(pem.getCacheMaximumSize(), pem.getCacheDuration(), TimeUnit.MILLISECONDS);
-
-        if (pem.getTimerInterval() > 0)
-            pe.setTimerInterval(pem.getTimerInterval(), TimeUnit.MILLISECONDS);
-
-        if (pem.getTriggerEventType() != null) {
-            if (pem.getTriggerNumEvents() > 0 || pem.getTriggerInterval() > 0) {
-                pe.setTrigger(pem.getTriggerEventType(), pem.getTriggerNumEvents(), pem.getTriggerInterval(),
-                        TimeUnit.MILLISECONDS);
-            }
-        }
-
-        /* Use introspection to match properties to class fields. */
-        setPEAttributes(pe, pem, type);
-        return pe;
-    }
-
-    private <T extends ProcessingElement> void setPEAttributes(T pe, PEMaker pem, Class<T> type)
-            throws NoSuchFieldException, IllegalAccessException {
-
-        PropertiesConfiguration properties = pem.getProperties();
-        @SuppressWarnings("unchecked")
-        Iterator<String> iter = properties.getKeys();
-
-        while (iter.hasNext()) {
-            String property = iter.next();
-            logger.debug("Adding property [{}] to PE of type [{}].", property, type.getName());
-            setField(property, pe, pem, type);
-        }
-    }
-
-    private <T extends ProcessingElement> void setField(String fieldName, T pe, PEMaker pm, Class<T> type)
-            throws NoSuchFieldException, IllegalAccessException {
-        try {
-            Field f = type.getDeclaredField(fieldName);
-            f.setAccessible(true);
-            logger.trace("Type: {}.", f.getType());
-            logger.trace("GenericType: {}.", f.getGenericType());
-
-            /* Set the field. */
-            if (f.getType().getCanonicalName() == "long") {
-                f.setLong(pe, pm.getProperties().getLong(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "int") {
-                f.setInt(pe, pm.getProperties().getInt(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "float") {
-                f.setFloat(pe, pm.getProperties().getFloat(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "double") {
-                f.setDouble(pe, pm.getProperties().getDouble(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "short") {
-                f.setShort(pe, pm.getProperties().getShort(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "byte") {
-                f.setByte(pe, pm.getProperties().getByte(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "boolean") {
-                f.setBoolean(pe, pm.getProperties().getBoolean(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "char") {
-                f.setChar(pe, (char) pm.getProperties().getByte(fieldName));
-                return;
-            } else if (f.getType().getCanonicalName() == "java.lang.String") {
-                f.set(pe, pm.getProperties().getString(fieldName));
-                return;
-            }
-
-            logger.error("Unable to set field named [{}] in PE of type [{}].", fieldName, type);
-            throw new IllegalArgumentException();
-
-            // production code should handle these exceptions more gracefully
-        } catch (NoSuchFieldException e) {
-            logger.error("There is no field named [{}] in PE of type [{}].", fieldName, type);
-            throw e;
-        } catch (IllegalArgumentException e) {
-            logger.error("Couldn't set value for field [{}] in PE of type [{}].", fieldName, type);
-            throw e;
-        }
-    }
-
-    /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
-    private void setStreamField(PEMaker pm, Collection<StreamMaker> streams) throws Exception {
-
-        /*
-         * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
-         * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
-         * more than one field has the same type, then then we need to do more work.
-         */
-        Field[] fields = pm.getPe().getClass().getDeclaredFields();
-        Multimap<String, Field> typeMap = LinkedListMultimap.create();
-        logger.debug("Analyzing PE [{}].", pm.getPe().getClass().getName());
-        for (Field field : fields) {
-            logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
-
-            if (field.getType() == Stream[].class) {
-                logger.debug("Found stream field: {}", field.getGenericType());
-
-                /* Track what fields have streams with the same event type. */
-                String key = field.getGenericType().toString();
-                typeMap.put(key, field);
-            }
-        }
-
-        /* Assign streams to stream fields. */
-        Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
-        for (StreamMaker sm : streams) {
-
-            if (sm == null)
-                continue;
-
-            Stream<? extends Event> stream = sm.getStream();
-            Class<? extends Event> eventType = sm.getType();
-            String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
-            if (typeMap.containsKey(key)) {
-                String fieldName;
-                Field field;
-                Collection<Field> streamFields = typeMap.get(key);
-                int numStreamFields = streamFields.size();
-                logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
-
-                if (numStreamFields > 1) {
-
-                    /*
-                     * There is more than one field that can be used for this stream type. To resolve the ambiguity we
-                     * need additional information. The app graph should include the name of the field that should be
-                     * used to assign this stream. If the name is missing we bail out.
-                     */
-                    fieldName = sm.getFieldName();
-
-                    /* Bail out. */
-                    if (fieldName == null) {
-                        String msg = String
-                                .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
-                                        numStreamFields, pm.getPe().getClass().getName(), stream.getName());
-                        logger.error(msg);
-                        throw new Exception(msg);
-                    }
-
-                    /* Use the provided field name to choose the PE field. */
-                    field = pm.getPe().getClass().getDeclaredField(fieldName);
-
-                } else {
-
-                    /*
-                     * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
-                     * field that matches the stream type.
-                     */
-                    Iterator<Field> iter = streamFields.iterator();
-                    field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
-                    logger.debug("Using field [{}].", field.getName());
-                }
-
-                /*
-                 * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
-                 * There may be more than one stream that needs to be assigned to this field. The stream fields must be
-                 * arrays by convention and there may be more than one stream assigned to this fields. For now we create
-                 * a multimap from field to streams so we can construct the array in the next pass.
-                 */
-                assignment.put(field, stream);
-
-            } else {
-
-                /* We couldn't find a match. Tell user to fix the application. */
-                String msg = String.format(
-                        "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pm
-                                .getPe().getClass().getName(), stream.getName());
-                logger.error(msg);
-                throw new Exception(msg);
-
-            }
-        }
-        /* Now we construct the array and do the final assignment. */
-
-        Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
-        for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
-            Field f = entry.getKey();
-
-            int arraySize = entry.getValue().size();
-            @SuppressWarnings("unchecked")
-            Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
-                    arraySize);
-            int i = 0;
-            for (Stream<? extends Event> s : entry.getValue()) {
-                streamArray[i++] = s;
-
-                f.setAccessible(true);
-                f.set(pm.getPe(), streamArray);
-                logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
-            }
-        }
-    }
-
-    static private String toString(PEMaker pm) {
-        return pm != null ? pm.getType().getName() + " " : "null ";
-    }
-
-    static private String toString(StreamMaker sm) {
-        return sm != null ? sm.getName() + " " : "null ";
-    }
-
-    /**
-     * A printable representation of the application graph.
-     * 
-     * @return the application graph.
-     */
-    public String toString() {
-
-        StringBuilder sb = new StringBuilder();
-        sb.append("\nApplication Graph for " + this.getClass().getCanonicalName() + "\n");
-        Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
-        for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
-            sb.append(toString(entry.getKey()) + "=> ");
-            for (StreamMaker sm : entry.getValue()) {
-                sb.append(toString(sm));
-            }
-            sb.append("\n");
-        }
-
-        Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
-        for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
-            sb.append(toString(entry.getKey()) + "=> ");
-            for (PEMaker pm : entry.getValue()) {
-                sb.append(toString(pm));
-            }
-            sb.append("\n");
-        }
-
-        return sb.toString();
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
deleted file mode 100644
index 9f66e0d..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-
-import com.google.inject.Inject;
-
-/**
- * The Fluent API uses this class to construct apps automatically. Users should not have to use this class directly.
- * 
- */
-public class FluentApp extends App {
-
-    final private AppMaker appMaker;
-
-    @Inject
-    public FluentApp(AppMaker appMaker) {
-        super();
-        this.appMaker = appMaker;
-        appMaker.setApp(this);
-    }
-
-    @Override
-    protected void onStart() {
-        appMaker.start();
-    }
-
-    @Override
-    protected void onInit() {
-        appMaker.configure();
-        appMaker.make();
-    }
-
-    @Override
-    protected void onClose() {
-        appMaker.close();
-    }
-
-    public void start() {
-        super.start();
-    }
-
-    public void init() {
-        super.init();
-    }
-
-    public void close() {
-        super.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
deleted file mode 100644
index e5de2cb..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
+++ /dev/null
@@ -1,257 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Helper class to add a processing element to an S4 application.
- * 
- * @see example {@link S4Maker}
- * 
- */
-public class PEMaker {
-
-    private static final Logger logger = LoggerFactory.getLogger(PEMaker.class);
-
-    final private Class<? extends ProcessingElement> type;
-    final private AppMaker app;
-    private ProcessingElement pe = null;
-
-    private long timerInterval = 0;
-
-    private long triggerInterval = 0;
-    private Class<? extends Event> triggerEventType = null;
-    private int triggerNumEvents = 0;
-
-    private int cacheMaximumSize = 0;
-    private long cacheDuration = 0;
-
-    private PropertiesConfiguration properties = new PropertiesConfiguration();
-
-    private boolean isSingleton = false;
-
-    PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
-        Preconditions.checkNotNull(type);
-        this.type = type;
-        this.app = app;
-        app.add(this, null);
-    }
-
-    /**
-     * Configure the PE expiration and cache size.
-     * <p>
-     * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
-     * creation, or last access.
-     * <p>
-     * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
-     * <p>
-     * When this method is called all existing PE instances are destroyed.
-     * 
-     * 
-     * @param maximumSize
-     *            the approximate maximum number of PEs in the cache.
-     * @param duration
-     *            the PE duration
-     * @param timeUnit
-     *            the time unit
-     * @return the PEMaker
-     */
-    public CacheMaker addCache() {
-
-        return new CacheMaker();
-    }
-
-    /**
-     * Configure a trigger that is fired when the following conditions occur:
-     * 
-     * <ul>
-     * <li>An event of eventType arrived to the PE instance
-     * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
-     * interval.
-     * </ul>
-     * 
-     * <p>
-     * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
-     * the argument eventType.
-     * 
-     * @param eventType
-     *            the type of event on which this trigger will fire.
-     * @param numEvents
-     *            number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on
-     *            every input event.)
-     * @param interval
-     *            minimum time between triggers. Set to zero if no time interval needed.
-     * @param timeUnit
-     *            the TimeUnit for the argument interval. Can set to null if no time interval needed.
-     * @return the PEMaker
-     */
-    public TriggerMaker addTrigger() {
-
-        return new TriggerMaker();
-    }
-
-    /**
-     * Set a timer that calls {@link ProcessingElement#onTime()}.
-     * 
-     * If {@code interval==0} the timer is disabled.
-     * 
-     * @param interval
-     *            in timeUnit
-     * @param timeUnit
-     *            the timeUnit of interval
-     * @return the PEMaker
-     */
-    public TimerMaker addTimer() {
-        return new TimerMaker();
-    }
-
-    public StreamMaker emit(Class<? extends Event> type) {
-
-        logger.debug("PE [{}] emits event of type [{}].", this.getType().getName(), type.getCanonicalName());
-        StreamMaker stream = new StreamMaker(app, type);
-        app.add(this, stream);
-        return stream;
-    }
-
-    public PEMaker withKey(String key) {
-
-        return this;
-    }
-
-    public PEMaker with(String key, Object value) {
-
-        properties.addProperty(key, value);
-        return this;
-    }
-
-    /**
-     * @return the timerInterval
-     */
-    long getTimerInterval() {
-        return timerInterval;
-    }
-
-    /**
-     * @return the triggerInterval
-     */
-    long getTriggerInterval() {
-        return triggerInterval;
-    }
-
-    /**
-     * @return the triggerEventType
-     */
-    Class<? extends Event> getTriggerEventType() {
-        return triggerEventType;
-    }
-
-    /**
-     * @return the triggerNumEvents
-     */
-    int getTriggerNumEvents() {
-        return triggerNumEvents;
-    }
-
-    /**
-     * @return the cacheMaximumSize
-     */
-    int getCacheMaximumSize() {
-        return cacheMaximumSize;
-    }
-
-    /**
-     * @return the cacheDuration
-     */
-    long getCacheDuration() {
-        return cacheDuration;
-    }
-
-    /**
-     * @return the type
-     */
-    Class<? extends ProcessingElement> getType() {
-        return type;
-    }
-
-    /**
-     * @return the properties
-     */
-    PropertiesConfiguration getProperties() {
-        return properties;
-    }
-
-    /**
-     * @return the pe
-     */
-    public ProcessingElement getPe() {
-        return pe;
-    }
-
-    /**
-     * @param pe
-     *            the pe to set
-     */
-    public void setPe(ProcessingElement pe) {
-        this.pe = pe;
-    }
-
-    public class TriggerMaker {
-
-        public TriggerMaker fireOn(Class<? extends Event> eventType) {
-
-            triggerEventType = eventType;
-            return this;
-        }
-
-        public TriggerMaker ifNumEvents(int numEvents) {
-
-            triggerNumEvents = numEvents;
-            return this;
-        }
-
-        public TriggerMaker ifInterval(long interval, TimeUnit timeUnit) {
-
-            if (timeUnit != null)
-                triggerInterval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
-            return this;
-        }
-    }
-
-    public class CacheMaker {
-
-        public CacheMaker ofSize(int maxSize) {
-            cacheMaximumSize = maxSize;
-            return this;
-        }
-
-        public CacheMaker withDuration(long duration, TimeUnit timeUnit) {
-            cacheDuration = timeUnit.convert(duration, TimeUnit.MILLISECONDS);
-            return this;
-        }
-    }
-
-    public class TimerMaker {
-
-        public TimerMaker withDuration(long duration, TimeUnit timeUnit) {
-            timerInterval = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
-            timerInterval = duration;
-            return this;
-        }
-    }
-
-    public PEMaker asSingleton() {
-        this.isSingleton = true;
-        return this;
-    }
-
-    public boolean isSingleton() {
-        return isSingleton;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
deleted file mode 100644
index 3d3ea0e..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.Stream;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Helper class to add a stream to an S4 application.
- * 
- * @see example {@link S4Maker}
- * 
- */
-public class StreamMaker {
-
-    final private AppMaker app;
-    final private Class<? extends Event> type;
-    private String name = null;
-    private KeyFinder<? extends Event> keyFinder;
-    private String keyDescriptor = null;
-    private String fieldName;
-    private Stream<? extends Event> stream = null;
-
-    StreamMaker(AppMaker app, Class<? extends Event> type) {
-
-        Preconditions.checkNotNull(type);
-        this.app = app;
-        this.type = type;
-        app.add(null, this);
-    }
-
-    /**
-     * Name the stream.
-     * 
-     * @param name
-     *            the stream name, default is an empty string.
-     * @return the stream maker object
-     */
-    public StreamMaker withName(String name) {
-        this.name = name;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream.
-     * 
-     * @param keyFinder
-     *            a function to lookup the value of the key.
-     * @return the stream maker.
-     */
-    public <T extends Event> StreamMaker onKey(KeyFinder<T> keyFinder) {
-        this.keyFinder = keyFinder;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream using a descriptor.
-     * 
-     * @param keyFinderString
-     *            a descriptor to lookup the value of the key.
-     * @return the stream maker.
-     */
-    public StreamMaker onKey(String keyDescriptor) {
-
-        this.keyDescriptor = keyDescriptor;
-        return this;
-    }
-
-    /**
-     * Send events from this stream to a PE.
-     * 
-     * @param pe
-     *            a target PE.
-     * 
-     * @return the stream maker.
-     */
-    public StreamMaker to(PEMaker pe) {
-        app.add(this, pe);
-        return this;
-    }
-
-    /**
-     * Send events from this stream to various PEs.
-     * 
-     * @param pe
-     *            a target PE array.
-     * 
-     * @return the stream maker.
-     */
-    public StreamMaker to(PEMaker[] pes) {
-        for (int i = 0; i < pes.length; i++)
-            app.add(this, pes[i]);
-        return this;
-    }
-
-    /**
-     * @return the app
-     */
-    AppMaker getApp() {
-        return app;
-    }
-
-    /**
-     * @return the type
-     */
-    Class<? extends Event> getType() {
-        return type;
-    }
-
-    /**
-     * @return the name
-     */
-    String getName() {
-
-        if (name != null) {
-            return name;
-        }
-
-        if (keyDescriptor != null) {
-            return type.getCanonicalName() + "," + keyDescriptor;
-        } else if (keyFinder != null) {
-            return type.getCanonicalName() + "," + keyFinder.getClass().getCanonicalName();
-        } else
-            return type.getCanonicalName();
-    }
-
-    /**
-     * @return the keyFinder
-     */
-    KeyFinder<? extends Event> getKeyFinder() {
-        return keyFinder;
-    }
-
-    /**
-     * @return the keyDescriptor
-     */
-    String getKeyDescriptor() {
-        return keyDescriptor;
-    }
-
-    /**
-     * @return the field name
-     */
-    String getFieldName() {
-        return fieldName;
-    }
-
-    /**
-     * @return the stream
-     */
-    public Stream<? extends Event> getStream() {
-        return stream;
-    }
-
-    /**
-     * @param stream
-     *            the stream to set
-     */
-    public void setStream(Stream<? extends Event> stream) {
-        this.stream = stream;
-    }
-
-    public StreamMaker withField(String fieldName) {
-        this.fieldName = fieldName;
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
deleted file mode 100644
index 742eb7e..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.lang.reflect.Field;
-
-import org.junit.Test;
-
-public class AppMakerTest {
-
-    @Test
-    public void test() throws Exception {
-
-        MyApp myApp = new MyApp();
-        myApp.setApp(new FluentApp(myApp));
-        myApp.configure();
-        System.out.println(myApp.toString());
-        myApp.make();
-    }
-
-    @Test
-    public void testReflection() {
-
-        try {
-            Class<?> c = PEY.class;
-            Field f = c.getDeclaredField("duration");
-            System.out.format("Type: %s%n", f.getType());
-            System.out.format("GenericType: %s%n", f.getGenericType());
-
-            // production code should handle these exceptions more gracefully
-        } catch (NoSuchFieldException x) {
-            x.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
deleted file mode 100644
index c1831e5..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class DurationKeyFinder implements KeyFinder<EventA> {
-
-    public List<String> get(EventA event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the gender and add it to the list. */
-        results.add(Long.toString(event.getDuration()));
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
deleted file mode 100644
index ccf37e2..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-
-public class EventA extends Event {
-
-    private long duration;
-    private int height;
-
-    /**
-     * @return the duration
-     */
-    public long getDuration() {
-        return duration;
-    }
-
-    /**
-     * @param duration
-     *            the duration to set
-     */
-    public void setDuration(long duration) {
-        this.duration = duration;
-    }
-
-    /**
-     * @return the height
-     */
-    public int getHeight() {
-        return height;
-    }
-
-    /**
-     * @param height the height to set
-     */
-    public void setHeight(int height) {
-        this.height = height;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
deleted file mode 100644
index 3e285e3..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-
-public class EventB extends Event {
-
-    private String query;
-
-    /**
-     * @return the query
-     */
-    public String getQuery() {
-        return query;
-    }
-
-    /**
-     * @param query
-     *            the query to set
-     */
-    public void setQuery(String query) {
-        this.query = query;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
deleted file mode 100644
index 4bf4afe..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class HeightKeyFinder implements KeyFinder<EventA> {
-
-    public List<String> get(EventA event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the gender and add it to the list. */
-        results.add(Integer.toString(event.getHeight()));
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
deleted file mode 100644
index b6678c5..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.concurrent.TimeUnit;
-
-public class MyApp extends AppMaker {
-
-    @Override
-    public void configure() {
-
-        PEMaker pez, pey, pex;
-
-        /* Configure processing element pez. */
-        pez = addPE(PEZ.class);
-        pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
-        pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
-
-        /* Configure processing element pey. */
-        pey = addPE(PEY.class).with("duration", 4).with("height", 99);
-        pey.addTimer().withDuration(2, TimeUnit.MINUTES);
-
-        /* Configure processing element pex. */
-        pex = addPE(PEX.class).with("query", "money").asSingleton();
-        pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
-
-        /* Construct the graph. */
-        pey.emit(EventA.class).withField("stream3").onKey(new DurationKeyFinder()).to(pez);
-        pey.emit(EventA.class).withField("heightpez").onKey(new HeightKeyFinder()).to(pez);
-        pez.emit(EventB.class).to(pex);
-        pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
-    }
-
-    @Override
-    public void start() {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void close() {
-        // TODO Auto-generated method stub
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
deleted file mode 100644
index 4ebc581..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEX extends ProcessingElement {
-
-    private String query;
-    private Stream<EventB>[] someStream;
-    @SuppressWarnings("unused")
-    private Stream<EventA>[] streams;
-
-    public PEX(App app) {
-        super(app);
-    }
-
-    @Override
-    public void onCreate() {
-
-    }
-
-    @Override
-    public void onRemove() {
-
-    }
-
-    /**
-     * @return the keyword
-     */
-    String getKeyword() {
-        return query;
-    }
-
-    /**
-     * @param query
-     *            the keyword to set
-     */
-    void setKeyword(String query) {
-        this.query = query;
-    }
-
-    /**
-     * @return the someStream
-     */
-    public Stream<EventB>[] getSomeStream() {
-        return someStream;
-    }
-
-    /**
-     * @param someStream
-     *            the someStream to set
-     */
-    public void setSomeStream(Stream<EventB>[] someStream) {
-        this.someStream = someStream;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
deleted file mode 100644
index 4c03cd9..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEY extends ProcessingElement {
-
-    private Stream<EventA>[] stream3;
-    @SuppressWarnings("unused")
-    private Stream<EventA>[] heightpez;
-
-    private int height;
-    private long duration;
-
-    public PEY(App app) {
-        super(app);
-    }
-
-    @Override
-    public void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-    /**
-     * @return the stream3
-     */
-    Stream<EventA>[] getStream3() {
-        return stream3;
-    }
-
-    /**
-     * @param stream3
-     *            the stream3 to set
-     */
-    void setStream3(Stream<EventA>[] stream3) {
-        this.stream3 = stream3;
-    }
-
-    /**
-     * @return the height
-     */
-    int getHeight() {
-        return height;
-    }
-
-    /**
-     * @param height
-     *            the height to set
-     */
-    void setHeight(int height) {
-        this.height = height;
-    }
-
-    /**
-     * @return the duration
-     */
-    long getDuration() {
-        return duration;
-    }
-
-    /**
-     * @param duration
-     *            the duration to set
-     */
-    void setDuration(long duration) {
-        this.duration = duration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
deleted file mode 100644
index 25a08fc..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEZ extends ProcessingElement {
-
-    private Stream<EventA>[] stream1;
-    private Stream<EventB>[] stream2;
-
-    public PEZ(App app) {
-        super(app);
-    }
-
-    /**
-     * @return the stream1
-     */
-    Stream<EventA>[] getStream1() {
-        return stream1;
-    }
-
-    /**
-     * @param stream1
-     *            the stream1 to set
-     */
-    void setStream1(Stream<EventA>[] stream1) {
-        this.stream1 = stream1;
-    }
-
-    /**
-     * @return the stream2
-     */
-    Stream<EventB>[] getStream2() {
-        return stream2;
-    }
-
-    /**
-     * @param stream2
-     *            the stream2 to set
-     */
-    void setStream2(Stream<EventB>[] stream2) {
-        this.stream2 = stream2;
-    }
-
-    @Override
-    public void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void onRemove() {
-        // TODO Auto-generated method stub
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
deleted file mode 100644
index 050427b..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class QueryKeyFinder implements KeyFinder<EventB> {
-
-    public List<String> get(EventB event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the gender and add it to the list. */
-        results.add(event.getQuery());
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/s4-edsl.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/s4-edsl.gradle b/subprojects/s4-edsl/s4-edsl.gradle
index 4568799..7dc2c07 100644
--- a/subprojects/s4-edsl/s4-edsl.gradle
+++ b/subprojects/s4-edsl/s4-edsl.gradle
@@ -19,6 +19,13 @@ sourceSets {
 buildscript {
     repositories {
         mavenCentral()
+        
+        maven {
+            url "http://oss.sonatype.org/content/repositories/snapshots"
+        }
+        maven {
+            url "http://oss.sonatype.org/content/repositories/releases"
+        }
     }
     dependencies {
         classpath libraries.diezel
@@ -39,3 +46,5 @@ task generateSources << {
 
 compileJava.source generateSources.outputs.files, sourceSets.main.java
 
+eclipseClasspath.dependsOn generateSources
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml b/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
index 4e2cdbe..94e0dde 100644
--- a/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
+++ b/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
@@ -1,83 +1,109 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <diezelImplementation xmlns="http://diezel.ericaro.net/2.0.0/">
-	<package>org.apache.s4.core.edsl</package>
+	<package>org.apache.s4.edsl</package>
 	<name>Builder</name>
-	<implements>org.apache.s4.core.edsl.S4DSL</implements>
+	<extends>AppBuilder</extends>
+	<implements>org.apache.s4.edsl.S4DSL</implements>	
 	<transitions>
 		<transitionImplementation name="pe">
 			<body>
-				System.out.println("pe");
+			    clearPEState();peName = name;logger.debug("PE name: " + peName);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="type">
 			<body>
-				System.out.println("type");
+				processingElement = createPE(peType, peName);
+				logger.debug("peType: " + peType);
+			</body>
+		</transitionImplementation>
+		<transitionImplementation name="prop">
+			<body>
+			    addProperty(propName, propValue);
+			    logger.debug("prop: " + propName + " = " + propValue);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="fireOn">
 			<body>
-				System.out.println("fireOn");
+				triggerEventType = eventType;
+				logger.debug("EventType: " + eventType);
+			</body>
+		</transitionImplementation>
+		<transitionImplementation name="afterInterval">
+			<body>
+				triggerInterval = interval;	triggerTimeUnit = timeUnit; processingElement.setTrigger(triggerEventType, 1, triggerInterval, triggerTimeUnit); logger.debug("Interval: " + triggerInterval);
 			</body>
 		</transitionImplementation>
-		<transitionImplementation name="ifInterval">
+		<transitionImplementation name="afterNumEvents">
 			<body>
-				System.out.println("ifInterval");
+				processingElement.setTrigger(triggerEventType, numEvents, triggerInterval, triggerTimeUnit);
+				logger.debug("afterNumeEvents: " + numEvents);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="timer">
 			<body>
-				System.out.println("timer");
+				logger.debug("timer on");
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="withPeriod">
 			<body>
-				System.out.println("withPeriod");
+				processingElement.setTimerInterval(interval, timeUnit);
+				logger.debug("withPeriod: " + interval);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="cache">
 			<body>
-				System.out.println("cache");
+				logger.debug("cache");
+			</body>
+		</transitionImplementation>
+		<transitionImplementation name="size">
+			<body>
+				cacheSize = size; processingElement.setPECache(size); logger.debug("cache size: " + cacheSize);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="expires">
 			<body>
-				System.out.println("expires");
+				processingElement.setPECache(cacheSize, duration, timeUnit); logger.debug("expires: " + duration);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="asSingleton">
 			<body>
-				System.out.println("asSingleton");
+				processingElement.setSingleton(true);
+				logger.debug("asSingleton");
 			</body>
 		</transitionImplementation>
-		<transitionImplementation name="emitEvent">
+		<transitionImplementation name="emit">
 			<body>
-				System.out.println("emitEvent");
+				streamBuilder = new StreamBuilder(app, event); streamBuilders.add(streamBuilder); addPe2Stream(processingElement, streamBuilder); logger.debug("emit event: " + event);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="onField">
 			<body>
-				System.out.println("onField");
+				streamBuilder.setFieldName(fieldName);
+				logger.debug("onField");
 			</body>
 		</transitionImplementation>
-		<transitionImplementation name="onKey">
+		<transitionImplementation name="withKeyFinder">
 			<body>
-				System.out.println("onKey");
+				streamBuilder.setKeyFinder(keyFinder);
+				logger.debug("withKeyFinder");
 			</body>
 		</transitionImplementation>
-		<transitionImplementation name="to">
+		<transitionImplementation name="withKey">
 			<body>
-				System.out.println("to");
+				streamBuilder.setKey(key);
+				logger.debug("withKey");
 			</body>
 		</transitionImplementation>
-		<transitionImplementation name="info">
+		<transitionImplementation name="to">
 			<body>
-				System.out.println("info");
+				streamBuilder.to(targetPeName);
+				logger.debug("to: " + targetPeName);
 			</body>
 		</transitionImplementation>
 		<transitionImplementation name="build">
 			<body>
-				System.out.println("build");
-				return "Done!";
+				logger.debug("build");
+				return buildApp();
 			</body>
 		</transitionImplementation>
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/diezel/s4/s4.xml b/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
index 85d6bcb..008da11 100644
--- a/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
+++ b/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
@@ -1,56 +1,62 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <diezel xmlns="http://diezel.ericaro.net/2.0.0/">
-	<package>org.apache.s4.core.edsl</package>
+	<package>org.apache.s4.edsl</package>
 	<name>S4DSL</name>
-	<expression>(pe , type , (fireOn , ifInterval? )? , (timer, withPeriod)? , (cache, size , expires? )? , asSingleton? , (emitEvent, onField?, onKey?, to )+  )+ , info?, build</expression>
+	<expression>(pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , (cache, size , expires? )? , asSingleton? , (emit, onField?, (withKey|withKeyFinder)?, to+ )*  )+ , build</expression>
 	<transitions>
 		<transition name="pe">
-			<signature>pe()</signature>
+			<signature>pe(java.lang.String name)</signature>
 		</transition>
 		<transition name="type">
-			<signature>type()</signature>
+			<signature><![CDATA[type(java.lang.Class<? extends org.apache.s4.core.ProcessingElement> peType)]]></signature>
+		</transition>
+		<transition name="prop">
+			<signature>prop(java.lang.String propName, java.lang.String propValue)</signature>
 		</transition>
 		<transition name="fireOn">
-			<signature>fireOn()</signature>
+			<signature><![CDATA[fireOn(java.lang.Class<? extends org.apache.s4.base.Event> eventType)]]></signature>
+		</transition>
+		<transition name="afterInterval">
+			<signature>afterInterval(long interval, java.util.concurrent.TimeUnit timeUnit)</signature>
 		</transition>
-		<transition name="ifInterval">
-			<signature>ifInterval()</signature>
+		<transition name="afterNumEvents">
+			<signature>afterNumEvents(int numEvents)</signature>
 		</transition>
 		<transition name="timer">
 			<signature>timer()</signature>
 		</transition>
 		<transition name="withPeriod">
-			<signature>withPeriod()</signature>
+			<signature>withPeriod(long interval, java.util.concurrent.TimeUnit timeUnit)</signature>
 		</transition>
 		<transition name="cache">
 			<signature>cache()</signature>
 		</transition>
 		<transition name="size">
-			<signature>size()</signature>
+			<signature>size(int size)</signature>
 		</transition>
 		<transition name="expires">
-			<signature>expires()</signature>
+			<signature>expires(long duration, java.util.concurrent.TimeUnit timeUnit)</signature>
 		</transition>
 		<transition name="asSingleton">
 			<signature>asSingleton()</signature>
 		</transition>
-		<transition name="emitEvent">
-			<signature>emitEvent()</signature>
+		<transition name="emit">
+			<signature><![CDATA[emit(java.lang.Class<? extends org.apache.s4.base.Event> event)]]></signature>
 		</transition>
 		<transition name="onField">
-			<signature>onField()</signature>
+			<signature>onField(java.lang.String fieldName)</signature>
 		</transition>
-		<transition name="onKey">
-			<signature>onKey()</signature>
+		<transition name="withKeyFinder">
+			<signature><![CDATA[withKeyFinder(org.apache.s4.base.KeyFinder<? extends org.apache.s4.base.Event> keyFinder)]]></signature>
 		</transition>
+		<transition name="withKey">
+			<signature>withKey(java.lang.String key)</signature>
+		</transition>		
 		<transition name="to">
-			<signature>to()</signature>
-		</transition>
-		<transition name="info">
-			<signature>info()</signature>
+			<signature>to(java.lang.String targetPeName)</signature>
 		</transition>
 		<transition name="build">
-			<return>java.lang.String</return>
+			<return>org.apache.s4.core.App</return>
 			<signature>build()</signature>
 		</transition>
 	</transitions>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/Main.java b/subprojects/s4-edsl/src/main/java/Main.java
deleted file mode 100644
index b2d50ba..0000000
--- a/subprojects/s4-edsl/src/main/java/Main.java
+++ /dev/null
@@ -1,15 +0,0 @@
-import org.apache.s4.core.edsl.BuilderS4DSL;
-
-public class Main {
-    public static BuilderS4DSL build() {
-        return new BuilderS4DSL();
-    }
-
-    public static void main(String[] args) {
-
-        String app = new BuilderS4DSL().pe().type().fireOn().ifInterval().cache().size().asSingleton().emitEvent()
-                .onField().to().build();
-        System.out.println(app);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
new file mode 100644
index 0000000..cf7fa69
--- /dev/null
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
@@ -0,0 +1,330 @@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of the S4 embedded domain-specific language (EDSL).
+ * 
+ * <p>
+ * To write an app extend this class and define the application graph using a chain of methods as follows:
+ * 
+ * <pre>
+ *    final public class MyApp extends BuilderS4DSL {
+ * 
+ *     protected void onInit() {
+ * 
+ *         pe("Consumer").type(ConsumerPE.class).asSingleton().
+ *         pe("Producer").type(ProducerPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS).asSingleton().
+ *         emit(SomeEvent.class).withKey("someKey").to("Consumer").
+ *         build()
+ *    }
+ * </pre>
+ * 
+ * <p>
+ * A few things to notice:
+ * <ul>
+ * <li>Applications must extend class {@link BuilderS4DSL}
+ * <li>The graph definition is implemented in the {@link App#onInit} method which is called by the container when the
+ * application is loaded.
+ * <li>PEs are defined using strings because they need to be referenced by other parts of the graph. By doing this, we
+ * can create the whole application in a single chain of methods.
+ * <li>To assign target streams to PE fields additional information may need to be provided using the {@code onField}
+ * grammar token when there is an ambiguity. This will happen when a PE has more than one targetStream field with the
+ * same {@link Event} type. Use the construct {@code emit(SomeEvent.class).onField("streamFieldName")}. If the PE
+ * doesn't have a field named {@code "streamField"} whose stream parameter type is {@code someEvent)} then the parser
+ * will fail to build the app.
+ * <li>To configure a PE, set property values by chaining any number of {@code prop(name, value)} methods. The name
+ * should match a PE field and the value will be parsed using the type of that field.
+ * </ul>
+ * <p>
+ * Grammar:
+ * 
+ * <pre>
+ *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , 
+ *  (cache, size , expires? )? , asSingleton? , (emit, onField?, 
+ *  (withKey|withKeyFinder)?, to+ )*  )+ , build
+ * </pre>
+ * 
+ * <p>
+ * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
+ * 
+ * @author Leo Neumeyer (@leoneu)
+ */
+public class AppBuilder extends App {
+
+    protected App app = this;
+
+    static final Logger logger = LoggerFactory.getLogger(AppBuilder.class);
+
+    private Multimap<ProcessingElement, StreamBuilder<? extends Event>> pe2stream = LinkedListMultimap.create();
+    Set<StreamBuilder<? extends Event>> streamBuilders = Sets.newHashSet();
+
+    /* Variables used to hold values from state to state. */
+    ProcessingElement processingElement;
+    String peName;
+    Class<? extends Event> triggerEventType;
+    long triggerInterval = 0;
+    TimeUnit triggerTimeUnit;
+    int cacheSize;
+    StreamBuilder<? extends Event> streamBuilder;
+    String propertyName, propertyValue;
+
+    public static AppBuilder getAppBuilder() {
+        return new BuilderS4DSL();
+    }
+
+    void addProperty(String name, String value) {
+        propertyName = name;
+        propertyValue = value;
+        setField();
+    }
+
+    void addPe2Stream(ProcessingElement pe, StreamBuilder<? extends Event> st) {
+        pe2stream.put(pe, st);
+    }
+
+    App buildApp() {
+
+        /* Stream to PE writing. */
+        for (StreamBuilder<? extends Event> sb : streamBuilders) {
+            for (String peName : sb.pes) {
+                ProcessingElement pe = getPE(peName);
+                sb.stream.setPE(pe);
+            }
+        }
+
+        /* PE to Stream wiring. */
+        Map<ProcessingElement, Collection<StreamBuilder<? extends Event>>> pe2streamMap = pe2stream.asMap();
+        for (Map.Entry<ProcessingElement, Collection<StreamBuilder<? extends Event>>> entry : pe2streamMap.entrySet()) {
+            ProcessingElement pe = entry.getKey();
+            Collection<StreamBuilder<? extends Event>> streams = entry.getValue();
+
+            if (pe != null && streams != null) {
+                try {
+                    setStreamField(pe, streams);
+                } catch (Exception e) {
+                    logger.error("Unable to build app.", e);
+                    return null;
+                }
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * @param peName
+     *            the peName to set
+     */
+    protected void setPeName(String peName) {
+        this.peName = peName;
+    }
+
+    /*
+     * Cannot create an abstract class in Diezel so for now, I just implement the abstract methods here. They need to be
+     * overloaded by the app developer.
+     */
+    @Override
+    protected void onStart() {
+    }
+
+    @Override
+    protected void onInit() {
+    }
+
+    @Override
+    protected void onClose() {
+    }
+
+    private <T extends ProcessingElement> void setField() {
+
+        logger.debug("Adding property [{}] to PE of type [{}].", propertyName, processingElement.getClass().getName());
+
+        Class<? extends ProcessingElement> type = processingElement.getClass();
+
+        try {
+            Field f = type.getDeclaredField(propertyName);
+            f.setAccessible(true);
+            logger.trace("Type: {}.", f.getType());
+            logger.trace("GenericType: {}.", f.getGenericType());
+
+            /* Set the field. */
+            if (f.getType().getCanonicalName() == "long") {
+                f.setLong(processingElement, Long.parseLong(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "int") {
+                f.setInt(processingElement, Integer.parseInt(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "float") {
+                f.setFloat(processingElement, Float.parseFloat(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "double") {
+                f.setDouble(processingElement, Double.parseDouble(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "short") {
+                f.setShort(processingElement, Short.parseShort(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "byte") {
+                f.setByte(processingElement, Byte.parseByte(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "boolean") {
+                f.setBoolean(processingElement, Boolean.parseBoolean(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "char") {
+                f.setChar(processingElement, (char) Byte.parseByte(propertyValue));
+                return;
+            } else if (f.getType().getCanonicalName() == "java.lang.String") {
+                f.set(processingElement, propertyValue);
+                return;
+            }
+
+            logger.error("Unable to set field named [{}] in PE of type [{}].", propertyName, type);
+            throw new IllegalArgumentException();
+
+            // production code should handle these exceptions more gracefully
+        } catch (NoSuchFieldException e) {
+            logger.error("There is no field named [{}] in PE of type [{}].", propertyName, type);
+        } catch (Exception e) {
+            logger.error("Couldn't set value for field [{}] in PE of type [{}].", propertyName, type);
+        }
+    }
+
+    /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
+    private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? extends Event>> streams)
+            throws Exception {
+
+        /*
+         * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
+         * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
+         * more than one field has the same type, then then we need to do more work.
+         */
+        Field[] fields = pe.getClass().getDeclaredFields();
+        Multimap<String, Field> typeMap = LinkedListMultimap.create();
+        logger.debug("Analyzing PE [{}].", pe.getClass().getName());
+        for (Field field : fields) {
+            logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
+
+            if (field.getType() == Stream[].class) {
+                logger.debug("Found stream field: {}", field.getGenericType());
+
+                /* Track what fields have streams with the same event type. */
+                String key = field.getGenericType().toString();
+                typeMap.put(key, field);
+            }
+        }
+
+        /* Assign streams to stream fields. */
+        Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
+        for (StreamBuilder<? extends Event> sm : streams) {
+
+            Stream<? extends Event> stream = sm.stream;
+            Class<? extends Event> eventType = sm.type;
+            String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
+            if (typeMap.containsKey(key)) {
+                String fieldName;
+                Field field;
+                Collection<Field> streamFields = typeMap.get(key);
+                int numStreamFields = streamFields.size();
+                logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
+
+                if (numStreamFields > 1) {
+
+                    /*
+                     * There is more than one field that can be used for this stream type. To resolve the ambiguity we
+                     * need additional information. The app graph should include the name of the field that should be
+                     * used to assign this stream. If the name is missing we bail out.
+                     */
+                    fieldName = sm.fieldName;
+
+                    /* Bail out. */
+                    if (fieldName == null) {
+                        String msg = String
+                                .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
+                                        numStreamFields, pe.getClass().getName(), stream.getName());
+                        logger.error(msg);
+                        throw new Exception(msg);
+                    }
+
+                    /* Use the provided field name to choose the PE field. */
+                    field = pe.getClass().getDeclaredField(fieldName);
+
+                } else {
+
+                    /*
+                     * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
+                     * field that matches the stream type.
+                     */
+                    Iterator<Field> iter = streamFields.iterator();
+                    field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
+                    logger.debug("Using field [{}].", field.getName());
+                }
+
+                /*
+                 * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
+                 * There may be more than one stream that needs to be assigned to this field. The stream fields must be
+                 * arrays by convention and there may be more than one stream assigned to this fields. For now we create
+                 * a multimap from field to streams so we can construct the array in the next step.
+                 */
+                assignment.put(field, stream);
+
+            } else {
+
+                /* We couldn't find a match. Tell user to fix the EDSL code. */
+                String msg = String.format(
+                        "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pe
+                                .getClass().getName(), stream.getName());
+                logger.error(msg);
+                throw new Exception(msg);
+
+            }
+        }
+        /* Now we construct the array and do the final assignment. */
+
+        Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
+        for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
+            Field f = entry.getKey();
+
+            int arraySize = entry.getValue().size();
+            @SuppressWarnings("unchecked")
+            Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
+                    arraySize);
+            int i = 0;
+            for (Stream<? extends Event> s : entry.getValue()) {
+                streamArray[i++] = s;
+
+                f.setAccessible(true);
+                f.set(pe, streamArray);
+                logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
+            }
+        }
+    }
+
+    void clearPEState() {
+        propertyName = null;
+        propertyValue = null;
+        processingElement = null;
+        peName = null;
+        triggerEventType = null;
+        triggerTimeUnit = null;
+        cacheSize = -1;
+        streamBuilder = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
new file mode 100644
index 0000000..9412855
--- /dev/null
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
@@ -0,0 +1,75 @@
+package org.apache.s4.edsl;
+
+import java.util.Set;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Helper class to add a stream to an S4 application. This class and methods are private package. No need for app
+ * developers to see this class.
+ * 
+ */
+class StreamBuilder<T extends Event> {
+
+    Class<T> type;
+    String fieldName;
+    Stream<T> stream;
+    Set<String> pes = Sets.newHashSet();
+
+    StreamBuilder(App app, Class<T> type) {
+
+        Preconditions.checkNotNull(type);
+        this.type = type;
+        stream = app.createStream(type);
+        stream.setName(type.getCanonicalName()); // Default name.
+    }
+
+    void setEventType(Class<T> type) {
+        this.type = type;
+    }
+
+    /**
+     * Name the stream.
+     * 
+     * @param name
+     *            the stream name, default is an empty string.
+     * @return the stream maker object
+     */
+    void setName(String name) {
+        stream.setName(name);
+    }
+
+    /**
+     * Define the key finder for this stream.
+     * 
+     * @param keyFinder
+     *            a function to lookup the value of the key.
+     */
+    @SuppressWarnings("unchecked")
+    void setKeyFinder(KeyFinder<?> keyFinder) {
+        stream.setKey((KeyFinder<T>) keyFinder);
+        stream.setName(type.getCanonicalName() + "," + keyFinder.getClass().getCanonicalName());
+    }
+
+    void setKey(String keyDescriptor) {
+
+        stream.setKey(keyDescriptor);
+        stream.setName(type.getCanonicalName() + "," + keyDescriptor);
+    }
+
+    // Not all PE may have been created, we use PE Name as a placeholder. The PE prototypes will be assigned in the
+    // buildApp() method in AppBuilder.
+    void to(String peName) {
+        pes.add(peName);
+    }
+
+    void setFieldName(String fieldName) {
+        this.fieldName = fieldName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
new file mode 100644
index 0000000..78bddb0
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class DurationKeyFinder implements KeyFinder<EventA> {
+
+    public List<String> get(EventA event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the gender and add it to the list. */
+        results.add(Long.toString(event.getDuration()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
new file mode 100644
index 0000000..c881ea5
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
@@ -0,0 +1,40 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+    private long duration;
+    private int height;
+
+    /**
+     * @return the duration
+     */
+    public long getDuration() {
+        return duration;
+    }
+
+    /**
+     * @param duration
+     *            the duration to set
+     */
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+    /**
+     * @return the height
+     */
+    public int getHeight() {
+        return height;
+    }
+
+    /**
+     * @param height
+     *            the height to set
+     */
+    public void setHeight(int height) {
+        this.height = height;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
new file mode 100644
index 0000000..0ebe91e
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
@@ -0,0 +1,24 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+    private String query;
+
+    /**
+     * @return the query
+     */
+    public String getQuery() {
+        return query;
+    }
+
+    /**
+     * @param query
+     *            the query to set
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+}