You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[6/50] [abbrv] Fluent API close to complete. New counter example
using the fluent API.
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
index d8df20a..eb96b9d 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
@@ -15,7 +15,6 @@
*/
package org.apache.s4.example.counter;
-
import java.io.InputStream;
import org.apache.commons.configuration.ConfigurationConverter;
@@ -43,8 +42,7 @@ import com.google.inject.name.Names;
/**
* Configures the controller.
*
- * Reads a properties file, provides a {@link Communicator} singleton, and
- * configures Guice bindings.
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
*
* @author Leo Neumeyer
*/
@@ -55,8 +53,7 @@ public class Module extends AbstractModule {
private void loadProperties(Binder binder) {
try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-piper-example.properties");
+ InputStream is = this.getClass().getResourceAsStream("/s4-piper-example.properties");
config = new PropertiesConfiguration();
config.load(is);
@@ -64,8 +61,7 @@ public class Module extends AbstractModule {
// TODO - validate properties.
/* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
} catch (ConfigurationException e) {
binder.addError(e);
e.printStackTrace();
@@ -78,34 +74,34 @@ public class Module extends AbstractModule {
loadProperties(binder());
bind(MyApp.class);
-
+
bind(Cluster.class);
-
+
/* Configure static assignment using a configuration file. */
bind(Assignment.class).to(AssignmentFromFile.class);
-
+
/* Configure a static cluster topology using a configuration file. */
bind(Topology.class).to(TopologyFromFile.class);
-
-// bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-// bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
-//
-// bind(Emitter.class).to(QueueingEmitter.class);
-// bind(Listener.class).to(QueueingListener.class);
-
+
+ // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ //
+ // bind(Emitter.class).to(QueueingEmitter.class);
+ // bind(Listener.class).to(QueueingListener.class);
+
/* Use the Netty comm layer implementation. */
-// bind(Emitter.class).to(NettyEmitter.class);
-// bind(Listener.class).to(NettyListener.class);
-
+ // bind(Emitter.class).to(NettyEmitter.class);
+ // bind(Listener.class).to(NettyListener.class);
+
/* Use a simple UDP comm layer implementation. */
bind(Emitter.class).to(UDPEmitter.class);
bind(Listener.class).to(UDPListener.class);
-
+
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
-
+
/* Use Kryo to serialize events. */
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
index cae9a3b..42d8e20 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -60,6 +60,7 @@ final public class MyApp extends App {
/* PE that prints counts to console. */
PrintPE printPE = createPE(PrintPE.class);
+ printPE.setSingleton(true);
Stream<CountEvent> userCountStream = createStream(CountEvent.class).setName("User Count Stream")
.setKey(new CountKeyFinder()).setPE(printPE);
@@ -71,7 +72,7 @@ final public class MyApp extends App {
.setKey(new CountKeyFinder()).setPE(printPE);
/* PEs that count events by user, gender, and age. */
- CounterPE userCountPE = createPE(CounterPE.class);// .withTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
+ CounterPE userCountPE = createPE(CounterPE.class);
userCountPE.setTrigger(Event.class, interval, 10l, TimeUnit.SECONDS);
userCountPE.setCountStream(userCountStream);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..2d772de
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the age and add it to the list. */
+ results.add(Integer.toString(event.getAge()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
new file mode 100644
index 0000000..eea336f
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+
+public class CountEvent extends Event {
+
+ private String key;
+ private long count;
+
+ public CountEvent() {
+
+ }
+
+ CountEvent(String key, long count) {
+ this.key = key;
+ this.count = count;
+ }
+
+ CountEvent(String key, long count, long time) {
+ super(time);
+ this.key = key;
+ this.count = count;
+ }
+
+
+ /**
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * @return the count
+ */
+ public long getCount() {
+ return count;
+ }
+
+ public String toString() {
+ return String.format("Key: " + key + ", Count: %08d", count);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
new file mode 100644
index 0000000..2314696
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+
+ public List<String> get(CountEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the user ID and add it to the list. */
+ results.add(event.getKey());
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
new file mode 100644
index 0000000..87f1a6f
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CounterPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
+
+ private Stream<CountEvent>[] countStream = null;
+
+ public CounterPE(App app) {
+ super(app);
+ }
+
+ /**
+ * @return the countStream
+ */
+ public Stream<CountEvent>[] getCountStream() {
+ return countStream;
+ }
+
+ /**
+ * @param countStream
+ * the countStream to set
+ */
+ public void setCountStream(Stream<CountEvent>[] countStream) {
+ this.countStream = countStream;
+ }
+
+ private long counter = 0;
+
+ public void onEvent(Event event) {
+
+ counter += 1;
+ logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see io.s4.ProcessingElement#sendOutputEvent()
+ */
+ public void onTrigger(Event event) {
+
+ logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
+ CountEvent countEvent = new CountEvent(getId(), counter);
+ emit(countEvent, countStream);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see io.s4.ProcessingElement#init()
+ */
+ @Override
+ public void onCreate() {
+
+ }
+
+ @Override
+ public void onRemove() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..d7883c8
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the gender and add it to the list. */
+ results.add(Character.toString(event.getGender()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..1d53355
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenerateUserEventPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
+
+ static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+ static int[] ages = { 25, 2, 33, 6, 67 };
+ static char[] genders = { 'f', 'm' };
+ private Stream<UserEvent>[] targetStreams;
+ final private Random generator = new Random(22);
+
+ public GenerateUserEventPE(App app) {
+ super(app);
+ }
+
+ /**
+ * @param targetStreams
+ * the {@link UserEvent} streams.
+ */
+ public void setStreams(Stream<UserEvent>... targetStreams) {
+ this.targetStreams = targetStreams;
+ }
+
+ protected void onTime() {
+ List<String> favorites = new ArrayList<String>();
+ favorites.add("dulce de leche");
+ favorites.add("strawberry");
+
+ int indexUserID = generator.nextInt(userIds.length);
+ int indexAge = generator.nextInt(ages.length);
+ int indexGender = generator.nextInt(2);
+
+ UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
+ logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
+ emit(userEvent, targetStreams);
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+
+ static int pickRandom(int numElements) {
+ return 0;
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
new file mode 100644
index 0000000..cb9c098
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.Receiver;
+import org.apache.s4.core.Sender;
+import org.apache.s4.fluent.AppMaker;
+import org.apache.s4.fluent.FluentApp;
+import org.apache.s4.fluent.PEMaker;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/*
+ * This is a sample application to test a new S4 API.
+ * See README file for details.
+ *
+ * */
+
+final public class Main extends AppMaker {
+
+ public Main() {
+ super();
+ }
+
+ private PEMaker generateUserEventPE;
+
+ /*
+ *
+ *
+ * The application graph itself is created in this Class. However, developers may provide tools for creating apps
+ * which will generate the objects.
+ *
+ * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype from
+ * which all PE instance will be created. The prototype itself is not used as an instance. (Except when the PE is of
+ * type Singleton PE). To create a data structure for each PE instance you must do it in the method
+ * ProcessingElement.onCreate().
+ */
+
+ /*
+ * Build the application graph using POJOs. Don't like it? Write a nice tool.
+ *
+ * @see io.s4.App#init()
+ */
+ @Override
+ public void configure() {
+
+ /* PE that prints counts to console. */
+ PEMaker printPE = addPE(PrintPE.class).asSingleton();
+
+ /* PEs that count events by user, gender, and age. */
+ PEMaker userCountPE = addPE(CounterPE.class);
+ userCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
+
+ PEMaker genderCountPE = addPE(CounterPE.class);
+ genderCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
+
+ PEMaker ageCountPE = addPE(CounterPE.class);
+ ageCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
+
+ generateUserEventPE = addPE(GenerateUserEventPE.class).asSingleton();
+ generateUserEventPE.addTimer().withDuration(1, TimeUnit.MILLISECONDS);
+
+ ageCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+ genderCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+ userCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
+
+ generateUserEventPE.emit(UserEvent.class).onKey(new AgeKeyFinder()).to(ageCountPE);
+ generateUserEventPE.emit(UserEvent.class).onKey(new GenderKeyFinder()).to(genderCountPE);
+ generateUserEventPE.emit(UserEvent.class).onKey(new UserIDKeyFinder()).to(userCountPE);
+ }
+
+ /*
+ * Create and send 200 dummy events of type UserEvent.
+ *
+ * @see io.s4.App#start()
+ */
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void close() {
+ System.out.println("Bye.");
+
+ }
+
+ // public static void main(String[] args) {
+ @Test
+ public void test() throws Exception {
+
+ Injector injector = Guice.createInjector(new Module());
+ FluentApp myApp = injector.getInstance(FluentApp.class);
+ Sender sender = injector.getInstance(Sender.class);
+ Receiver receiver = injector.getInstance(Receiver.class);
+ myApp.setCommLayer(sender, receiver);
+
+ /* Normally. the container will handle this but this is just a test. */
+ myApp.init();
+ myApp.start();
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ myApp.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
new file mode 100644
index 0000000..a8f034b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+import org.apache.s4.fluent.AppMaker;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Configures the controller.
+ *
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
+ *
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
+
+ bind(AppMaker.class).to(Main.class);
+
+ bind(Cluster.class);
+
+ /* Configure static assignment using a configuration file. */
+ bind(Assignment.class).to(AssignmentFromFile.class);
+
+ /* Configure a static cluster topology using a configuration file. */
+ bind(Topology.class).to(TopologyFromFile.class);
+
+ // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ //
+ // bind(Emitter.class).to(QueueingEmitter.class);
+ // bind(Listener.class).to(QueueingListener.class);
+
+ /* Use the Netty comm layer implementation. */
+ // bind(Emitter.class).to(NettyEmitter.class);
+ // bind(Listener.class).to(NettyListener.class);
+
+ /* Use a simple UDP comm layer implementation. */
+ bind(Emitter.class).to(UDPEmitter.class);
+ bind(Listener.class).to(UDPListener.class);
+
+ /* The hashing function to map keys top partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
new file mode 100644
index 0000000..1a5b2de
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrintPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
+
+ public PrintPE(App app) {
+ super(app);
+ }
+
+ public void onEvent(Event event) {
+
+ logger.info(">>> [{}].", event.toString());
+ }
+
+ @Override
+ protected void onCreate() {
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
new file mode 100644
index 0000000..72e0a3d
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
@@ -0,0 +1,17 @@
+S4 Counter Example
+==================
+
+In this example we do the following:
+
+- Generate dummy events (UserEvent).
+- Key events by user, gender, age.
+- Count by user, gender, age.
+- Print partial counts.
+
+The following diagram shows the application graph:
+
+![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
+
+In in following diagram I show how Classes, PE Prototypes, PE instances, and Streams are related.
+
+![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
new file mode 100644
index 0000000..b33adc0
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+public class UserEvent extends Event {
+
+ private String userID;
+ private int age;
+ private char gender;
+ private List<String> favorites;
+
+ public UserEvent() {
+
+ }
+
+ UserEvent(String userID, int age, List<String> favorites, char gender) {
+ this.userID = userID;
+ this.age = age;
+ this.favorites = favorites;
+ this.gender = gender;
+ }
+
+ /**
+ * @return the userID
+ */
+ public String getUserID() {
+ return userID;
+ }
+
+ /**
+ * @return the age
+ */
+ public int getAge() {
+ return age;
+ }
+
+ /**
+ * @return the favorites
+ */
+ public List<String> getFavorites() {
+ return favorites;
+ }
+
+ /**
+ * @return the gender
+ */
+ public char getGender() {
+ return gender;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/f617192e/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..0fcb24c
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.fluent.counter;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.core.KeyFinder;
+
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the user ID and add it to the list. */
+ results.add(event.getUserID());
+
+ return results;
+ }
+}