You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[22/22] git commit: S4-22 automating tests

S4-22 automating tests

TODO:
- fix hardcoded code in Server.java
- complete tests
- deployment on multiple nodes
- deployment across logical clusters
- fluent API code was temporarily removed (AppMaker)


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/403162eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/403162eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/403162eb

Branch: refs/heads/piper
Commit: 403162eb91aa37a78cba646105463453f5de4d21
Parents: 7eb4882
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Jan 15 23:31:47 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sun Jan 15 23:52:06 2012 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/App.java      |  122 ++++-----
 .../main/java/org/apache/s4/core/EventSource.java  |   62 ++---
 .../src/main/java/org/apache/s4/core/Key.java      |   52 ++++
 .../java/org/apache/s4/core/ProcessingElement.java |    7 +-
 .../src/main/java/org/apache/s4/core/Server.java   |   30 ++
 .../src/main/java/org/apache/s4/core/Stream.java   |  152 +++-------
 .../main/java/org/apache/s4/core/Streamable.java   |   13 +-
 .../main/java/org/apache/s4/fluent/AppMaker.java   |   22 +-
 .../s4/deploy/prodcon/TestProducerConsumer.java    |  173 +++++++++++
 test-apps/s4-counter/build.gradle                  |  223 +++++++++++++++
 .../s4-counter/src/main/java/s4app/ClockApp.java   |   55 ++++
 .../s4-counter/src/main/java/s4app/ClockPE.java    |   49 ++++
 test-apps/s4-showtime/build.gradle                 |  223 +++++++++++++++
 .../s4-showtime/src/main/java/s4app/ShowPE.java    |   33 +++
 .../src/main/java/s4app/ShowTimeApp.java           |   47 +++
 15 files changed, 1026 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 4a562f9..6249306 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -15,9 +15,9 @@
  */
 package org.apache.s4.core;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
