You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by le...@apache.org on 2012/02/23 00:22:31 UTC

[4/7] Committing the first working version of the S4 embedded domain-specific language.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
new file mode 100644
index 0000000..4929344
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class HeightKeyFinder implements KeyFinder<EventA> {
+
+    public List<String> get(EventA event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the gender and add it to the list. */
+        results.add(Integer.toString(event.getHeight()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
new file mode 100644
index 0000000..84d1630
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.edsl;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Configures the controller.
+ * 
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
+ * 
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null)
+            loadProperties(binder());
+
+        bind(Cluster.class);
+
+        /* Configure static assignment using a configuration file. */
+        bind(Assignment.class).to(AssignmentFromFile.class);
+
+        /* Configure a static cluster topology using a configuration file. */
+        bind(Topology.class).to(TopologyFromFile.class);
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
+        /* Use the Netty comm layer implementation. */
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
+        /* Use a simple UDP comm layer implementation. */
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
new file mode 100644
index 0000000..6e4a267
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
@@ -0,0 +1,37 @@
+package org.apache.s4.edsl;
+
+import java.util.concurrent.TimeUnit;
+
+public class MyApp extends BuilderS4DSL {
+
+    @Override
+    public void onInit() {
+
+        pe("PEZ").type(PEZ.class).fireOn(EventA.class).afterInterval(5, TimeUnit.SECONDS).cache().size(1000)
+                .expires(3, TimeUnit.HOURS).emit(EventB.class).to("PEX").
+
+                pe("PEY").type(PEY.class).prop("duration", "4").prop("height", "99").timer()
+                .withPeriod(2, TimeUnit.MINUTES).emit(EventA.class).onField("stream3")
+                .withKeyFinder(new DurationKeyFinder()).to("PEZ").emit(EventA.class).onField("heightpez")
+                .withKeyFinder(new HeightKeyFinder()).to("PEZ").
+
+                pe("PEX").type(PEX.class).prop("query", "money").cache().size(100).expires(1, TimeUnit.MINUTES)
+                .asSingleton().emit(EventB.class).withKeyFinder(new QueryKeyFinder()).to("PEY").to("PEZ").
+
+                build();
+    }
+
+    // Make hooks public for testing. Normally this is handled by the container.
+    public void init() {
+        super.init();
+    }
+
+    public void start() {
+        super.start();
+    }
+
+    public void close() {
+        super.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
new file mode 100644
index 0000000..c142e03
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
@@ -0,0 +1,58 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEX extends ProcessingElement {
+
+    private String query;
+    private Stream<EventB>[] someStream;
+    @SuppressWarnings("unused")
+    private Stream<EventA>[] streams;
+
+    public PEX(App app) {
+        super(app);
+    }
+
+    @Override
+    public void onCreate() {
+
+    }
+
+    @Override
+    public void onRemove() {
+
+    }
+
+    /**
+     * @return the keyword
+     */
+    String getKeyword() {
+        return query;
+    }
+
+    /**
+     * @param query
+     *            the keyword to set
+     */
+    void setKeyword(String query) {
+        this.query = query;
+    }
+
+    /**
+     * @return the someStream
+     */
+    public Stream<EventB>[] getSomeStream() {
+        return someStream;
+    }
+
+    /**
+     * @param someStream
+     *            the someStream to set
+     */
+    public void setSomeStream(Stream<EventB>[] someStream) {
+        this.someStream = someStream;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
new file mode 100644
index 0000000..ce9dbc7
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
@@ -0,0 +1,77 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEY extends ProcessingElement {
+
+    private Stream<EventA>[] stream3;
+    @SuppressWarnings("unused")
+    private Stream<EventA>[] heightpez;
+
+    private int height;
+    private long duration;
+
+    public PEY(App app) {
+        super(app);
+    }
+
+    @Override
+    public void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * @return the stream3
+     */
+    Stream<EventA>[] getStream3() {
+        return stream3;
+    }
+
+    /**
+     * @param stream3
+     *            the stream3 to set
+     */
+    void setStream3(Stream<EventA>[] stream3) {
+        this.stream3 = stream3;
+    }
+
+    /**
+     * @return the height
+     */
+    int getHeight() {
+        return height;
+    }
+
+    /**
+     * @param height
+     *            the height to set
+     */
+    void setHeight(int height) {
+        this.height = height;
+    }
+
+    /**
+     * @return the duration
+     */
+    long getDuration() {
+        return duration;
+    }
+
+    /**
+     * @param duration
+     *            the duration to set
+     */
+    void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
new file mode 100644
index 0000000..d494ae2
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
@@ -0,0 +1,58 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+public class PEZ extends ProcessingElement {
+
+    private Stream<EventA>[] stream1;
+    private Stream<EventB>[] stream2;
+
+    public PEZ(App app) {
+        super(app);
+    }
+
+    /**
+     * @return the stream1
+     */
+    Stream<EventA>[] getStream1() {
+        return stream1;
+    }
+
+    /**
+     * @param stream1
+     *            the stream1 to set
+     */
+    void setStream1(Stream<EventA>[] stream1) {
+        this.stream1 = stream1;
+    }
+
+    /**
+     * @return the stream2
+     */
+    Stream<EventB>[] getStream2() {
+        return stream2;
+    }
+
+    /**
+     * @param stream2
+     *            the stream2 to set
+     */
+    void setStream2(Stream<EventB>[] stream2) {
+        this.stream2 = stream2;
+    }
+
+    @Override
+    public void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onRemove() {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
new file mode 100644
index 0000000..1338432
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class QueryKeyFinder implements KeyFinder<EventB> {
+
+    public List<String> get(EventB event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the gender and add it to the list. */
+        results.add(event.getQuery());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
new file mode 100644
index 0000000..dbff5f5
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@ -0,0 +1,38 @@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Field;
+
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestEDSL {
+
+    @Test
+    public void test() throws Exception {
+        Injector injector = Guice.createInjector(new Module());
+        MyApp myApp = injector.getInstance(MyApp.class);
+
+        /* Normally. the container will handle this but this is just a test. */
+        myApp.init();
+        myApp.start();
+        myApp.close();
+    }
+
+    @Test
+    public void testReflection() {
+
+        try {
+            Class<?> c = PEY.class;
+            Field f = c.getDeclaredField("duration");
+            System.out.format("Type: %s%n", f.getType());
+            System.out.format("GenericType: %s%n", f.getGenericType());
+
+            // production code should handle these exceptions more gracefully
+        } catch (NoSuchFieldException x) {
+            x.printStackTrace();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/s4-example.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/s4-example.gradle b/subprojects/s4-example/s4-example.gradle
index bbeb11b..95f3a97 100644
--- a/subprojects/s4-example/s4-example.gradle
+++ b/subprojects/s4-example/s4-example.gradle
@@ -20,6 +20,7 @@ dependencies {
     compile project( ":s4-base" )
     compile project( ":s4-core" )
     compile project( ":s4-comm" )
+    compile project( ":s4-edsl" )
 	compile libraries.ejml
     compile libraries.junit
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..f4d894c
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the age and add it to the list. */
+        results.add(Integer.toString(event.getAge()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
new file mode 100644
index 0000000..ac8e8ac
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+
+public class CountEvent extends Event {
+
+    private String key;
+    private long count;
+    
+    public CountEvent() {
+        
+    }
+
+    CountEvent(String key, long count) {
+        this.key = key;
+        this.count = count;
+    }
+
+    CountEvent(String key, long count, long time) {
+        super(time);
+        this.key = key;
+        this.count = count;
+    }
+
+   
+    /**
+     * @return the key
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * @return the count
+     */
+    public long getCount() {
+        return count;
+    }
+
+    public String toString() {
+        return String.format("Key: " + key + ", Count: %08d", count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
new file mode 100644
index 0000000..4fd5eb8
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+
+    public List<String> get(CountEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the user ID and add it to the list. */
+        results.add(event.getKey());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
new file mode 100644
index 0000000..7012729
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
@@ -0,0 +1,80 @@
+package org.apache.s4.example.edsl.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.edsl.BuilderS4DSL;
+
+/**
+ * This is a sample application to test the S4 embedded domain-specific language (EDSL).
+ * 
+ * <p>
+ * Grammar:
+ * 
+ * <pre>
+ *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , 
+ *  (cache, size , expires? )? , asSingleton? , (emit, onField?, 
+ *  (withKey|withKeyFinder)?, to+ )*  )+ , build
+ * </pre>
+ * 
+ * <p>
+ * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
+ * 
+ */
+final public class CounterApp extends BuilderS4DSL {
+
+    @Override
+    protected void onInit() {
+
+        pe("Print").type(PrintPE.class).asSingleton().
+
+        pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+                .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+
+                pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+                .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+
+                pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+                .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+
+                pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
+                .asSingleton().
+
+                emit(UserEvent.class).withKeyFinder(new UserIDKeyFinder()).to("User Count").
+
+                emit(UserEvent.class).withKey("gender").to("Gender Count").
+
+                emit(UserEvent.class).withKeyFinder(new AgeKeyFinder()).to("Age Count").
+
+                build();
+    }
+
+    /*
+     * Create and send 200 dummy events of type UserEvent.
+     * 
+     * @see io.s4.App#start()
+     */
+    @Override
+    protected void onStart() {
+
+    }
+
+    @Override
+    protected void onClose() {
+        System.out.println("Bye.");
+    }
+
+    // Make hooks public for testing. Normally this is handled by the container.
+    public void init() {
+        super.init();
+    }
+
+    public void start() {
+        super.start();
+    }
+
+    public void close() {
+        super.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
new file mode 100644
index 0000000..33e383b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CounterPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
+
+    private Stream<CountEvent>[] countStream = null;
+
+    public CounterPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @return the countStream
+     */
+    public Stream<CountEvent>[] getCountStream() {
+        return countStream;
+    }
+
+    /**
+     * @param countStream
+     *            the countStream to set
+     */
+    public void setCountStream(Stream<CountEvent>[] countStream) {
+        this.countStream = countStream;
+    }
+
+    private long counter = 0;
+
+    public void onEvent(Event event) {
+
+        counter += 1;
+        logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see io.s4.ProcessingElement#sendOutputEvent()
+     */
+    public void onTrigger(Event event) {
+
+        logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
+        CountEvent countEvent = new CountEvent(getId(), counter);
+        emit(countEvent, countStream);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see io.s4.ProcessingElement#init()
+     */
+    @Override
+    public void onCreate() {
+
+    }
+
+    @Override
+    public void onRemove() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..708f862
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the gender and add it to the list. */
+        results.add(Character.toString(event.getGender()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..a68a048
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenerateUserEventPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
+
+    static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+    static int[] ages = { 25, 2, 33, 6, 67 };
+    static char[] genders = { 'f', 'm' };
+    private Stream<UserEvent>[] targetStreams;
+    final private Random generator = new Random(22);
+
+    public GenerateUserEventPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @param targetStreams
+     *            the {@link UserEvent} streams.
+     */
+    public void setStreams(Stream<UserEvent>... targetStreams) {
+        this.targetStreams = targetStreams;
+    }
+
+    protected void onTime() {
+        List<String> favorites = new ArrayList<String>();
+        favorites.add("dulce de leche");
+        favorites.add("strawberry");
+
+        int indexUserID = generator.nextInt(userIds.length);
+        int indexAge = generator.nextInt(ages.length);
+        int indexGender = generator.nextInt(2);
+
+        UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
+        logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
+        emit(userEvent, targetStreams);
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+
+    static int pickRandom(int numElements) {
+        return 0;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
new file mode 100644
index 0000000..b7afc4e
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Configures the controller.
+ * 
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
+ * 
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null)
+            loadProperties(binder());
+
+        bind(Cluster.class);
+
+        /* Configure static assignment using a configuration file. */
+        bind(Assignment.class).to(AssignmentFromFile.class);
+
+        /* Configure a static cluster topology using a configuration file. */
+        bind(Topology.class).to(TopologyFromFile.class);
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
+        /* Use the Netty comm layer implementation. */
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
+        /* Use a simple UDP comm layer implementation. */
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+        /* The hashing function to map keys top partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
new file mode 100644
index 0000000..bf841f4
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrintPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
+
+    public PrintPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(Event event) {
+
+        logger.info(">>> [{}].", event.toString());
+    }
+
+    @Override
+    protected void onCreate() {
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
new file mode 100644
index 0000000..29e086b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
@@ -0,0 +1,26 @@
+package org.apache.s4.example.edsl.counter;
+
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestCounterApp {
+
+    @Test
+    public void test() throws Exception {
+        Injector injector = Guice.createInjector(new Module());
+        CounterApp myApp = injector.getInstance(CounterApp.class);
+
+        /* Normally. the container will handle this but this is just a test. */
+        myApp.init();
+        myApp.start();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        myApp.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
new file mode 100644
index 0000000..f2e3bc6
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+public class UserEvent extends Event {
+
+    private String userID;
+    private int age;
+    private char gender;
+    private List<String> favorites;
+    
+    public UserEvent() {
+        
+    }
+
+    UserEvent(String userID, int age, List<String> favorites, char gender) {
+        this.userID = userID;
+        this.age = age;
+        this.favorites = favorites;
+        this.gender = gender;
+    }
+
+    /**
+     * @return the userID
+     */
+    public String getUserID() {
+        return userID;
+    }
+
+    /**
+     * @return the age
+     */
+    public int getAge() {
+        return age;
+    }
+
+    /**
+     * @return the favorites
+     */
+    public List<String> getFavorites() {
+        return favorites;
+    }
+
+    /**
+     * @return the gender
+     */
+    public char getGender() {
+        return gender;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..3fded2a
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the user ID and add it to the list. */
+        results.add(event.getUserID());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
deleted file mode 100644
index f8ee411..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class AgeKeyFinder implements KeyFinder<UserEvent> {
-
-    public List<String> get(UserEvent event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the age and add it to the list. */
-        results.add(Integer.toString(event.getAge()));
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
deleted file mode 100644
index eea336f..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-
-public class CountEvent extends Event {
-
-    private String key;
-    private long count;
-    
-    public CountEvent() {
-        
-    }
-
-    CountEvent(String key, long count) {
-        this.key = key;
-        this.count = count;
-    }
-
-    CountEvent(String key, long count, long time) {
-        super(time);
-        this.key = key;
-        this.count = count;
-    }
-
-   
-    /**
-     * @return the key
-     */
-    public String getKey() {
-        return key;
-    }
-
-    /**
-     * @return the count
-     */
-    public long getCount() {
-        return count;
-    }
-
-    public String toString() {
-        return String.format("Key: " + key + ", Count: %08d", count);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
deleted file mode 100644
index 8991ddd..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class CountKeyFinder implements KeyFinder<CountEvent> {
-
-    public List<String> get(CountEvent event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the user ID and add it to the list. */
-        results.add(event.getKey());
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
deleted file mode 100644
index 87f1a6f..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CounterPE extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
-
-    private Stream<CountEvent>[] countStream = null;
-
-    public CounterPE(App app) {
-        super(app);
-    }
-
-    /**
-     * @return the countStream
-     */
-    public Stream<CountEvent>[] getCountStream() {
-        return countStream;
-    }
-
-    /**
-     * @param countStream
-     *            the countStream to set
-     */
-    public void setCountStream(Stream<CountEvent>[] countStream) {
-        this.countStream = countStream;
-    }
-
-    private long counter = 0;
-
-    public void onEvent(Event event) {
-
-        counter += 1;
-        logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see io.s4.ProcessingElement#sendOutputEvent()
-     */
-    public void onTrigger(Event event) {
-
-        logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
-        CountEvent countEvent = new CountEvent(getId(), counter);
-        emit(countEvent, countStream);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see io.s4.ProcessingElement#init()
-     */
-    @Override
-    public void onCreate() {
-
-    }
-
-    @Override
-    public void onRemove() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
deleted file mode 100644
index 06875a6..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class GenderKeyFinder implements KeyFinder<UserEvent> {
-
-    public List<String> get(UserEvent event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the gender and add it to the list. */
-        results.add(Character.toString(event.getGender()));
-
-        return results;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
deleted file mode 100644
index 1d53355..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GenerateUserEventPE extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
-
-    static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
-    static int[] ages = { 25, 2, 33, 6, 67 };
-    static char[] genders = { 'f', 'm' };
-    private Stream<UserEvent>[] targetStreams;
-    final private Random generator = new Random(22);
-
-    public GenerateUserEventPE(App app) {
-        super(app);
-    }
-
-    /**
-     * @param targetStreams
-     *            the {@link UserEvent} streams.
-     */
-    public void setStreams(Stream<UserEvent>... targetStreams) {
-        this.targetStreams = targetStreams;
-    }
-
-    protected void onTime() {
-        List<String> favorites = new ArrayList<String>();
-        favorites.add("dulce de leche");
-        favorites.add("strawberry");
-
-        int indexUserID = generator.nextInt(userIds.length);
-        int indexAge = generator.nextInt(ages.length);
-        int indexGender = generator.nextInt(2);
-
-        UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
-        logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
-        emit(userEvent, targetStreams);
-    }
-
-    @Override
-    protected void onRemove() {
-    }
-
-    static int pickRandom(int numElements) {
-        return 0;
-    }
-
-    @Override
-    protected void onCreate() {
-        // TODO Auto-generated method stub
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
deleted file mode 100644
index 7c05836..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.Receiver;
-import org.apache.s4.core.Sender;
-import org.apache.s4.fluent.AppMaker;
-import org.apache.s4.fluent.FluentApp;
-import org.apache.s4.fluent.PEMaker;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-/*
- * This is a sample application to test a new S4 API. 
- * See README file for details.
- * 
- * */
-
-final public class Main extends AppMaker {
-
-    public Main() {
-        super();
-    }
-
-    private PEMaker generateUserEventPE;
-
-    /*
-     * 
-     * 
-     * The application graph itself is created in this Class. However, developers may provide tools for creating apps
-     * which will generate the objects.
-     * 
-     * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype from
-     * which all PE instance will be created. The prototype itself is not used as an instance. (Except when the PE is of
-     * type Singleton PE). To create a data structure for each PE instance you must do it in the method
-     * ProcessingElement.onCreate().
-     */
-
-    /*
-     * Build the application graph using POJOs. Don't like it? Write a nice tool.
-     * 
-     * @see io.s4.App#init()
-     */
-    @Override
-    public void configure() {
-
-        /* PE that prints counts to console. */
-        PEMaker printPE = addPE(PrintPE.class);
-
-        /* PEs that count events by user, gender, and age. */
-        PEMaker userCountPE = addPE(CounterPE.class);
-        userCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
-        PEMaker genderCountPE = addPE(CounterPE.class);
-        genderCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
-        PEMaker ageCountPE = addPE(CounterPE.class);
-        ageCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
-        generateUserEventPE = addPE(GenerateUserEventPE.class).asSingleton();
-        generateUserEventPE.addTimer().withDuration(1, TimeUnit.MILLISECONDS);
-
-        ageCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
-        genderCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
-        userCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
-
-        generateUserEventPE.emit(UserEvent.class).onKey(new AgeKeyFinder()).to(ageCountPE);
-        generateUserEventPE.emit(UserEvent.class).onKey("gender").to(genderCountPE);
-        generateUserEventPE.emit(UserEvent.class).onKey(new UserIDKeyFinder()).to(userCountPE);
-    }
-
-    /*
-     * Create and send 200 dummy events of type UserEvent.
-     * 
-     * @see io.s4.App#start()
-     */
-    @Override
-    public void start() {
-
-    }
-
-    @Override
-    public void close() {
-        System.out.println("Bye.");
-
-    }
-
-    // public static void main(String[] args) {
-    @Test
-    public void test() throws Exception {
-
-        Injector injector = Guice.createInjector(new Module());
-        FluentApp myApp = injector.getInstance(FluentApp.class);
-        Sender sender = injector.getInstance(Sender.class);
-        Receiver receiver = injector.getInstance(Receiver.class);
-        myApp.setCommLayer(sender, receiver);
-
-        /* Normally. the container will handle this but this is just a test. */
-        myApp.init();
-        myApp.start();
-
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        myApp.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
deleted file mode 100644
index a8f034b..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-import org.apache.s4.fluent.AppMaker;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Configures the controller.
- * 
- * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
- * 
- * @author Leo Neumeyer
- */
-public class Module extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null)
-            loadProperties(binder());
-
-        bind(AppMaker.class).to(Main.class);
-
-        bind(Cluster.class);
-
-        /* Configure static assignment using a configuration file. */
-        bind(Assignment.class).to(AssignmentFromFile.class);
-
-        /* Configure a static cluster topology using a configuration file. */
-        bind(Topology.class).to(TopologyFromFile.class);
-
-        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
-        //
-        // bind(Emitter.class).to(QueueingEmitter.class);
-        // bind(Listener.class).to(QueueingListener.class);
-
-        /* Use the Netty comm layer implementation. */
-        // bind(Emitter.class).to(NettyEmitter.class);
-        // bind(Listener.class).to(NettyListener.class);
-
-        /* Use a simple UDP comm layer implementation. */
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
deleted file mode 100644
index 1a5b2de..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PrintPE extends ProcessingElement {
-
-    private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
-
-    public PrintPE(App app) {
-        super(app);
-    }
-
-    public void onEvent(Event event) {
-
-        logger.info(">>> [{}].", event.toString());
-    }
-
-    @Override
-    protected void onCreate() {
-    }
-
-    @Override
-    protected void onRemove() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
deleted file mode 100644
index 72e0a3d..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-S4 Counter Example
-==================
-
-In this example we do the following:
-
-- Generate dummy events (UserEvent).
-- Key events by user, gender, age.
-- Count by user, gender, age.
-- Print partial counts.
-
-The following diagram shows the application graph:
-
-![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
-
-In in following diagram I show how Classes, PE Prototypes, PE instances, and Streams are related.
-
-![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
deleted file mode 100644
index b33adc0..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-
-import java.util.List;
-
-import org.apache.s4.base.Event;
-
-public class UserEvent extends Event {
-
-    private String userID;
-    private int age;
-    private char gender;
-    private List<String> favorites;
-    
-    public UserEvent() {
-        
-    }
-
-    UserEvent(String userID, int age, List<String> favorites, char gender) {
-        this.userID = userID;
-        this.age = age;
-        this.favorites = favorites;
-        this.gender = gender;
-    }
-
-    /**
-     * @return the userID
-     */
-    public String getUserID() {
-        return userID;
-    }
-
-    /**
-     * @return the age
-     */
-    public int getAge() {
-        return age;
-    }
-
-    /**
-     * @return the favorites
-     */
-    public List<String> getFavorites() {
-        return favorites;
-    }
-
-    /**
-     * @return the gender
-     */
-    public char getGender() {
-        return gender;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
deleted file mode 100644
index dd81715..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class UserIDKeyFinder implements KeyFinder<UserEvent> {
-
-    public List<String> get(UserEvent event) {
-
-        List<String> results = new ArrayList<String>();
-
-        /* Retrieve the user ID and add it to the list. */
-        results.add(event.getUserID());
-
-        return results;
-    }
-}