You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC

[6/50] [abbrv] Fluent API close to complete. New counter example using the fluent API.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
index d8df20a..eb96b9d 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
@@ -15,7 +15,6 @@
  */
 package org.apache.s4.example.counter;
 
-
 import java.io.InputStream;
 
 import org.apache.commons.configuration.ConfigurationConverter;
@@ -43,8 +42,7 @@ import com.google.inject.name.Names;
 /**
  * Configures the controller.
  * 
- * Reads a properties file, provides a {@link Communicator} singleton, and
- * configures Guice bindings.
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
  * 
  * @author Leo Neumeyer
  */
@@ -55,8 +53,7 @@ public class Module extends AbstractModule {
     private void loadProperties(Binder binder) {
 
         try {
-            InputStream is = this.getClass().getResourceAsStream(
-                    "/s4-piper-example.properties");
+            InputStream is = this.getClass().getResourceAsStream("/s4-piper-example.properties");
             config = new PropertiesConfiguration();
             config.load(is);
 
@@ -64,8 +61,7 @@ public class Module extends AbstractModule {
             // TODO - validate properties.
 
             /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder,
-                    ConfigurationConverter.getProperties(config));
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
         } catch (ConfigurationException e) {
             binder.addError(e);
             e.printStackTrace();
@@ -78,34 +74,34 @@ public class Module extends AbstractModule {
             loadProperties(binder());
 
         bind(MyApp.class);
-        
+
         bind(Cluster.class);
-        
+
         /* Configure static assignment using a configuration file. */
         bind(Assignment.class).to(AssignmentFromFile.class);
-        
+
         /* Configure a static cluster topology using a configuration file. */
         bind(Topology.class).to(TopologyFromFile.class);
-        
-//        bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-//        bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
-//        
-//        bind(Emitter.class).to(QueueingEmitter.class);
-//        bind(Listener.class).to(QueueingListener.class);
-        
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
         /* Use the Netty comm layer implementation. */
-//        bind(Emitter.class).to(NettyEmitter.class);
-//        bind(Listener.class).to(NettyListener.class);
-        
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
         /* Use a simple UDP comm layer implementation. */
         bind(Emitter.class).to(UDPEmitter.class);
         bind(Listener.class).to(UDPListener.class);
-        
+
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
-        
+
         /* Use Kryo to serialize events. */
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index cae9a3b..42d8e20 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -60,6 +60,7 @@ final public class MyApp extends App {
 
         /* PE that prints counts to console. */
         PrintPE printPE = createPE(PrintPE.class);
+        printPE.setSingleton(true);
 
         Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
                 .setKey(new CountKeyFinder()).setPE(printPE);
@@ -71,7 +72,7 @@ final public class MyApp extends App {
                 .setKey(new CountKeyFinder()).setPE(printPE);
 
         /* PEs that count events by user, gender, and age. */
-        CounterPE userCountPE = createPE(CounterPE.class);// .withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+        CounterPE userCountPE = createPE(CounterPE.class);
         userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
         userCountPE.setCountStream(userCountStream);
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..2d772de
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the age and add it to the list. */
+        results.add(Integer.toString(event.getAge()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
new file mode 100644
index 0000000..eea336f
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+
+/**
+ * Event that carries a count together with the key the count belongs to.
+ */
+public class CountEvent extends Event {
+
+    private String key;
+    private long count;
+
+    /**
+     * Creates an event whose key is {@code null} and whose count is zero.
+     */
+    public CountEvent() {
+
+    }
+
+    /**
+     * @param key
+     *            the key the count belongs to
+     * @param count
+     *            the count value
+     */
+    CountEvent(String key, long count) {
+        this.key = key;
+        this.count = count;
+    }
+
+    /**
+     * @param key
+     *            the key the count belongs to
+     * @param count
+     *            the count value
+     * @param time
+     *            passed unchanged to the {@link Event} constructor
+     */
+    CountEvent(String key, long count, long time) {
+        super(time);
+        this.key = key;
+        this.count = count;
+    }
+
+    /**
+     * @return the key
+     */
+    public String getKey() {
+        return key;
+    }
+
+    /**
+     * @return the count
+     */
+    public long getCount() {
+        return count;
+    }
+
+    /**
+     * @return the key followed by the count, zero-padded to at least eight digits
+     */
+    @Override
+    public String toString() {
+        /* Pass the key as an argument so that a '%' inside it is not parsed as a format specifier. */
+        return String.format("Key: %s, Count: %08d", key, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
new file mode 100644
index 0000000..2314696
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+
+    public List<String> get(CountEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the user ID and add it to the list. */
+        results.add(event.getKey());
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
new file mode 100644
index 0000000..87f1a6f
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CounterPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
+
+    private Stream<CountEvent>[] countStream = null;
+
+    public CounterPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @return the countStream
+     */
+    public Stream<CountEvent>[] getCountStream() {
+        return countStream;
+    }
+
+    /**
+     * @param countStream
+     *            the countStream to set
+     */
+    public void setCountStream(Stream<CountEvent>[] countStream) {
+        this.countStream = countStream;
+    }
+
+    private long counter = 0;
+
+    public void onEvent(Event event) {
+
+        counter += 1;
+        logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see io.s4.ProcessingElement#sendOutputEvent()
+     */
+    public void onTrigger(Event event) {
+
+        logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
+        CountEvent countEvent = new CountEvent(getId(), counter);
+        emit(countEvent, countStream);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see io.s4.ProcessingElement#init()
+     */
+    @Override
+    public void onCreate() {
+
+    }
+
+    @Override
+    public void onRemove() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..d7883c8
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the gender and add it to the list. */
+        results.add(Character.toString(event.getGender()));
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..1d53355
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PE that emits a randomly composed {@link UserEvent} to its target streams every time {@link #onTime()} is called.
+ */
+public class GenerateUserEventPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
+
+    /* Value pools from which each generated event picks its fields independently. */
+    static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+    static int[] ages = { 25, 2, 33, 6, 67 };
+    static char[] genders = { 'f', 'm' };
+    private Stream<UserEvent>[] targetStreams;
+
+    /* Fixed seed: the generator yields the same pseudo-random sequence on every run. */
+    final private Random generator = new Random(22);
+
+    public GenerateUserEventPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @param targetStreams
+     *            the {@link UserEvent} streams.
+     */
+    public void setStreams(Stream<UserEvent>... targetStreams) {
+        this.targetStreams = targetStreams;
+    }
+
+    /**
+     * Builds one {@link UserEvent} from a randomly chosen user ID, age and gender and emits it to the target streams.
+     */
+    protected void onTime() {
+        List<String> favorites = new ArrayList<String>();
+        favorites.add("dulce de leche");
+        favorites.add("strawberry");
+
+        int indexUserID = generator.nextInt(userIds.length);
+        int indexAge = generator.nextInt(ages.length);
+        /* Bound by the array length, like the other indexes, so that changing the genders needs no change here. */
+        int indexGender = generator.nextInt(genders.length);
+
+        UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
+        logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
+        emit(userEvent, targetStreams);
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+
+    /**
+     * Placeholder that ignores its argument and always returns 0.
+     */
+    static int pickRandom(int numElements) {
+        return 0;
+    }
+
+    @Override
+    protected void onCreate() {
+        // TODO Auto-generated method stub
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
new file mode 100644
index 0000000..cb9c098
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.core.Sender;
+import org.apache.s4.fluent.AppMaker;
+import org.apache.s4.fluent.FluentApp;
+import org.apache.s4.fluent.PEMaker;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/*
+ * This is a sample application to test a new S4 API. 
+ * See README file for details.
+ * 
+ * */
+
+final public class Main extends AppMaker {
+
+    public Main() {
+        super();
+    }
+
+    private PEMaker generateUserEventPE;
+
+    /*
+     * 
+     * 
+     * The application graph itself is created in this Class. However, developers may provide tools for creating apps
+     * which will generate the objects.
+     * 
+     * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype from
+     * which all PE instances will be created. The prototype itself is not used as an instance. (Except when the PE is
+     * of type Singleton PE). To create a data structure for each PE instance you must do it in the method
+     * ProcessingElement.onCreate().
+     */
+
+    /*
+     * Build the application graph using POJOs. Don't like it? Write a nice tool.
+     * 
+     * @see org.apache.s4.fluent.AppMaker#configure()
+     */
+    @Override
+    public void configure() {
+
+        /* PE that prints counts to console. */
+        PEMaker printPE = addPE(PrintPE.class).asSingleton();
+
+        /* PEs that count events by user, gender, and age. Each gets a trigger on Event with a 100 ms interval. */
+        PEMaker userCountPE = addPE(CounterPE.class);
+        userCountPE.addTrigger().fireOn(Event.class).ifInterval(100L, TimeUnit.MILLISECONDS);
+
+        PEMaker genderCountPE = addPE(CounterPE.class);
+        genderCountPE.addTrigger().fireOn(Event.class).ifInterval(100L, TimeUnit.MILLISECONDS);
+
+        PEMaker ageCountPE = addPE(CounterPE.class);
+        ageCountPE.addTrigger().fireOn(Event.class).ifInterval(100L, TimeUnit.MILLISECONDS);
+
+        /* Singleton PE that generates the user events; it gets a timer with a 1 ms duration. */
+        generateUserEventPE = addPE(GenerateUserEventPE.class).asSingleton();
+        generateUserEventPE.addTimer().withDuration(1, TimeUnit.MILLISECONDS);
+
+        /* Every counter sends its count events to the printer. */
+        ageCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+        genderCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+        userCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+
+        /* The generator feeds each counter with user events keyed on the matching attribute. */
+        generateUserEventPE.emit(UserEvent.class).onKey(new AgeKeyFinder()).to(ageCountPE);
+        generateUserEventPE.emit(UserEvent.class).onKey(new GenderKeyFinder()).to(genderCountPE);
+        generateUserEventPE.emit(UserEvent.class).onKey(new UserIDKeyFinder()).to(userCountPE);
+    }
+
+    /*
+     * Nothing to do here: in this example the events are produced by the timer of the GenerateUserEventPE.
+     * 
+     * @see org.apache.s4.fluent.AppMaker#start()
+     */
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void close() {
+        System.out.println("Bye.");
+
+    }
+
+    // public static void main(String[] args) {
+    @Test
+    public void test() throws Exception {
+
+        Injector injector = Guice.createInjector(new Module());
+        FluentApp myApp = injector.getInstance(FluentApp.class);
+        Sender sender = injector.getInstance(Sender.class);
+        Receiver receiver = injector.getInstance(Receiver.class);
+        myApp.setCommLayer(sender, receiver);
+
+        /* Normally, the container will handle this but this is just a test. */
+        myApp.init();
+        myApp.start();
+
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            /* Restore the interrupt status so that callers can still see that the thread was interrupted. */
+            Thread.currentThread().interrupt();
+        }
+        myApp.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
new file mode 100644
index 0000000..a8f034b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+import org.apache.s4.fluent.AppMaker;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module for the fluent counter example.
+ * 
+ * Reads a properties file from the classpath, makes its entries injectable by name, and configures the Guice
+ * bindings for the application, the cluster, the comm layer, the hasher and the serializer.
+ * 
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+    /* Classpath location of the properties file read by this module. */
+    private static final String PROPERTIES_RESOURCE = "/s4-counter-example.properties";
+
+    protected PropertiesConfiguration config = null;
+
+    /**
+     * Loads the properties resource into {@link #config} and binds every property by name.
+     * 
+     * A missing resource or a load failure is reported through the binder instead of being thrown.
+     * 
+     * @param binder
+     *            the binder that receives the property bindings and any error
+     */
+    private void loadProperties(Binder binder) {
+
+        InputStream is = this.getClass().getResourceAsStream(PROPERTIES_RESOURCE);
+        if (is == null) {
+            /* getResourceAsStream() returns null when the resource is not on the classpath. */
+            binder.addError("Cannot find properties resource [%s] on the classpath.", PROPERTIES_RESOURCE);
+            return;
+        }
+
+        try {
+            config = new PropertiesConfiguration();
+            config.load(is);
+
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        } finally {
+            try {
+                is.close();
+            } catch (IOException ignored) {
+                /* Nothing useful can be done if closing the classpath stream fails. */
+            }
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null)
+            loadProperties(binder());
+
+        bind(AppMaker.class).to(Main.class);
+
+        bind(Cluster.class);
+
+        /* Configure static assignment using a configuration file. */
+        bind(Assignment.class).to(AssignmentFromFile.class);
+
+        /* Configure a static cluster topology using a configuration file. */
+        bind(Topology.class).to(TopologyFromFile.class);
+
+        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        //
+        // bind(Emitter.class).to(QueueingEmitter.class);
+        // bind(Listener.class).to(QueueingListener.class);
+
+        /* Use the Netty comm layer implementation. */
+        // bind(Emitter.class).to(NettyEmitter.class);
+        // bind(Listener.class).to(NettyListener.class);
+
+        /* Use a simple UDP comm layer implementation. */
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
new file mode 100644
index 0000000..1a5b2de
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrintPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
+
+    public PrintPE(App app) {
+        super(app);
+    }
+
+    public void onEvent(Event event) {
+
+        logger.info(">>> [{}].", event.toString());
+    }
+
+    @Override
+    protected void onCreate() {
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
new file mode 100644
index 0000000..72e0a3d
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
@@ -0,0 +1,17 @@
+S4 Counter Example
+==================
+
+In this example we do the following:
+
+- Generate dummy events (UserEvent).
+- Key events by user, gender, age.
+- Count by user, gender, age.
+- Print partial counts.
+
+The following diagram shows the application graph:
+
+![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
+
+The following diagram shows how Classes, PE Prototypes, PE instances, and Streams are related.
+
+![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
new file mode 100644
index 0000000..b33adc0
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+public class UserEvent extends Event {
+
+    private String userID;
+    private int age;
+    private char gender;
+    private List<String> favorites;
+    
+    public UserEvent() {
+        
+    }
+
+    UserEvent(String userID, int age, List<String> favorites, char gender) {
+        this.userID = userID;
+        this.age = age;
+        this.favorites = favorites;
+        this.gender = gender;
+    }
+
+    /**
+     * @return the userID
+     */
+    public String getUserID() {
+        return userID;
+    }
+
+    /**
+     * @return the age
+     */
+    public int getAge() {
+        return age;
+    }
+
+    /**
+     * @return the favorites
+     */
+    public List<String> getFavorites() {
+        return favorites;
+    }
+
+    /**
+     * @return the gender
+     */
+    public char getGender() {
+        return gender;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..0fcb24c
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+
+    public List<String> get(UserEvent event) {
+
+        List<String> results = new ArrayList<String>();
+
+        /* Retrieve the user ID and add it to the list. */
+        results.add(event.getUserID());
+
+        return results;
+    }
+}