@@ -25,8 +25,6 @@ import org.apache.s4.base.KeyFinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
@@ -39,11 +37,14 @@ public abstract class App {
 
     static final Logger logger = LoggerFactory.getLogger(App.class);
 
-    /* PE prototype to streams relations. */
-    final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
+    /* All the PE prototypes in this app. */
+    final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
 
-    /* Stream prototype to PE relations. */
-    final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
+    /* All the internal streams in this app. */
+    final private List<Streamable> streams = new ArrayList<Streamable>();
+
+    /* All the event sources exported by this app. */
+    final private List<EventSource> eventSources = new ArrayList<EventSource>();
 
     private ClockType clockType = ClockType.WALL_CLOCK;
     private int id = -1;
@@ -85,39 +86,52 @@ public abstract class App {
         this.id = id;
     }
 
-    /**
-     * @return all the pePrototypes
-     */
-    Collection<ProcessingElement> getPePrototypes() {
-        return pe2stream.keySet();
+    /* Should only be used within the core package. */
+    void addPEPrototype(ProcessingElement pePrototype) {
+        pePrototypes.add(pePrototype);
     }
 
-    /**
-     * @return all the pePrototypes
-     */
-    <T extends Event> Collection<ProcessingElement> getTargetPEs(Stream<T> stream) {
+    /* Should only be used within the core package. */
+    void addStream(Streamable stream) {
+        streams.add(stream);
+    }
 
-        Map<Streamable<?>, Collection<ProcessingElement>> stream2peMap = stream2pe.asMap();
+    /* Should only be used within the core package. */
+    void addEventSource(EventSource es) {
+        eventSources.add(es);
+    }
 
-        return stream2peMap.get(stream);
+    /* Returns list of PE prototypes. Should only be used within the core package. */
+    List<ProcessingElement> getPePrototypes() {
+        return pePrototypes;
     }
 
-    protected abstract void onStart();
+    /* Returns list of internal streams. Should only be used within the core package. */
+    List<Streamable> getStreams() {
+        return streams;
+    }
 
-    protected void start() {
+    /* Returns list of the event sources to be exported. Should only be used within the core package. */
+    List<EventSource> getEventSources() {
+        return eventSources;
+    }
 
-        logger.info("Prepare to start App [{}].", getClass().getName());
+    protected abstract void onStart();
 
-        /* Start all streams. */
-        for (Streamable<? extends Event> stream : getStreams()) {
-            stream.start();
-        }
+    protected void start() {
 
-        /* Allow abstract PE to initialize. */
-        for (ProcessingElement pe : getPePrototypes()) {
-            logger.info("Init prototype [{}].", pe.getClass().getName());
-            pe.initPEPrototypeInternal();
-        }
+        // logger.info("Prepare to start App [{}].", getClass().getName());
+        //
+        // /* Start all streams. */
+        // for (Streamable<? extends Event> stream : getStreams()) {
+        // stream.start();
+        // }
+        //
+        // /* Allow abstract PE to initialize. */
+        // for (ProcessingElement pe : getPePrototypes()) {
+        // logger.info("Init prototype [{}].", pe.getClass().getName());
+        // pe.initPEPrototypeInternal();
+        // }
 
         onStart();
     }
@@ -156,25 +170,16 @@ public abstract class App {
 
         /* Finally remove the entire app graph. */
         logger.trace("Clear app graph.");
-        pe2stream.clear();
-        stream2pe.clear();
-    }
-
-    void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
-
-        logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
-        pe2stream.put(pePrototype, stream);
 
+        pePrototypes.clear();
+        streams.clear();
     }
 
-    void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
-        logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
-        stream2pe.put(stream, pePrototype);
+    void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
 
-    }
+        // logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
+        pePrototypes.add(pePrototype);
 
-    Collection<Streamable<? extends Event>> getStreams() {
-        return stream2pe.keySet();
     }
 
     /**
@@ -264,7 +269,7 @@ public abstract class App {
     protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
             ProcessingElement... processingElements) {
 
-        return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
+        return new Stream<T>(this, name, finder, processingElements);
     }
 
     /**
@@ -281,30 +286,7 @@ public abstract class App {
      */
     protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
 
-        return new Stream<T>(this).setName(name).setPEs(processingElements);
-    }
-
-    /**
-     * Creates stream with default values. Use the builder methods to configure the stream. Example:
-     * <p>
-     * 
-     * <pre>
-     *  s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
-     * </pre>
-     * 
-     * <p>
-     * 
-     * @param name
-     *            the name of the stream
-     * @param processingElements
-     *            the target processing elements
-     * @return the stream
-     */
-    public <T extends Event> Stream<T> createStream(Class<T> type) {
-
-        Stream<T> stream = new Stream<T>(this);
-        stream.setEventType(type);
-        return stream;
+        return new Stream<T>(this, name, processingElements);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
index 21ea23b..edd64ef 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
@@ -9,76 +9,76 @@ import org.slf4j.LoggerFactory;
 
 /**
  * 
- * A producer app uses one or more EventSource classes to provide events to streams. AT runtime, consumer apps subscribe
- * to an event source by providing a stream object. Each EventSource instance may correspond to a different type of
- * event stream. Each EventSource may have an unlimited number of subscribers.
+ * A producer app uses one or more EventSource classes to provide events to streamables. At runtime, consumer apps
+ * subscribe to an event source by providing a streamable object. Each EventSource instance may correspond to a
+ * different type of event stream. Each EventSource may have an unlimited number of subscribers.
  * 
  */
-class EventSource<T extends Event> extends Streamable<T> {
+public class EventSource implements Streamable {
 
     /* No need to synchronize this object because we expect a single thread. */
-    private Set<Stream<T>> streams = new HashSet<Stream<T>>();
+    private Set<Streamable> streamables = new HashSet<Streamable>();
     private static final Logger logger = LoggerFactory.getLogger(EventSource.class);
     final private String name;
 
-    EventSource(App app, String name) {
+    public EventSource(App app, String name) {
         this.name = name;
-        app.addStream(this, null);
+        app.addEventSource(this);
     }
 
     /**
-     * Subscribe a stream to this event source.
+     * Subscribe a streamable to this event source.
      * 
-     * @param stream
+     * @param aStream
      */
-    void subscribeStream(Stream<T> stream) {
-        logger.info("Subscribing stream: {} to event source: {}.", stream.getName(), getName());
-        streams.add(stream);
+    public void subscribeStream(Streamable aStream) {
+        logger.info("Subscribing stream: {} to event source: {}.", aStream.getName(), getName());
+        streamables.add(aStream);
     }
 
     /**
-     * Unsubscribe a stream from this event source.
+     * Unsubscribe a streamable from this event source.
      * 
      * @param stream
      */
-    void unsubscribeStream(Stream<T> stream) {
+    public void unsubscribeStream(Streamable stream) {
         logger.info("Unsubsubscribing stream: {} to event source: {}.", stream.getName(), getName());
-        streams.remove(stream);
+        streamables.remove(stream);
     }
 
     /**
-     * Send an event to all the subscribed streams.
+     * Send an event to all the subscribed streamables.
      * 
      * @param event
      */
     @Override
-    public void put(T event) {
-        for (Stream<T> stream : streams) {
+    public void put(Event event) {
+        for (Streamable stream : streamables) {
             stream.put(event);
         }
     }
 
     /**
      * 
-     * @return the number of streams subscribed to this event source.
+     * @return the number of streamables subscribed to this event source.
      */
-    int getNumSubscribers() {
-        return streams.size();
+    public int getNumSubscribers() {
+        return streamables.size();
     }
 
     /**
      * @return the name of this event source.
      */
-    String getName() {
+    public String getName() {
         return name;
     }
 
     /**
-     * Close all the streams subscribed to this event source.
+     * Close all the streamables subscribed to this event source.
      */
     @Override
-    void close() {
-        for (Stream<T> stream : streams) {
+    public void close() {
+        for (Streamable stream : streamables) {
             logger.info("Closing stream: {} in event source: {}.", stream.getName(), getName());
             stream.close();
         }
@@ -86,15 +86,9 @@ class EventSource<T extends Event> extends Streamable<T> {
 
     /**
      * 
-     * @return the set of streams subscribed to this event source.
+     * @return the set of streamables subscribed to this event source.
      */
-    Set<Stream<T>> getStreams() {
-        return streams;
-    }
-
-    @Override
-    void start() {
-        // TODO Auto-generated method stub
-
+    public Set<Streamable> getStreamables() {
+        return streamables;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
new file mode 100644
index 0000000..49b3766
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.core;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+
+/*
+ * The Key class is used to get the value of the key on a specific type of
+ * event. This is done to abstract all the complexity required to get the 
+ * value. The method for getting the value is implemented in a method of 
+ * an object of type KeyFinder<T>.
+ * 
+ * The application programmer provides the events and the corresponding 
+ * finders. The framework will use it to key on events.
+ */
+public class Key<T extends Event> {
+
+    final private KeyFinder<T> finder;
+    final private String separator;
+
+    public Key(KeyFinder<T> finder, String separator) {
+        this.finder = finder;
+        this.separator = separator;
+    }
+
+    public List<String> getList(T event) {
+        return finder.get(event);
+    }
+
+    public String get(T event) {
+        List<String> keys = getList(event);
+
+        return StringUtils.join(keys, separator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index befe94f..ee4c72f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -98,7 +98,7 @@ import com.google.common.collect.Maps;
  * 
  * 
  */
-abstract public class ProcessingElement implements Cloneable {
+public abstract class ProcessingElement implements Cloneable {
 
     private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
     private static final String SINGLETON = "singleton";
@@ -109,7 +109,7 @@ abstract public class ProcessingElement implements Cloneable {
      * This maps holds all the instances. We make it package private to prevent concrete classes from updating the
      * collection.
      */
-    private Cache<String, ProcessingElement> peInstances;
+    Cache<String, ProcessingElement> peInstances;
 
     /* This map is initialized in the prototype and cloned to instances. */
     Map<Class<? extends Event>, Trigger> triggers;
@@ -336,7 +336,6 @@ abstract public class ProcessingElement implements Cloneable {
      *            in timeUnit
      * @param timeUnit
      *            the timeUnit of interval
-     * @return the PE prototype
      */
     public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {
         timerIntervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
@@ -353,7 +352,7 @@ abstract public class ProcessingElement implements Cloneable {
         timer = new Timer();
         logger.info("Created timer for PE prototype [{}] with interval [{}].", this.getClass().getName(),
                 timerIntervalInMilliseconds);
-
+        timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 31513d1..35328aa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -85,6 +85,36 @@ public class Server {
             startApp(app);
         }
 
+        EventSource savedES = null;
+        App consumerApp = null;
+        for (App app : apps) {
+            logger.info("Resolving dependencies for " + app.getClass().getName());
+            List<EventSource> eventSources = app.getEventSources();
+            if (eventSources.size() > 0) {
+                EventSource es = eventSources.get(0);
+                logger.info("App [{}] exports event source [{}].", app.getClass().getName(), es.getName());
+                savedES = es; // hardcoded
+            } else {
+
+                // hardcoded (one app has an event source, the other one doesn't).
+                consumerApp = app;
+            }
+        }
+        // hardcoded: make consumerApp subscribe to savedES
+        logger.info("The consumer app is [{}].", consumerApp.getClass().getName());
+        // get the list of streams and find the one we are looking for that has name: "I need the time."
+        List<Streamable> streams = consumerApp.getStreams();
+        for (Streamable aStream : streams) {
+
+            String streamName = aStream.getName();
+
+            if (streamName.contentEquals("I need the time.")) {
+                logger.info("Subscribing stream [{}] from app [{}] to event source.", streamName, consumerApp
+                        .getClass().getName());
+                savedES.subscribeStream(aStream);
+            }
+        }
+
         logger.info("Completed local applications startup.");
 
         if (deploymentManager != null) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index c05d81f..bd1d351 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -16,20 +16,14 @@
 package org.apache.s4.core;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.s4.base.Event;
-import org.apache.s4.base.GenericKeyFinder;
-import org.apache.s4.base.Key;
 import org.apache.s4.base.KeyFinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
 /**
  * {@link Stream} and {@link ProcessingElement} objects represent the links and nodes in the application graph. A stream
  * sends an {@link Event} object to {@link ProcessingElement} instances located anywhere in a cluster.
@@ -38,23 +32,22 @@ import com.google.common.collect.Sets;
  * <p>
  * To build an application create stream objects using use the {@link StreamFactory} class.
  */
-public class Stream<T extends Event> extends Streamable<T> implements Runnable {
+public class Stream<T extends Event> implements Runnable, Streamable {
 
     private static final Logger logger = LoggerFactory.getLogger(Stream.class);
 
     final static private String DEFAULT_SEPARATOR = "^";
     final static private int CAPACITY = 1000;
     private static int idCounter = 0;
-    private String name = "";
-    private Key<T> key = null;
-    private ProcessingElement[] targetPEs = null;
-    final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
-    private Thread thread;
+    final private String name;
+    final private Key<T> key;
+    final private ProcessingElement[] targetPEs;
+    final private BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
+    final private Thread thread;
     final private Sender sender;
     final private Receiver receiver;
     final private int id;
     final private App app;
-    private Class<T> eventType = null;
 
     /**
      * Send events using a {@link KeyFinder<T>}. The key finder extracts the value of the key which is used to determine
@@ -69,24 +62,22 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      * @param processingElements
      *            the target PE prototypes for this stream.
      */
-    public Stream(App app) {
+    public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
         synchronized (Stream.class) {
             id = idCounter++;
         }
         this.app = app;
-        app.addStream(this, null);
+        app.addStream(this);
+        this.name = name;
 
+        if (finder == null) {
+            this.key = null;
+        } else {
+            this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
+        }
         this.sender = app.getSender();
         this.receiver = app.getReceiver();
-    }
-
-    void start() {
-
-        /* Get target PE prototypes for this stream. Remove null key. */
-        Set<? extends ProcessingElement> pes = Sets.newHashSet(app.getTargetPEs(this));
-        pes.remove(null);
-        targetPEs = new ProcessingElement[pes.size()];
-        pes.toArray(targetPEs);
+        this.targetPEs = processingElements;
 
         /* Start streaming. */
         thread = new Thread(this, name);
@@ -95,78 +86,18 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
     }
 
     /**
-     * Stop and close this stream.
-     */
-    void close() {
-        thread.interrupt();
-    }
-
-    /**
-     * Name the stream.
+     * Send events to all available {@link ProcessingElement} instances contained by the {@link ProcessingElement}
+     * prototypes passed to this constructor.
      * 
+     * @param app
+     *            we always register streams with the parent application.
      * @param name
-     *            the stream name, default is an empty string.
-     * @return the stream object
-     */
-    public Stream<T> setName(String name) {
-        this.name = name;
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream.
-     * 
-     * @param keyFinder
-     *            a function to lookup the value of the key.
-     * @return the stream object
-     */
-    public Stream<T> setKey(KeyFinder<T> keyFinder) {
-        this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
-        return this;
-    }
-
-    /**
-     * Define the key finder for this stream using a descriptor.
-     * 
-     * @param keyFinderString
-     *            a descriptor to lookup up the value of the key.
-     * @return the stream object
-     */
-    public Stream<T> setKey(String keyName) {
-
-        Preconditions.checkNotNull(eventType);
-
-        KeyFinder<T> kf = new GenericKeyFinder<T>(keyName, eventType);
-        setKey(kf);
-
-        return this;
-    }
-
-    /**
-     * Send events from this stream to a PE.
-     * 
-     * @param pe
-     *            a target PE.
-     * 
-     * @return the stream object
-     */
-    public Stream<T> setPE(ProcessingElement pe) {
-        app.addStream(this, pe);
-        return this;
-    }
-
-    /**
-     * Send events from this stream to various PEs.
-     * 
-     * @param pe
-     *            a target PE array.
-     * 
-     * @return the stream object
+     *            give this stream a meaningful name in the context of your application.
+     * @param processingElements
+     *            the target PE prototypes for this stream.
      */
-    public Stream<T> setPEs(ProcessingElement[] pes) {
-        for (int i = 0; i < pes.length; i++)
-            app.addStream(this, pes[i]);
-        return this;
+    public Stream(App app, String name, ProcessingElement... processingElements) {
+        this(app, name, null, processingElements);
     }
 
     /**
@@ -174,7 +105,8 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
      * 
      * @param event
      */
-    public void put(T event) {
+    @SuppressWarnings("unchecked")
+    public void put(Event event) {
         try {
             event.setStreamId(getId());
             event.setAppId(app.getId());
@@ -188,7 +120,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
                  * We send to a specific PE instance using the key but we don't know if the target partition is remote
                  * or local. We need to ask the sender.
                  */
-                if (sender.sendAndCheckIfLocal(key.get(event), event)) {
+                if (sender.sendAndCheckIfLocal(key.get((T) event), event)) {
 
                     /*
                      * Sender checked and decided that the target is local so we simply put the event in the queue and
@@ -208,11 +140,9 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
                 queue.put(event);
             }
         } catch (InterruptedException e) {
-            if (logger.isTraceEnabled()) {
-                e.printStackTrace();
-            }
-            logger.debug("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            // System.exit(-1);
+            e.printStackTrace();
+            logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
+            System.exit(-1);
         }
     }
 
@@ -226,8 +156,9 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
         try {
             queue.put((T) event);
         } catch (InterruptedException e) {
-            logger.debug("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            // System.exit(-1);
+            e.printStackTrace();
+            logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
+            System.exit(-1);
         }
     }
 
@@ -267,6 +198,13 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
     }
 
     /**
+     * Stop and close this stream.
+     */
+    public void close() {
+        thread.interrupt();
+    }
+
+    /**
      * @return the sender object
      */
     public Sender getSender() {
@@ -280,16 +218,13 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
         return receiver;
     }
 
-    void setEventType(Class<T> type) {
-        this.eventType = type;
-    }
-
     @Override
     public void run() {
         while (true) {
             try {
                 /* Get oldest event in queue. */
-                T event = queue.take();
+                @SuppressWarnings("unchecked")
+                T event = (T) queue.take();
 
                 /* Send event to each target PE. */
                 for (int i = 0; i < targetPEs.length; i++) {
@@ -313,8 +248,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
                         /* We have a key, send to target PE. */
 
                         /* STEP 1: find the PE instance for key. */
-                        ProcessingElement pe;
-                        pe = targetPEs[i].getInstanceForKey(key.get(event));
+                        ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
 
                         /* STEP 2: pass event to PE instance. */
                         pe.handleInputEvent(event);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
index 68b4bcf..ecbae5e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
@@ -7,27 +7,22 @@ import org.apache.s4.base.Event;
  * 
  * @param <T>
  */
-abstract class Streamable<T extends Event> {
+public interface Streamable<T extends Event> {
 
     /**
      * Put an event into the streams.
      * 
      * @param event
      */
-    abstract void put(T event);
+    public void put(Event event);
 
     /**
      * Stop and close all the streams.
      */
-    abstract void close();
+    public void close();
 
     /**
      * @return the name of this streamable object.
      */
-    abstract String getName();
-
-    /**
-     * Start all streams;
-     */
-    abstract void start();
+    public String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
index bfeabd1..436ca4c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
@@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
@@ -152,7 +151,7 @@ abstract public class AppMaker {
             StreamMaker sm = entry.getKey();
             for (PEMaker pm : entry.getValue()) {
                 if (pm != null && sm != null) {
-                    sm.getStream().setPE(pm.getPe());
+                    // sm.getStream().setPE(pm.getPe());
                 }
             }
         }
@@ -164,15 +163,16 @@ abstract public class AppMaker {
     @SuppressWarnings("unchecked")
     private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
 
-        Stream<T> stream = app.createStream(type);
-        stream.setName(sm.getName());
-
-        if (sm.getKeyFinder() != null)
-            stream.setKey((KeyFinder<T>) sm.getKeyFinder());
-        else if (sm.getKeyDescriptor() != null)
-            stream.setKey(sm.getKeyDescriptor());
-
-        return stream;
+        // Stream<T> stream = app.createStream(type);
+        // stream.setName(sm.getName());
+        //
+        // if (sm.getKeyFinder() != null)
+        // stream.setKey((KeyFinder<T>) sm.getKeyFinder());
+        // else if (sm.getKeyDescriptor() != null)
+        // stream.setKey(sm.getKeyDescriptor());
+        //
+        // return stream;
+        return null;
     }
 
     /* Do the magic to create a PE from a PEMaker. */

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
new file mode 100644
index 0000000..1418396
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -0,0 +1,173 @@
+package org.apache.s4.deploy.prodcon;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.sun.net.httpserver.HttpServer;
+
+public class TestProducerConsumer {
+
+    private Factory zookeeperServerConnectionFactory;
+    private Process forkedNode;
+    private ZkClient zkClient;
+    private String clusterName;
+    private HttpServer httpServer;
+    private static File tmpAppsDir;
+
+    @BeforeClass
+    public static void createS4RFiles() throws Exception {
+        tmpAppsDir = Files.createTempDir();
+
+        Assert.assertTrue(tmpAppsDir.exists());
+        File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+                + "/test-apps/s4-showtime/build.gradle"), "installS4R",
+                new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+
+        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+                + "/test-apps/s4-counter/build.gradle"), "installS4R",
+                new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+    }
+
+    /** Creates the configured apps dir, or empties it (only if under /tmp) when it already exists. */
+    @Before
+    public void cleanLocalAppsDir() throws ConfigurationException {
+        String appsDirPath = loadConfig().getString("appsDir");
+        File appsDir = new File(appsDirPath);
+        if (!appsDir.exists()) {
+            Assert.assertTrue(appsDir.mkdirs());
+        } else if (!appsDirPath.startsWith("/tmp")) {
+            Assert.fail("apps dir should be a subdir of /tmp for safety");
+        } else {
+            CommTestUtils.deleteDirectoryContents(appsDir);
+        }
+    }
+
+    @Before
+    public void prepare() throws Exception {
+        CommTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+        final ZooKeeper zk = CommTestUtils.createZkClient();
+        try {
+            zk.delete("/simpleAppCreated", -1);
+        } catch (Exception ignored) {
+        }
+
+        zk.close();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        CommTestUtils.killS4App(forkedNode);
+    }
+
+    private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException {
+        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        config.load(is);
+        return config;
+    }
+
+    @Test
+    public void testInitialDeploymentFromFileSystem() throws Exception {
+
+        File showtimeS4R = new File(loadConfig().getString("appsDir") + File.separator + "showtime"
+                + System.currentTimeMillis() + ".s4r");
+        System.out.println(tmpAppsDir.getAbsolutePath());
+        Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+                + "/s4-showtime-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(showtimeS4R)) > 0);
+        String uriShowtime = showtimeS4R.toURI().toString();
+
+        File counterS4R = new File(loadConfig().getString("appsDir") + File.separator + "counter"
+                + System.currentTimeMillis() + ".s4r");
+        Assert.assertTrue(ByteStreams.copy(
+                Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/s4-counter-0.0.0-SNAPSHOT.s4r")),
+                Files.newOutputStreamSupplier(counterS4R)) > 0);
+
+        String uriCounter = counterS4R.toURI().toString();
+
+        initializeS4Node();
+
+        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
+        zkClient.create("/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+
+        ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
+        zkClient.create("/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+
+        // TODO validate test through some Zookeeper notifications
+        Thread.sleep(10000);
+    }
+
+    private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+        // 0. package s4 app
+        // TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
+        // current package .
+
+        // 1. start s4 nodes. Check that no app is deployed.
+        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        config.load(is);
+
+        clusterName = config.getString("cluster.name");
+        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+
+        zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+        Assert.assertTrue(processes.size() == 0);
+        final CountDownLatch signalProcessesReady = new CountDownLatch(1);
+
+        zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 2) {
+                    signalProcessesReady.countDown();
+                }
+
+            }
+        });
+
+        File tmpConfig = File.createTempFile("tmp", "config");
+        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+                Files.newOutputStreamSupplier(tmpConfig)) > 0);
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+
+        // TODO synchro with ready state from zk
+        Thread.sleep(10000);
+        // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/build.gradle b/test-apps/s4-counter/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/s4-counter/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+           json:               'org.json:json:20090211',
+           guava:              'com.google.guava:guava:10.0.1',
+           gson:               'com.google.code.gson:gson:1.6',
+           guice:              'com.google.inject:guice:3.0',
+           guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+           guice_grapher:      'com.google.inject:guice-grapher:3.0',
+           flexjson:           'net.sf.flexjson:flexjson:2.1',
+           bcel:               'org.apache.bcel:bcel:5.2',
+           jakarta_regexp:     'jakarta-regexp:jakarta-regexp:1.4',
+           kryo:               'com.googlecode:kryo:1.04',
+           netty:              'org.jboss.netty:netty:3.2.5.Final',
+           reflectasm:         'com.esotericsoftware:reflectasm:0.8',
+           minlog:             'com.esotericsoftware:minlog:1.2',
+           asm:                'asm:asm:3.2',
+           commons_io:         'commons-io:commons-io:2.0.1',
+           commons_config:     'commons-configuration:commons-configuration:1.6',
+           commons_codec:      'commons-codec:commons-codec:1.4',
+           commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+           commons_coll:       'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+           slf4j:              'org.slf4j:slf4j-api:1.6.1',
+           logback_core:       'ch.qos.logback:logback-core:0.9.29',
+           logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
+           zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+           jcip:               'net.jcip:jcip-annotations:1.0',
+           junit:              'junit:junit:4.10',
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile project(":s4-base")
+   compile project(":s4-comm")
+   compile project(":s4-core")
+   
+   /* Logging. */
+   compile( libraries.slf4j )
+   compile( libraries.logback_core )
+   compile( libraries.logback_classic )
+
+   /* Commons. */
+   compile( libraries.commons_io )
+   compile( libraries.commons_config )
+   compile( libraries.commons_coll )
+
+   /* Misc. */
+   compile( libraries.jcip )
+
+   /* Testing. */
+   testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+   
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "UNKNOWN") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+   
+   if (appClassname == "UNKNOWN") {
+       // Fail the build explicitly: 'exit' is not a method available to build scripts.
+       println "Couldn't find App class in source files...aborting."
+       throw new GradleException("Couldn't find App class in source files.")
+   }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+   
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse a source file and return the fully qualified name of the first public class whose
+* "extends" clause matches App, or "UNKNOWN" if none is found. TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+   def classname = "UNKNOWN"
+   def packageName = "" // local on purpose: stays empty for the default package, never leaks between files
+   for(line in file.readLines()) {
+       
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if(pn) {
+           packageName = pn[0][1] + "."
+       }
+       
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+
+   String toString() {
+       "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
new file mode 100644
index 0000000..316b876
--- /dev/null
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
@@ -0,0 +1,55 @@
+package s4app;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.EventSource;
+import org.apache.s4.core.Streamable;
+
+public class ClockApp extends App {
+
+    private EventSource eventSource;
+    private ClockPE clockPE;
+
+    @Override
+    protected void start() {
+        System.out.println("Starting CounterApp...");
+        clockPE.getInstanceForKey("single");
+    }
+
+    // generic array due to varargs generates a warning.
+    @Override
+    protected void init() {
+        System.out.println("Initing CounterApp...");
+
+        clockPE = new ClockPE(this);
+        clockPE.setTimerInterval(1, TimeUnit.SECONDS);
+
+        eventSource = new EventSource(this, "I can give you the time!");
+        clockPE.setStreams((Streamable) eventSource);
+    }
+
+    @Override
+    protected void close() {
+        System.out.println("Closing CounterApp...");
+        eventSource.close();
+    }
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockPE.java b/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
new file mode 100644
index 0000000..cc81672
--- /dev/null
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
@@ -0,0 +1,49 @@
+package s4app;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClockPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClockPE.class);
+
+    private Streamable[] targetStreams;
+    private long tick = 0;
+
+    public ClockPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @param targetStreams
+     *            the streams that each tick {@link Event} is put into.
+     */
+    public void setStreams(Streamable... targetStreams) {
+        this.targetStreams = targetStreams;
+    }
+
+    public void onTime() {
+        Event event = new Event();
+        event.put("tick", Long.class, tick++);
+
+        logger.info("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
+        for (int i = 0; i < targetStreams.length; i++) {
+            targetStreams[i].put(event);
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/build.gradle b/test-apps/s4-showtime/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/s4-showtime/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+           json:               'org.json:json:20090211',
+           guava:              'com.google.guava:guava:10.0.1',
+           gson:               'com.google.code.gson:gson:1.6',
+           guice:              'com.google.inject:guice:3.0',
+           guice_assist:       'com.google.inject.extensions:guice-assistedinject:3.0',
+           guice_grapher:      'com.google.inject:guice-grapher:3.0',
+           flexjson:           'net.sf.flexjson:flexjson:2.1',
+           bcel:               'org.apache.bcel:bcel:5.2',
+           jakarta_regexp:     'jakarta-regexp:jakarta-regexp:1.4',
+           kryo:               'com.googlecode:kryo:1.04',
+           netty:              'org.jboss.netty:netty:3.2.5.Final',
+           reflectasm:         'com.esotericsoftware:reflectasm:0.8',
+           minlog:             'com.esotericsoftware:minlog:1.2',
+           asm:                'asm:asm:3.2',
+           commons_io:         'commons-io:commons-io:2.0.1',
+           commons_config:     'commons-configuration:commons-configuration:1.6',
+           commons_codec:      'commons-codec:commons-codec:1.4',
+           commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+           commons_coll:       'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+           slf4j:              'org.slf4j:slf4j-api:1.6.1',
+           logback_core:       'ch.qos.logback:logback-core:0.9.29',
+           logback_classic:    'ch.qos.logback:logback-classic:0.9.29',
+           zk:                 'org.apache.zookeeper:zookeeper:3.3.1',
+           jcip:               'net.jcip:jcip-annotations:1.0',
+           junit:              'junit:junit:4.10',
+       ]
+
+
+dependencies {
+
+   /* S4 Platform. We only need the API, not the transitive dependencies. */
+//    s4Libs.each {  module ->
+//        compile( module ) //{ transitive = false }
+//        s4API( module )
+//    }
+
+   compile project(":s4-base")
+   compile project(":s4-comm")
+   compile project(":s4-core")
+   
+   /* Logging. */
+   compile( libraries.slf4j )
+   compile( libraries.logback_core )
+   compile( libraries.logback_classic )
+
+   /* Commons. */
+   compile( libraries.commons_io )
+   compile( libraries.commons_config )
+   compile( libraries.commons_coll )
+
+   /* Misc. */
+   compile( libraries.jcip )
+
+   /* Testing. */
+   testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+*  TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+       provider: 'gradle',
+       'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+       'Implementation-Version': version,
+       'Implementation-Vendor': 'Apache S4',
+       'Implementation-Vendor-Id': 's4app',
+       'S4-App-Class': appClassname, // gets set by the s4r task.
+       'S4-Version': s4Version
+       )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+   dependsOn jar
+   from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   manifest = project.manifest
+   extension = 's4r'
+   
+   /* Set class name in manifest. Parse source files until we find a class that extends App.
+    * Get fully qualified Java class name and set attribute in Manifest.
+    */
+   sourceSets.main.allSource.files.each {  File file ->
+       if (appClassname =="" || appClassname == "UNKNOWN") {
+           // only execute the closure for this file if we haven't already found the app class name
+           appClassname = getAppClassname(file)
+           if(appClassname != "UNKNOWN") {
+               manifest.mainAttributes('S4-App-Class': appClassname)
+           }
+       }
+   }
+   
+   if (appClassname == "UNKNOWN") {
+       // Fail the build explicitly: 'exit' is not a method available to build scripts.
+       println "Couldn't find App class in source files...aborting."
+       throw new GradleException("Couldn't find App class in source files.")
+   }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+   appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+   /* This is for debugging. */
+   //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+   //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+   // more debugging statements.
+   //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+   
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+   dependsOn s4r
+   from s4r.archivePath
+   into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse a source file and return the fully qualified name of the first public class whose
+* "extends" clause matches App, or "UNKNOWN" if none is found. TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+   def classname = "UNKNOWN"
+   def packageName = "" // local on purpose: stays empty for the default package, never leaks between files
+   for(line in file.readLines()) {
+       
+       def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+       if(pn) {
+           packageName = pn[0][1] + "."
+       }
+       
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+       if (an) {
+           classname = packageName + an[0][1]
+           println "Found app class name: " + classname
+           break
+       }
+   }
+   classname
+}
+
+class Version {
+   int major
+   int minor
+   int bugfix
+   boolean isRelease
+
+   String toString() {
+       "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+   }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java b/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
new file mode 100644
index 0000000..f68516c
--- /dev/null
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
@@ -0,0 +1,33 @@
+package s4app;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShowPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShowPE.class);
+
+    public ShowPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(Event event) {
+
+        logger.info("Received event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
+
+    }
+
+    @Override
+    protected void onRemove() {
+
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
new file mode 100644
index 0000000..1c92c57
--- /dev/null
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
@@ -0,0 +1,47 @@
+package s4app;
+
+import org.apache.s4.core.App;
+
+public class ShowTimeApp extends App {
+
+    private ShowPE showPE;
+
+    @Override
+    protected void start() {
+        System.out.println("Starting ShowTimeApp...");
+        showPE.getInstanceForKey("single");
+    }
+
+    @Override
+    protected void init() {
+        System.out.println("Initing ShowTimeApp...");
+
+        showPE = new ShowPE(this);
+
+        /* This stream will receive events from another app. */
+        createStream("I need the time.", showPE);
+    }
+
+    @Override
+    protected void close() {
+        System.out.println("Closing ShowTimeApp...");
+    }
+
+    @Override
+    protected void onStart() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onInit() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    protected void onClose() {
+        // TODO Auto-generated method stub
+
+    }
+}