You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by le...@apache.org on 2012/02/23 00:22:31 UTC
[4/7] Committing the first working version of the S4 embedded
domain-specific language.
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
new file mode 100644
index 0000000..4929344
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/HeightKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+/** Key finder that keys an {@link EventA} by its height. */
+public class HeightKeyFinder implements KeyFinder<EventA> {
+
+    /** Returns a new one-element list holding the event's height as a decimal string. */
+    public List<String> get(EventA event) {
+        final String heightKey = Integer.toString(event.getHeight());
+
+        final List<String> keys = new ArrayList<String>();
+        keys.add(heightKey);
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
new file mode 100644
index 0000000..84d1630
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/Module.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.edsl;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Configures the controller.
+ *
+ * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
+ *
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
+
+ bind(Cluster.class);
+
+ /* Configure static assignment using a configuration file. */
+ bind(Assignment.class).to(AssignmentFromFile.class);
+
+ /* Configure a static cluster topology using a configuration file. */
+ bind(Topology.class).to(TopologyFromFile.class);
+
+ // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ //
+ // bind(Emitter.class).to(QueueingEmitter.class);
+ // bind(Listener.class).to(QueueingListener.class);
+
+ /* Use the Netty comm layer implementation. */
+ // bind(Emitter.class).to(NettyEmitter.class);
+ // bind(Listener.class).to(NettyListener.class);
+
+ /* Use a simple UDP comm layer implementation. */
+ bind(Emitter.class).to(UDPEmitter.class);
+ bind(Listener.class).to(UDPListener.class);
+
+ /* The hashing function to map keys top partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
new file mode 100644
index 0000000..6e4a267
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
@@ -0,0 +1,37 @@
+package org.apache.s4.edsl;
+
+import java.util.concurrent.TimeUnit;
+
+/** Sample EDSL application used by TestEDSL; declares three PEs named "PEX", "PEY" and "PEZ". */
+public class MyApp extends BuilderS4DSL {
+ @Override
+ public void onInit() {
+ // "PEZ": fireOn EventA after a 5 s interval, cache of 1000 expiring after 3 h, emits EventB to "PEX".
+ pe("PEZ").type(PEZ.class).fireOn(EventA.class).afterInterval(5, TimeUnit.SECONDS).cache().size(1000)
+ .expires(3, TimeUnit.HOURS).emit(EventB.class).to("PEX").
+ // "PEY": two props, timer with a 2 min period, two EventA outputs to "PEZ" on fields stream3/heightpez.
+ pe("PEY").type(PEY.class).prop("duration", "4").prop("height", "99").timer()
+ .withPeriod(2, TimeUnit.MINUTES).emit(EventA.class).onField("stream3")
+ .withKeyFinder(new DurationKeyFinder()).to("PEZ").emit(EventA.class).onField("heightpez")
+ .withKeyFinder(new HeightKeyFinder()).to("PEZ").
+ // "PEX": singleton with prop query=money and a cache of 100 expiring after 1 min; EventB to "PEY" and "PEZ".
+ pe("PEX").type(PEX.class).prop("query", "money").cache().size(100).expires(1, TimeUnit.MINUTES)
+ .asSingleton().emit(EventB.class).withKeyFinder(new QueryKeyFinder()).to("PEY").to("PEZ").
+ // build() ends the chain. NOTE(review): its exact effect is defined in BuilderS4DSL - not visible here.
+ build();
+ }
+
+ // Make hooks public for testing. Normally this is handled by the container.
+ public void init() {
+ super.init();
+ }
+
+ public void start() {
+ super.start();
+ }
+
+ public void close() {
+ super.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
new file mode 100644
index 0000000..c142e03
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEX.java
@@ -0,0 +1,58 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+/** Test processing element holding a {@code query} string and two stream-array fields. */
+public class PEX extends ProcessingElement {
+ private String query;
+ private Stream<EventB>[] someStream;
+ // Never read in this class. NOTE(review): presumably set reflectively by the EDSL - confirm.
+ @SuppressWarnings("unused")
+ private Stream<EventA>[] streams;
+
+ public PEX(App app) {
+ super(app);
+ }
+
+ @Override
+ public void onCreate() {
+ // No-op.
+ }
+
+ @Override
+ public void onRemove() {
+ // No-op.
+ }
+
+ /**
+ * @return the value of the {@code query} field (the accessor name differs from the field name)
+ */
+ String getKeyword() {
+ return query;
+ }
+
+ /**
+ * @param query
+ * the value to store in the {@code query} field
+ */
+ void setKeyword(String query) {
+ this.query = query;
+ }
+
+ /**
+ * @return the someStream
+ */
+ public Stream<EventB>[] getSomeStream() {
+ return someStream;
+ }
+
+ /**
+ * @param someStream
+ * the someStream to set
+ */
+ public void setSomeStream(Stream<EventB>[] someStream) {
+ this.someStream = someStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
new file mode 100644
index 0000000..ce9dbc7
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEY.java
@@ -0,0 +1,77 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+/** Test processing element with two EventA stream-array fields and height/duration properties. */
+public class PEY extends ProcessingElement {
+ // MyApp names these two fields in onField("stream3") and onField("heightpez").
+ private Stream<EventA>[] stream3;
+ @SuppressWarnings("unused")
+ private Stream<EventA>[] heightpez;
+
+ // MyApp passes "duration" and "height" to prop(...); NOTE(review): presumably mapped onto these fields - confirm.
+ private int height;
+ private long duration;
+
+ public PEY(App app) {
+ super(app);
+ }
+
+ @Override
+ public void onCreate() {
+ // No-op.
+ }
+
+ @Override
+ public void onRemove() {
+ // No-op.
+ }
+
+ /**
+ * @return the stream3
+ */
+ Stream<EventA>[] getStream3() {
+ return stream3;
+ }
+
+ /**
+ * @param stream3
+ * the stream3 to set
+ */
+ void setStream3(Stream<EventA>[] stream3) {
+ this.stream3 = stream3;
+ }
+
+ /**
+ * @return the height
+ */
+ int getHeight() {
+ return height;
+ }
+
+ /**
+ * @param height
+ * the height to set
+ */
+ void setHeight(int height) {
+ this.height = height;
+ }
+
+ /**
+ * @return the duration
+ */
+ long getDuration() {
+ return duration;
+ }
+
+ /**
+ * @param duration
+ * the duration to set
+ */
+ void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
new file mode 100644
index 0000000..d494ae2
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/PEZ.java
@@ -0,0 +1,58 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+/** Test processing element holding one EventA and one EventB stream-array field. */
+public class PEZ extends ProcessingElement {
+ // NOTE(review): nothing in this class assigns these fields except the setters below;
+ // how the EDSL populates them is not visible here - confirm.
+ private Stream<EventA>[] stream1;
+ private Stream<EventB>[] stream2;
+
+ public PEZ(App app) {
+ super(app);
+ }
+
+ /**
+ * @return the stream1
+ */
+ Stream<EventA>[] getStream1() {
+ return stream1;
+ }
+
+ /**
+ * @param stream1
+ * the stream1 to set
+ */
+ void setStream1(Stream<EventA>[] stream1) {
+ this.stream1 = stream1;
+ }
+
+ /**
+ * @return the stream2
+ */
+ Stream<EventB>[] getStream2() {
+ return stream2;
+ }
+
+ /**
+ * @param stream2
+ * the stream2 to set
+ */
+ void setStream2(Stream<EventB>[] stream2) {
+ this.stream2 = stream2;
+ }
+
+ @Override
+ public void onCreate() {
+ // No-op.
+ }
+
+ @Override
+ public void onRemove() {
+ // No-op.
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
new file mode 100644
index 0000000..1338432
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/QueryKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+/** Key finder that keys an {@link EventB} by its query string. */
+public class QueryKeyFinder implements KeyFinder<EventB> {
+
+    /** Returns a new one-element list holding the value of {@code event.getQuery()}. */
+    public List<String> get(EventB event) {
+        final String queryKey = event.getQuery();
+
+        final List<String> keys = new ArrayList<String>();
+        keys.add(queryKey);
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
new file mode 100644
index 0000000..dbff5f5
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@ -0,0 +1,38 @@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Field;
+
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/** Smoke tests for the S4 embedded DSL. */
+public class TestEDSL {
+
+    /** Builds {@link MyApp} through Guice and drives its init/start/close hooks by hand. */
+    @Test
+    public void test() throws Exception {
+        Injector injector = Guice.createInjector(new Module());
+        MyApp myApp = injector.getInstance(MyApp.class);
+
+        /* Normally the container will handle this, but this is just a test. */
+        myApp.init();
+        myApp.start();
+        myApp.close();
+    }
+
+    /**
+     * Checks that the {@code duration} field of {@link PEY} is visible through reflection as a
+     * primitive {@code long}. A missing field propagates and fails the test.
+     */
+    @Test
+    public void testReflection() throws NoSuchFieldException {
+        Field f = PEY.class.getDeclaredField("duration");
+        System.out.format("Type: %s%n", f.getType());
+        System.out.format("GenericType: %s%n", f.getGenericType());
+
+        org.junit.Assert.assertEquals(long.class, f.getType());
+        org.junit.Assert.assertEquals(long.class, f.getGenericType());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/s4-example.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/s4-example.gradle b/subprojects/s4-example/s4-example.gradle
index bbeb11b..95f3a97 100644
--- a/subprojects/s4-example/s4-example.gradle
+++ b/subprojects/s4-example/s4-example.gradle
@@ -20,6 +20,7 @@ dependencies {
compile project( ":s4-base" )
compile project( ":s4-core" )
compile project( ":s4-comm" )
+ compile project( ":s4-edsl" )
compile libraries.ejml
compile libraries.junit
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
new file mode 100644
index 0000000..f4d894c
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/AgeKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+/** Key finder that keys a {@link UserEvent} by its age. */
+public class AgeKeyFinder implements KeyFinder<UserEvent> {
+
+    /** Returns a new one-element list holding the event's age as a decimal string. */
+    public List<String> get(UserEvent event) {
+        final String ageKey = Integer.toString(event.getAge());
+
+        final List<String> keys = new ArrayList<String>();
+        keys.add(ageKey);
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
new file mode 100644
index 0000000..ac8e8ac
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+
+/** Event carrying a string key together with the count recorded for it. */
+public class CountEvent extends Event {
+
+    private String key;
+    private long count;
+
+    /** Creates an event with a {@code null} key and a zero count. */
+    public CountEvent() {
+    }
+
+    /** Creates an event for {@code key} with the given {@code count}. */
+    CountEvent(String key, long count) {
+        this.key = key;
+        this.count = count;
+    }
+
+    CountEvent(String key, long count, long time) {
+        super(time);
+        this.key = key;
+        this.count = count;
+    }
+
+    /** @return the key */
+    public String getKey() {
+        return key;
+    }
+
+    /** @return the count */
+    public long getCount() {
+        return count;
+    }
+
+    /** Formats the event as {@code Key: <key>, Count: <count zero-padded to 8 digits>}. */
+    @Override
+    public String toString() {
+        /* The key is passed as an argument so a '%' inside it is never read as a format specifier. */
+        return String.format("Key: %s, Count: %08d", key, count);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
new file mode 100644
index 0000000..4fd5eb8
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CountKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+/** Key finder that keys a {@link CountEvent} by the key it already carries. */
+public class CountKeyFinder implements KeyFinder<CountEvent> {
+
+    /** Returns a new one-element list holding the value of {@code event.getKey()}. */
+    public List<String> get(CountEvent event) {
+        final String countKey = event.getKey();
+
+        final List<String> keys = new ArrayList<String>();
+        keys.add(countKey);
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
new file mode 100644
index 0000000..7012729
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
@@ -0,0 +1,80 @@
+package org.apache.s4.example.edsl.counter;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.edsl.BuilderS4DSL;
+
+/**
+ * This is a sample application to test the S4 embedded domain-specific language (EDSL).
+ *
+ * <p>
+ * Grammar:
+ *
+ * <pre>
+ * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
+ * (cache, size , expires? )? , asSingleton? , (emit, onField?,
+ * (withKey|withKeyFinder)?, to+ )* )+ , build
+ * </pre>
+ *
+ * <p>
+ * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
+ *
+ */
+final public class CounterApp extends BuilderS4DSL {
+
+ @Override
+ protected void onInit() {
+ // "Print" is declared as a singleton with no emit clause; the three counters below name it in to(...).
+ pe("Print").type(PrintPE.class).asSingleton().
+ // Each counter: a CounterPE with fireOn(Event) after a 100 ms interval, emitting CountEvent to "Print".
+ pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+ .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+
+ pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+ .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+
+ pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
+ .emit(CountEvent.class).withKeyFinder(new CountKeyFinder()).to("Print").
+ // Singleton timer PE with a 1 ms period; it declares three UserEvent outputs, one per counter.
+ pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
+ .asSingleton().
+ // Output to "User Count", keyed through UserIDKeyFinder.
+ emit(UserEvent.class).withKeyFinder(new UserIDKeyFinder()).to("User Count").
+ // Output to "Gender Count", keyed with withKey("gender") rather than a key finder class.
+ emit(UserEvent.class).withKey("gender").to("Gender Count").
+ // Output to "Age Count", keyed through AgeKeyFinder.
+ emit(UserEvent.class).withKeyFinder(new AgeKeyFinder()).to("Age Count").
+
+ build();
+ }
+
+ /*
+ * No start-up work is done here: the body is empty.
+ *
+ * NOTE(review): events appear to come from the "Generate User Event" timer PE instead - confirm.
+ */
+ @Override
+ protected void onStart() {
+
+ }
+
+ @Override
+ protected void onClose() {
+ System.out.println("Bye.");
+ }
+
+ // Make hooks public for testing. Normally this is handled by the container.
+ public void init() {
+ super.init();
+ }
+
+ public void start() {
+ super.start();
+ }
+
+ public void close() {
+ super.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
new file mode 100644
index 0000000..33e383b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterPE.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Processing element that counts calls to {@code onEvent} and emits the total from {@code onTrigger}. */
+public class CounterPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
+
+    private Stream<CountEvent>[] countStream = null;
+
+    public CounterPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @return the streams that count events are emitted on
+     */
+    public Stream<CountEvent>[] getCountStream() {
+        return this.countStream;
+    }
+
+    /**
+     * @param countStream
+     *            the streams that count events are emitted on
+     */
+    public void setCountStream(Stream<CountEvent>[] countStream) {
+        this.countStream = countStream;
+    }
+
+    /* Number of events seen by this PE instance so far. */
+    private long counter = 0;
+
+    /**
+     * Counts one more event; the event's content is not inspected.
+     * @param event
+     *            the received event
+     */
+    public void onEvent(Event event) {
+        counter++;
+        logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
+    }
+
+    /**
+     * Emits a {@link CountEvent} carrying this PE's id and the current count on the count streams.
+     *
+     * @param event
+     *            the triggering event (unused)
+     */
+    public void onTrigger(Event event) {
+        logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
+        emit(new CountEvent(getId(), counter), countStream);
+    }
+
+    /** No-op. */
+    @Override
+    public void onCreate() {
+    }
+
+    /** No-op. */
+    @Override
+    public void onRemove() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
new file mode 100644
index 0000000..708f862
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenderKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+/** Key finder that keys a {@link UserEvent} by its gender character. */
+public class GenderKeyFinder implements KeyFinder<UserEvent> {
+
+    /** Returns a new one-element list holding the event's gender as a one-character string. */
+    public List<String> get(UserEvent event) {
+        final String genderKey = Character.toString(event.getGender());
+
+        final List<String> keys = new ArrayList<String>();
+        keys.add(genderKey);
+        return keys;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
new file mode 100644
index 0000000..a68a048
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/GenerateUserEventPE.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Processing element that builds a pseudo-random {@link UserEvent} on every {@code onTime()} call. */
+public class GenerateUserEventPE extends ProcessingElement {
+
+    private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
+
+    static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
+    static int[] ages = { 25, 2, 33, 6, 67 };
+    static char[] genders = { 'f', 'm' };
+    private Stream<UserEvent>[] targetStreams;
+    /* Fixed seed, so every run draws the same sequence of users. */
+    final private Random generator = new Random(22);
+
+    public GenerateUserEventPE(App app) {
+        super(app);
+    }
+
+    /** @param targetStreams the {@link UserEvent} streams to emit on */
+    public void setStreams(Stream<UserEvent>... targetStreams) {
+        this.targetStreams = targetStreams;
+    }
+
+    /** Picks a user id, an age and a gender independently at random and emits them as one event. */
+    protected void onTime() {
+        List<String> favorites = new ArrayList<String>();
+        favorites.add("dulce de leche");
+        favorites.add("strawberry");
+
+        int indexUserID = generator.nextInt(userIds.length);
+        int indexAge = generator.nextInt(ages.length);
+        /* Bound by the array length, like the two draws above, instead of a hard-coded 2. */
+        int indexGender = generator.nextInt(genders.length);
+
+        UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
+        logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
+        emit(userEvent, targetStreams);
+    }
+
+    @Override
+    protected void onRemove() {
+    }
+
+    /** Placeholder that ignores its argument and always returns 0; not called within this class. */
+    static int pickRandom(int numElements) {
+        return 0;
+    }
+
+    @Override
+    protected void onCreate() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
new file mode 100644
index 0000000..b7afc4e
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/Module.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Configures the controller.
+ *
+ * Reads a properties file and configures the Guice bindings for the cluster topology and communication layer.
+ *
+ * @author Leo Neumeyer
+ */
+public class Module extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
+
+ bind(Cluster.class);
+
+ /* Configure static assignment using a configuration file. */
+ bind(Assignment.class).to(AssignmentFromFile.class);
+
+ /* Configure a static cluster topology using a configuration file. */
+ bind(Topology.class).to(TopologyFromFile.class);
+
+ // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ //
+ // bind(Emitter.class).to(QueueingEmitter.class);
+ // bind(Listener.class).to(QueueingListener.class);
+
+ /* Use the Netty comm layer implementation. */
+ // bind(Emitter.class).to(NettyEmitter.class);
+ // bind(Listener.class).to(NettyListener.class);
+
+ /* Use a simple UDP comm layer implementation. */
+ bind(Emitter.class).to(UDPEmitter.class);
+ bind(Listener.class).to(UDPListener.class);
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
new file mode 100644
index 0000000..bf841f4
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/PrintPE.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrintPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
+
+ public PrintPE(App app) {
+ super(app);
+ }
+
+ public void onEvent(Event event) {
+
+ logger.info(">>> [{}].", event.toString());
+ }
+
+ @Override
+ protected void onCreate() {
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
new file mode 100644
index 0000000..29e086b
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/TestCounterApp.java
@@ -0,0 +1,26 @@
+package org.apache.s4.example.edsl.counter;
+
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestCounterApp {
+
+ @Test
+ public void test() throws Exception {
+ Injector injector = Guice.createInjector(new Module());
+ CounterApp myApp = injector.getInstance(CounterApp.class);
+
+ /* Normally, the container will handle this, but this is just a test. */
+ myApp.init();
+ myApp.start();
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ myApp.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
new file mode 100644
index 0000000..f2e3bc6
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+
+import java.util.List;
+
+import org.apache.s4.base.Event;
+
+public class UserEvent extends Event {
+
+ private String userID;
+ private int age;
+ private char gender;
+ private List<String> favorites;
+
+ public UserEvent() {
+
+ }
+
+ UserEvent(String userID, int age, List<String> favorites, char gender) {
+ this.userID = userID;
+ this.age = age;
+ this.favorites = favorites;
+ this.gender = gender;
+ }
+
+ /**
+ * @return the userID
+ */
+ public String getUserID() {
+ return userID;
+ }
+
+ /**
+ * @return the age
+ */
+ public int getAge() {
+ return age;
+ }
+
+ /**
+ * @return the favorites
+ */
+ public List<String> getFavorites() {
+ return favorites;
+ }
+
+ /**
+ * @return the gender
+ */
+ public char getGender() {
+ return gender;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
new file mode 100644
index 0000000..3fded2a
--- /dev/null
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/UserIDKeyFinder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.example.edsl.counter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class UserIDKeyFinder implements KeyFinder<UserEvent> {
+
+ public List<String> get(UserEvent event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the user ID and add it to the list. */
+ results.add(event.getUserID());
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
deleted file mode 100644
index f8ee411..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/AgeKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class AgeKeyFinder implements KeyFinder<UserEvent> {
-
- public List<String> get(UserEvent event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the age and add it to the list. */
- results.add(Integer.toString(event.getAge()));
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
deleted file mode 100644
index eea336f..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountEvent.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-
-public class CountEvent extends Event {
-
- private String key;
- private long count;
-
- public CountEvent() {
-
- }
-
- CountEvent(String key, long count) {
- this.key = key;
- this.count = count;
- }
-
- CountEvent(String key, long count, long time) {
- super(time);
- this.key = key;
- this.count = count;
- }
-
-
- /**
- * @return the key
- */
- public String getKey() {
- return key;
- }
-
- /**
- * @return the count
- */
- public long getCount() {
- return count;
- }
-
- public String toString() {
- return String.format("Key: " + key + ", Count: %08d", count);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
deleted file mode 100644
index 8991ddd..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CountKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class CountKeyFinder implements KeyFinder<CountEvent> {
-
- public List<String> get(CountEvent event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the user ID and add it to the list. */
- results.add(event.getKey());
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
deleted file mode 100644
index 87f1a6f..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/CounterPE.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CounterPE extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(CounterPE.class);
-
- private Stream<CountEvent>[] countStream = null;
-
- public CounterPE(App app) {
- super(app);
- }
-
- /**
- * @return the countStream
- */
- public Stream<CountEvent>[] getCountStream() {
- return countStream;
- }
-
- /**
- * @param countStream
- * the countStream to set
- */
- public void setCountStream(Stream<CountEvent>[] countStream) {
- this.countStream = countStream;
- }
-
- private long counter = 0;
-
- public void onEvent(Event event) {
-
- counter += 1;
- logger.trace("PE with id [{}] incremented counter to [{}].", getId(), counter);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see io.s4.ProcessingElement#sendOutputEvent()
- */
- public void onTrigger(Event event) {
-
- logger.trace("Sending count event for PE id [{}] with count [{}].", getId(), counter);
- CountEvent countEvent = new CountEvent(getId(), counter);
- emit(countEvent, countStream);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see io.s4.ProcessingElement#init()
- */
- @Override
- public void onCreate() {
-
- }
-
- @Override
- public void onRemove() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
deleted file mode 100644
index 06875a6..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenderKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class GenderKeyFinder implements KeyFinder<UserEvent> {
-
- public List<String> get(UserEvent event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the gender and add it to the list. */
- results.add(Character.toString(event.getGender()));
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
deleted file mode 100644
index 1d53355..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/GenerateUserEventPE.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GenerateUserEventPE extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(GenerateUserEventPE.class);
-
- static String userIds[] = { "pepe", "jose", "tito", "mr_smith", "joe" };
- static int[] ages = { 25, 2, 33, 6, 67 };
- static char[] genders = { 'f', 'm' };
- private Stream<UserEvent>[] targetStreams;
- final private Random generator = new Random(22);
-
- public GenerateUserEventPE(App app) {
- super(app);
- }
-
- /**
- * @param targetStreams
- * the {@link UserEvent} streams.
- */
- public void setStreams(Stream<UserEvent>... targetStreams) {
- this.targetStreams = targetStreams;
- }
-
- protected void onTime() {
- List<String> favorites = new ArrayList<String>();
- favorites.add("dulce de leche");
- favorites.add("strawberry");
-
- int indexUserID = generator.nextInt(userIds.length);
- int indexAge = generator.nextInt(ages.length);
- int indexGender = generator.nextInt(2);
-
- UserEvent userEvent = new UserEvent(userIds[indexUserID], ages[indexAge], favorites, genders[indexGender]);
- logger.trace("Sending userID: [{}], age: [{}].", userIds[indexUserID], ages[indexAge]);
- emit(userEvent, targetStreams);
- }
-
- @Override
- protected void onRemove() {
- }
-
- static int pickRandom(int numElements) {
- return 0;
- }
-
- @Override
- protected void onCreate() {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
deleted file mode 100644
index 7c05836..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Main.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.Receiver;
-import org.apache.s4.core.Sender;
-import org.apache.s4.fluent.AppMaker;
-import org.apache.s4.fluent.FluentApp;
-import org.apache.s4.fluent.PEMaker;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-/*
- * This is a sample application to test a new S4 API.
- * See README file for details.
- *
- * */
-
-final public class Main extends AppMaker {
-
- public Main() {
- super();
- }
-
- private PEMaker generateUserEventPE;
-
- /*
- *
- *
- * The application graph itself is created in this Class. However, developers may provide tools for creating apps
- * which will generate the objects.
- *
- * IMPORTANT: we create a graph of PE prototypes. The prototype is a class instance that is used as a prototype from
- * which all PE instance will be created. The prototype itself is not used as an instance. (Except when the PE is of
- * type Singleton PE). To create a data structure for each PE instance you must do it in the method
- * ProcessingElement.onCreate().
- */
-
- /*
- * Build the application graph using POJOs. Don't like it? Write a nice tool.
- *
- * @see io.s4.App#init()
- */
- @Override
- public void configure() {
-
- /* PE that prints counts to console. */
- PEMaker printPE = addPE(PrintPE.class);
-
- /* PEs that count events by user, gender, and age. */
- PEMaker userCountPE = addPE(CounterPE.class);
- userCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
- PEMaker genderCountPE = addPE(CounterPE.class);
- genderCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
- PEMaker ageCountPE = addPE(CounterPE.class);
- ageCountPE.addTrigger().fireOn(Event.class).ifInterval(100l, TimeUnit.MILLISECONDS);
-
- generateUserEventPE = addPE(GenerateUserEventPE.class).asSingleton();
- generateUserEventPE.addTimer().withDuration(1, TimeUnit.MILLISECONDS);
-
- ageCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
- genderCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
- userCountPE.emit(CountEvent.class).onKey(new CountKeyFinder()).to(printPE);
-
- generateUserEventPE.emit(UserEvent.class).onKey(new AgeKeyFinder()).to(ageCountPE);
- generateUserEventPE.emit(UserEvent.class).onKey("gender").to(genderCountPE);
- generateUserEventPE.emit(UserEvent.class).onKey(new UserIDKeyFinder()).to(userCountPE);
- }
-
- /*
- * Create and send 200 dummy events of type UserEvent.
- *
- * @see io.s4.App#start()
- */
- @Override
- public void start() {
-
- }
-
- @Override
- public void close() {
- System.out.println("Bye.");
-
- }
-
- // public static void main(String[] args) {
- @Test
- public void test() throws Exception {
-
- Injector injector = Guice.createInjector(new Module());
- FluentApp myApp = injector.getInstance(FluentApp.class);
- Sender sender = injector.getInstance(Sender.class);
- Receiver receiver = injector.getInstance(Receiver.class);
- myApp.setCommLayer(sender, receiver);
-
- /* Normally. the container will handle this but this is just a test. */
- myApp.init();
- myApp.start();
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- myApp.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
deleted file mode 100644
index a8f034b..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-import org.apache.s4.fluent.AppMaker;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Configures the controller.
- *
- * Reads a properties file, provides a {@link Communicator} singleton, and configures Guice bindings.
- *
- * @author Leo Neumeyer
- */
-public class Module extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/s4-counter-example.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- bind(AppMaker.class).to(Main.class);
-
- bind(Cluster.class);
-
- /* Configure static assignment using a configuration file. */
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- /* Configure a static cluster topology using a configuration file. */
- bind(Topology.class).to(TopologyFromFile.class);
-
- // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
- // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
- //
- // bind(Emitter.class).to(QueueingEmitter.class);
- // bind(Listener.class).to(QueueingListener.class);
-
- /* Use the Netty comm layer implementation. */
- // bind(Emitter.class).to(NettyEmitter.class);
- // bind(Listener.class).to(NettyListener.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
deleted file mode 100644
index 1a5b2de..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/PrintPE.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PrintPE extends ProcessingElement {
-
- private static final Logger logger = LoggerFactory.getLogger(PrintPE.class);
-
- public PrintPE(App app) {
- super(app);
- }
-
- public void onEvent(Event event) {
-
- logger.info(">>> [{}].", event.toString());
- }
-
- @Override
- protected void onCreate() {
- }
-
- @Override
- protected void onRemove() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
deleted file mode 100644
index 72e0a3d..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-S4 Counter Example
-==================
-
-In this example we do the following:
-
-- Generate dummy events (UserEvent).
-- Key events by user, gender, age.
-- Count by user, gender, age.
-- Print partial counts.
-
-The following diagram shows the application graph:
-
-![S4 Counter](https://github.com/leoneu/s4-piper/raw/master/etc/s4-counter-example.png)
-
-In the following diagram I show how Classes, PE Prototypes, PE instances, and Streams are related.
-
-![S4 Objects](https://github.com/leoneu/s4-piper/raw/master/etc/s4-objects-example.png)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
deleted file mode 100644
index b33adc0..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserEvent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-
-import java.util.List;
-
-import org.apache.s4.base.Event;
-
-public class UserEvent extends Event {
-
- private String userID;
- private int age;
- private char gender;
- private List<String> favorites;
-
- public UserEvent() {
-
- }
-
- UserEvent(String userID, int age, List<String> favorites, char gender) {
- this.userID = userID;
- this.age = age;
- this.favorites = favorites;
- this.gender = gender;
- }
-
- /**
- * @return the userID
- */
- public String getUserID() {
- return userID;
- }
-
- /**
- * @return the age
- */
- public int getAge() {
- return age;
- }
-
- /**
- * @return the favorites
- */
- public List<String> getFavorites() {
- return favorites;
- }
-
- /**
- * @return the gender
- */
- public char getGender() {
- return gender;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
deleted file mode 100644
index dd81715..0000000
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/UserIDKeyFinder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.example.fluent.counter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class UserIDKeyFinder implements KeyFinder<UserEvent> {
-
- public List<String> get(UserEvent event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the user ID and add it to the list. */
- results.add(event.getUserID());
-
- return results;
- }
-}