You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by le...@apache.org on 2012/02/14 19:36:13 UTC
[1/4] git commit: Committing the first working version of the S4
embedded domain-specific language.
Updated Branches:
refs/heads/s4-5 [created] 6e9736673
Committing the first working version of the S4 embedded domain-specific language.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/6e973667
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/6e973667
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/6e973667
Branch: refs/heads/s4-5
Commit: 6e97366735b3363a2d56dbf6b316681d4d0a928a
Parents: 7c2688f
Author: Leo Neumeyer <le...@apache.org>
Authored: Tue Feb 14 11:35:35 2012 -0800
Committer: Leo Neumeyer <le...@apache.org>
Committed: Tue Feb 14 11:35:35 2012 -0800
----------------------------------------------------------------------
build.gradle | 3 +-
.../src/main/java/org/apache/s4/core/App.java | 38 ++-
.../java/org/apache/s4/core/ProcessingElement.java | 58 ++-
.../src/main/java/org/apache/s4/core/Stream.java | 6 +
.../main/java/org/apache/s4/fluent/AppMaker.java | 418 ---------------
.../main/java/org/apache/s4/fluent/FluentApp.java | 49 --
.../main/java/org/apache/s4/fluent/PEMaker.java | 257 ---------
.../java/org/apache/s4/fluent/StreamMaker.java | 168 ------
.../java/org/apache/s4/fluent/AppMakerTest.java | 34 --
.../org/apache/s4/fluent/DurationKeyFinder.java | 19 -
.../src/test/java/org/apache/s4/fluent/EventA.java | 39 --
.../src/test/java/org/apache/s4/fluent/EventB.java | 24 -
.../java/org/apache/s4/fluent/HeightKeyFinder.java | 19 -
.../src/test/java/org/apache/s4/fluent/MyApp.java | 41 --
.../src/test/java/org/apache/s4/fluent/PEX.java | 58 --
.../src/test/java/org/apache/s4/fluent/PEY.java | 77 ---
.../src/test/java/org/apache/s4/fluent/PEZ.java | 58 --
.../java/org/apache/s4/fluent/QueryKeyFinder.java | 19 -
subprojects/s4-edsl/s4-edsl.gradle | 9 +
subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml | 72 ++-
subprojects/s4-edsl/src/main/diezel/s4/s4.xml | 46 +-
subprojects/s4-edsl/src/main/java/Main.java | 15 -
.../main/java/org/apache/s4/edsl/AppBuilder.java | 330 ++++++++++++
.../java/org/apache/s4/edsl/StreamBuilder.java | 75 +++
.../java/org/apache/s4/edsl/DurationKeyFinder.java | 19 +
.../src/test/java/org/apache/s4/edsl/EventA.java | 40 ++
.../src/test/java/org/apache/s4/edsl/EventB.java | 24 +
.../java/org/apache/s4/edsl/HeightKeyFinder.java | 19 +
.../src/test/java/org/apache/s4/edsl/Module.java | 105 ++++
.../src/test/java/org/apache/s4/edsl/MyApp.java | 37 ++
.../src/test/java/org/apache/s4/edsl/PEX.java | 58 ++
.../src/test/java/org/apache/s4/edsl/PEY.java | 77 +++
.../src/test/java/org/apache/s4/edsl/PEZ.java | 58 ++
.../java/org/apache/s4/edsl/QueryKeyFinder.java | 19 +
.../src/test/java/org/apache/s4/edsl/TestEDSL.java | 38 ++
subprojects/s4-example/s4-example.gradle | 1 +
.../s4/example/edsl/counter/AgeKeyFinder.java | 34 ++
.../apache/s4/example/edsl/counter/CountEvent.java | 58 ++
.../s4/example/edsl/counter/CountKeyFinder.java | 34 ++
.../apache/s4/example/edsl/counter/CounterApp.java | 80 +++
.../apache/s4/example/edsl/counter/CounterPE.java | 84 +++
.../s4/example/edsl/counter/GenderKeyFinder.java | 34 ++
.../example/edsl/counter/GenerateUserEventPE.java | 77 +++
.../org/apache/s4/example/edsl/counter/Module.java | 105 ++++
.../apache/s4/example/edsl/counter/PrintPE.java | 44 ++
.../s4/example/edsl/counter/TestCounterApp.java | 26 +
.../apache/s4/example/edsl/counter/UserEvent.java | 68 +++
.../s4/example/edsl/counter/UserIDKeyFinder.java | 34 ++
.../s4/example/fluent/counter/AgeKeyFinder.java | 34 --
.../s4/example/fluent/counter/CountEvent.java | 58 --
.../s4/example/fluent/counter/CountKeyFinder.java | 34 --
.../s4/example/fluent/counter/CounterPE.java | 84 ---
.../s4/example/fluent/counter/GenderKeyFinder.java | 34 --
.../fluent/counter/GenerateUserEventPE.java | 77 ---
.../org/apache/s4/example/fluent/counter/Main.java | 127 -----
.../apache/s4/example/fluent/counter/Module.java | 108 ----
.../apache/s4/example/fluent/counter/PrintPE.java | 44 --
.../org/apache/s4/example/fluent/counter/README.md | 17 -
.../s4/example/fluent/counter/UserEvent.java | 68 ---
.../s4/example/fluent/counter/UserIDKeyFinder.java | 34 --
60 files changed, 1758 insertions(+), 2066 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 106dd2b..d67e54d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,7 +73,7 @@ libraries = [
jcip: 'net.jcip:jcip-annotations:1.0',
junit: 'junit:junit:4.10',
zkclient: 'com.github.sgroschupf:zkclient:0.1',
- diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-3'
+ diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-4-SNAPSHOT'
]
subprojects {
@@ -129,6 +129,7 @@ subprojects {
)
}
+
dependsOnChildren()
platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-edsl')]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 4a562f9..5822b89 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -42,9 +43,12 @@ public abstract class App {
/* PE prototype to streams relations. */
final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
- /* Stream prototype to PE relations. */
+ /* Stream to PE prototype relations. */
final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
+ /* Pes indexed by name. */
+ Map<String, ProcessingElement> peByName = Maps.newHashMap();
+
private ClockType clockType = ClockType.WALL_CLOCK;
private int id = -1;
@Inject
@@ -104,6 +108,10 @@ public abstract class App {
protected abstract void onStart();
+ /**
+ * This method is called by the container after initialization. Once this method is called, threads get started and
+ * events start flowing.
+ */
protected void start() {
logger.info("Prepare to start App [{}].", getClass().getName());
@@ -122,6 +130,9 @@ public abstract class App {
onStart();
}
+ /**
+ * This method is called by the container to initialize applications.
+ */
protected abstract void onInit();
protected void init() {
@@ -129,6 +140,9 @@ public abstract class App {
onInit();
}
+ /**
+ * This method is called by the container before unloading the application.
+ */
protected abstract void onClose();
protected void close() {
@@ -164,7 +178,11 @@ public abstract class App {
logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
pe2stream.put(pePrototype, stream);
+ }
+
+ public ProcessingElement getPE(String name) {
+ return peByName.get(name);
}
void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
@@ -312,14 +330,17 @@ public abstract class App {
*
* @param type
* the processing element type.
+ * @param name
+ * a name for this PE prototype.
* @return the processing element prototype.
*/
- public <T extends ProcessingElement> T createPE(Class<T> type) {
+ public <T extends ProcessingElement> T createPE(Class<T> type, String name) {
try {
// TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
Class<?>[] types = new Class<?>[] { App.class };
T pe = type.getDeclaredConstructor(types).newInstance(this);
+ pe.setName(name);
return pe;
} catch (Exception e) {
@@ -328,6 +349,19 @@ public abstract class App {
}
}
+ /**
+ * Creates a {@link ProcessingElement} prototype.
+ *
+ * @param type
+ * the processing element type.
+ * @return the processing element prototype.
+ */
+ public <T extends ProcessingElement> T createPE(Class<T> type) {
+
+ return createPE(type, null);
+
+ }
+
static private String toString(ProcessingElement pe) {
return pe != null ? pe.getClass().getName() + " " : "null ";
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index befe94f..152f24e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -76,7 +76,6 @@ import com.google.common.collect.Maps;
* objects in the {@link #onCreate()} method. For example, if each instance requires a
* <tt>List<tt/> object the PE should implement the following:
* <pre>
- * {@code
* public class MyPE extends ProcessingElement {
*
* private Map<String, Integer> wordCount;
@@ -88,7 +87,6 @@ import com.google.common.collect.Maps;
* logger.trace("Created a map for instance PE with id {}, getId());
* }
* }
- * }
* </pre>
*
*
@@ -124,7 +122,7 @@ abstract public class ProcessingElement implements Cloneable {
private Timer timer;
private boolean isPrototype = true;
private boolean isThreadSafe = false;
- private boolean isFirst = true;
+ private String name = null;
private boolean isSingleton = false;
private transient OverloadDispatcher overloadDispatcher;
@@ -246,6 +244,34 @@ abstract public class ProcessingElement implements Cloneable {
}
/**
+ * Sets the max size of the PE cache.
+ *
+ * <p>
+ * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
+ * <p>
+ * When this method is called all existing PE instances are destroyed.
+ *
+ *
+ * @param maximumSize
+ * the approximate maximum number of PEs in the cache.
+ * @return the PE prototype
+ */
+ public ProcessingElement setPECache(int maximumSize) {
+
+ Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
+
+ peInstances = CacheBuilder.newBuilder().maximumSize(maximumSize)
+ .build(new CacheLoader<String, ProcessingElement>() {
+ @Override
+ public ProcessingElement load(String key) throws Exception {
+ return createPE(key);
+ }
+ });
+
+ return this;
+ }
+
+ /**
* This trigger is fired when the following conditions occur:
*
* <ul>
@@ -502,9 +528,6 @@ abstract public class ProcessingElement implements Cloneable {
/* Check if instance for key exists, otherwise create one. */
try {
if (isSingleton) {
- logger.trace(
- "Requested a PE instance with key [{}]. The instance is a singleton and will ignore the key. The key should be set to null when requesting a singleton.",
- id);
return peInstances.get(SINGLETON);
}
return peInstances.get(id);
@@ -623,6 +646,29 @@ abstract public class ProcessingElement implements Cloneable {
}
}
+ /**
+ * @return the PE name
+ */
+ protected String getName() {
+ return name;
+ }
+
+ /**
+ * @param name
+ * PE name
+ */
+ protected void setName(String name) {
+
+ if (name == null)
+ return;
+
+ this.name = name;
+ if (app.peByName.containsKey(name)) {
+ logger.warn("Using a duplicate PE name: [{}]. This is probbaly not what you wanted.", name);
+ }
+ app.peByName.put(name, this);
+ }
+
class Trigger {
final long intervalInMilliseconds;
final int intervalInEvents;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index c05d81f..1790e44 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -88,6 +88,12 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
targetPEs = new ProcessingElement[pes.size()];
pes.toArray(targetPEs);
+ if (logger.isTraceEnabled()) {
+ for (ProcessingElement pe : pes) {
+ logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
+ }
+ }
+
/* Start streaming. */
thread = new Thread(this, name);
thread.start();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
deleted file mode 100644
index bfeabd1..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
+++ /dev/null
@@ -1,418 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * A fluent API to build S4 applications.
- *
- * *
- * <p>
- * Usage example:
- *
- * <pre>
- *
- * @Override
- * public void configure() {
- *
- * PEMaker pez, pey, pex;
- *
- * pez = addPE(PEZ.class);
- * pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
- * pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
- *
- * pey = addPE(PEY.class).with("duration", 4).with("height", 99);
- * pey.addTimer().withDuration(2, TimeUnit.MINUTES);
- *
- * pex = addPE(PEX.class).with("query", "money").asSingleton();
- * pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
- *
- * pey.emit(EventA.class).withField("stream3").onKey(new DurationKeyFinder()).to(pez);
- * pey.emit(EventA.class).withField("heightpez").onKey(new HeightKeyFinder()).to(pez);
- * pez.emit(EventB.class).to(pex);
- * pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
- * }
- *
- *
- * </pre>
- */
-abstract public class AppMaker {
-
- private static final Logger logger = LoggerFactory.getLogger(AppMaker.class);
-
- /* Use multi-maps to save the graph. */
- private Multimap<PEMaker, StreamMaker> pe2stream = LinkedListMultimap.create();
- private Multimap<StreamMaker, PEMaker> stream2pe = LinkedListMultimap.create();
-
- private FluentApp app;
-
- public AppMaker() {
-
- }
-
- public void setApp(FluentApp app) {
- this.app = app;
- }
-
- /**
- * Configure the application.
- */
- protected abstract void start();
-
- protected abstract void configure();
-
- protected abstract void close();
-
- /**
- * @return the app
- */
- public FluentApp getApp() {
- return app;
- }
-
- /* Used internally to build the graph. */
- void add(PEMaker pem, StreamMaker stream) {
-
- pe2stream.put(pem, stream);
- logger.debug("Adding pe [{}] to stream [{}].", pem != null ? pem.getType().getName() : "null",
- stream != null ? stream.getName() : "null");
- }
-
- /* Used internally to build the graph. */
- void add(StreamMaker stream, PEMaker pem) {
-
- stream2pe.put(stream, pem);
- logger.debug("Adding stream [{}] to pe [{}].", stream != null ? stream.getName() : "null", pem != null ? pem
- .getType().getName() : "null");
- }
-
- protected PEMaker addPE(Class<? extends ProcessingElement> type) {
- PEMaker pe = new PEMaker(this, type);
- return pe;
- }
-
- App make() {
-
- logger.debug("Start MAKE.");
-
- /* Loop PEMaker objects to create PEs. */
- for (PEMaker key : pe2stream.keySet()) {
- if (key != null) {
- try {
- key.setPe(makePE(key, key.getType()));
- } catch (NoSuchFieldException e) {
- logger.error("Couldn't make PE.", e);
- } catch (IllegalAccessException e) {
- logger.error("Couldn't make PE.", e);
- }
- }
-
- }
- /* Loop StreamMaker objects to create Streams. */
- for (StreamMaker key : stream2pe.keySet()) {
- if (key != null) {
- key.setStream(makeStream(key, key.getType()));
- }
- }
-
- /* PE to Stream wiring. */
- Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
- for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
- PEMaker pm = entry.getKey();
- Collection<StreamMaker> streams = entry.getValue();
-
- if (pm != null && streams != null) {
- try {
- setStreamField(pm, streams);
- } catch (Exception e) {
- logger.error("Couldn't make Stream.", e);
- }
- }
- }
-
- /* Stream to PE wiring. */
- Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
- for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
- StreamMaker sm = entry.getKey();
- for (PEMaker pm : entry.getValue()) {
- if (pm != null && sm != null) {
- sm.getStream().setPE(pm.getPe());
- }
- }
- }
-
- return app;
- }
-
- /* Do the magic to create a Stream from a StreamMaker. */
- @SuppressWarnings("unchecked")
- private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
-
- Stream<T> stream = app.createStream(type);
- stream.setName(sm.getName());
-
- if (sm.getKeyFinder() != null)
- stream.setKey((KeyFinder<T>) sm.getKeyFinder());
- else if (sm.getKeyDescriptor() != null)
- stream.setKey(sm.getKeyDescriptor());
-
- return stream;
- }
-
- /* Do the magic to create a PE from a PEMaker. */
- private <T extends ProcessingElement> T makePE(PEMaker pem, Class<T> type) throws NoSuchFieldException,
- IllegalAccessException {
- T pe = app.createPE(type);
- pe.setSingleton(pem.isSingleton());
-
- if (pem.getCacheMaximumSize() > 0)
- pe.setPECache(pem.getCacheMaximumSize(), pem.getCacheDuration(), TimeUnit.MILLISECONDS);
-
- if (pem.getTimerInterval() > 0)
- pe.setTimerInterval(pem.getTimerInterval(), TimeUnit.MILLISECONDS);
-
- if (pem.getTriggerEventType() != null) {
- if (pem.getTriggerNumEvents() > 0 || pem.getTriggerInterval() > 0) {
- pe.setTrigger(pem.getTriggerEventType(), pem.getTriggerNumEvents(), pem.getTriggerInterval(),
- TimeUnit.MILLISECONDS);
- }
- }
-
- /* Use introspection to match properties to class fields. */
- setPEAttributes(pe, pem, type);
- return pe;
- }
-
- private <T extends ProcessingElement> void setPEAttributes(T pe, PEMaker pem, Class<T> type)
- throws NoSuchFieldException, IllegalAccessException {
-
- PropertiesConfiguration properties = pem.getProperties();
- @SuppressWarnings("unchecked")
- Iterator<String> iter = properties.getKeys();
-
- while (iter.hasNext()) {
- String property = iter.next();
- logger.debug("Adding property [{}] to PE of type [{}].", property, type.getName());
- setField(property, pe, pem, type);
- }
- }
-
- private <T extends ProcessingElement> void setField(String fieldName, T pe, PEMaker pm, Class<T> type)
- throws NoSuchFieldException, IllegalAccessException {
- try {
- Field f = type.getDeclaredField(fieldName);
- f.setAccessible(true);
- logger.trace("Type: {}.", f.getType());
- logger.trace("GenericType: {}.", f.getGenericType());
-
- /* Set the field. */
- if (f.getType().getCanonicalName() == "long") {
- f.setLong(pe, pm.getProperties().getLong(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "int") {
- f.setInt(pe, pm.getProperties().getInt(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "float") {
- f.setFloat(pe, pm.getProperties().getFloat(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "double") {
- f.setDouble(pe, pm.getProperties().getDouble(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "short") {
- f.setShort(pe, pm.getProperties().getShort(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "byte") {
- f.setByte(pe, pm.getProperties().getByte(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "boolean") {
- f.setBoolean(pe, pm.getProperties().getBoolean(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "char") {
- f.setChar(pe, (char) pm.getProperties().getByte(fieldName));
- return;
- } else if (f.getType().getCanonicalName() == "java.lang.String") {
- f.set(pe, pm.getProperties().getString(fieldName));
- return;
- }
-
- logger.error("Unable to set field named [{}] in PE of type [{}].", fieldName, type);
- throw new IllegalArgumentException();
-
- // production code should handle these exceptions more gracefully
- } catch (NoSuchFieldException e) {
- logger.error("There is no field named [{}] in PE of type [{}].", fieldName, type);
- throw e;
- } catch (IllegalArgumentException e) {
- logger.error("Couldn't set value for field [{}] in PE of type [{}].", fieldName, type);
- throw e;
- }
- }
-
- /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
- private void setStreamField(PEMaker pm, Collection<StreamMaker> streams) throws Exception {
-
- /*
- * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
- * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
- * more than one field has the same type, then then we need to do more work.
- */
- Field[] fields = pm.getPe().getClass().getDeclaredFields();
- Multimap<String, Field> typeMap = LinkedListMultimap.create();
- logger.debug("Analyzing PE [{}].", pm.getPe().getClass().getName());
- for (Field field : fields) {
- logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
-
- if (field.getType() == Stream[].class) {
- logger.debug("Found stream field: {}", field.getGenericType());
-
- /* Track what fields have streams with the same event type. */
- String key = field.getGenericType().toString();
- typeMap.put(key, field);
- }
- }
-
- /* Assign streams to stream fields. */
- Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
- for (StreamMaker sm : streams) {
-
- if (sm == null)
- continue;
-
- Stream<? extends Event> stream = sm.getStream();
- Class<? extends Event> eventType = sm.getType();
- String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
- if (typeMap.containsKey(key)) {
- String fieldName;
- Field field;
- Collection<Field> streamFields = typeMap.get(key);
- int numStreamFields = streamFields.size();
- logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
-
- if (numStreamFields > 1) {
-
- /*
- * There is more than one field that can be used for this stream type. To resolve the ambiguity we
- * need additional information. The app graph should include the name of the field that should be
- * used to assign this stream. If the name is missing we bail out.
- */
- fieldName = sm.getFieldName();
-
- /* Bail out. */
- if (fieldName == null) {
- String msg = String
- .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
- numStreamFields, pm.getPe().getClass().getName(), stream.getName());
- logger.error(msg);
- throw new Exception(msg);
- }
-
- /* Use the provided field name to choose the PE field. */
- field = pm.getPe().getClass().getDeclaredField(fieldName);
-
- } else {
-
- /*
- * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
- * field that matches the stream type.
- */
- Iterator<Field> iter = streamFields.iterator();
- field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
- logger.debug("Using field [{}].", field.getName());
- }
-
- /*
- * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
- * There may be more than one stream that needs to be assigned to this field. The stream fields must be
- * arrays by convention and there may be more than one stream assigned to this fields. For now we create
- * a multimap from field to streams so we can construct the array in the next pass.
- */
- assignment.put(field, stream);
-
- } else {
-
- /* We couldn't find a match. Tell user to fix the application. */
- String msg = String.format(
- "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pm
- .getPe().getClass().getName(), stream.getName());
- logger.error(msg);
- throw new Exception(msg);
-
- }
- }
- /* Now we construct the array and do the final assignment. */
-
- Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
- for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
- Field f = entry.getKey();
-
- int arraySize = entry.getValue().size();
- @SuppressWarnings("unchecked")
- Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
- arraySize);
- int i = 0;
- for (Stream<? extends Event> s : entry.getValue()) {
- streamArray[i++] = s;
-
- f.setAccessible(true);
- f.set(pm.getPe(), streamArray);
- logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
- }
- }
- }
-
- static private String toString(PEMaker pm) {
- return pm != null ? pm.getType().getName() + " " : "null ";
- }
-
- static private String toString(StreamMaker sm) {
- return sm != null ? sm.getName() + " " : "null ";
- }
-
- /**
- * A printable representation of the application graph.
- *
- * @return the application graph.
- */
- public String toString() {
-
- StringBuilder sb = new StringBuilder();
- sb.append("\nApplication Graph for " + this.getClass().getCanonicalName() + "\n");
- Map<PEMaker, Collection<StreamMaker>> pe2streamMap = pe2stream.asMap();
- for (Map.Entry<PEMaker, Collection<StreamMaker>> entry : pe2streamMap.entrySet()) {
- sb.append(toString(entry.getKey()) + "=> ");
- for (StreamMaker sm : entry.getValue()) {
- sb.append(toString(sm));
- }
- sb.append("\n");
- }
-
- Map<StreamMaker, Collection<PEMaker>> stream2peMap = stream2pe.asMap();
- for (Map.Entry<StreamMaker, Collection<PEMaker>> entry : stream2peMap.entrySet()) {
- sb.append(toString(entry.getKey()) + "=> ");
- for (PEMaker pm : entry.getValue()) {
- sb.append(toString(pm));
- }
- sb.append("\n");
- }
-
- return sb.toString();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
deleted file mode 100644
index 9f66e0d..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-
-import com.google.inject.Inject;
-
-/**
- * The Fluent API uses this class to construct apps automatically. Users should not have to use this class directly.
- *
- */
-public class FluentApp extends App {
-
- final private AppMaker appMaker;
-
- @Inject
- public FluentApp(AppMaker appMaker) {
- super();
- this.appMaker = appMaker;
- appMaker.setApp(this);
- }
-
- @Override
- protected void onStart() {
- appMaker.start();
- }
-
- @Override
- protected void onInit() {
- appMaker.configure();
- appMaker.make();
- }
-
- @Override
- protected void onClose() {
- appMaker.close();
- }
-
- public void start() {
- super.start();
- }
-
- public void init() {
- super.init();
- }
-
- public void close() {
- super.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
deleted file mode 100644
index e5de2cb..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/PEMaker.java
+++ /dev/null
@@ -1,257 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Event;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Helper class to add a processing element to an S4 application.
- *
- * @see example {@link S4Maker}
- *
- */
-public class PEMaker {
-
- private static final Logger logger = LoggerFactory.getLogger(PEMaker.class);
-
- final private Class<? extends ProcessingElement> type;
- final private AppMaker app;
- private ProcessingElement pe = null;
-
- private long timerInterval = 0;
-
- private long triggerInterval = 0;
- private Class<? extends Event> triggerEventType = null;
- private int triggerNumEvents = 0;
-
- private int cacheMaximumSize = 0;
- private long cacheDuration = 0;
-
- private PropertiesConfiguration properties = new PropertiesConfiguration();
-
- private boolean isSingleton = false;
-
- PEMaker(AppMaker app, Class<? extends ProcessingElement> type) {
- Preconditions.checkNotNull(type);
- this.type = type;
- this.app = app;
- app.add(this, null);
- }
-
- /**
- * Configure the PE expiration and cache size.
- * <p>
- * PE instances will be automatically removed from the cache once a fixed duration has elapsed after the PEs
- * creation, or last access.
- * <p>
- * Least accessed PEs will automatically be removed from the cache when the number of PEs approaches maximumSize.
- * <p>
- * When this method is called all existing PE instances are destroyed.
- *
- *
- * @param maximumSize
- * the approximate maximum number of PEs in the cache.
- * @param duration
- * the PE duration
- * @param timeUnit
- * the time unit
- * @return the PEMaker
- */
- public CacheMaker addCache() {
-
- return new CacheMaker();
- }
-
- /**
- * Configure a trigger that is fired when the following conditions occur:
- *
- * <ul>
- * <li>An event of eventType arrived to the PE instance
- * <li>numEvents have arrived since the last time this trigger was fired -OR- time since last event is greater than
- * interval.
- * </ul>
- *
- * <p>
- * When the trigger fires, the method <tt>trigger(EventType event)</tt> is called. Where <tt>EventType</tt> matches
- * the argument eventType.
- *
- * @param eventType
- * the type of event on which this trigger will fire.
- * @param numEvents
- * number of events since last trigger activation. Must be greater than zero. (Set to one to trigger on
- * every input event.)
- * @param interval
- * minimum time between triggers. Set to zero if no time interval needed.
- * @param timeUnit
- * the TimeUnit for the argument interval. Can set to null if no time interval needed.
- * @return the PEMaker
- */
- public TriggerMaker addTrigger() {
-
- return new TriggerMaker();
- }
-
- /**
- * Set a timer that calls {@link ProcessingElement#onTime()}.
- *
- * If {@code interval==0} the timer is disabled.
- *
- * @param interval
- * in timeUnit
- * @param timeUnit
- * the timeUnit of interval
- * @return the PEMaker
- */
- public TimerMaker addTimer() {
- return new TimerMaker();
- }
-
- public StreamMaker emit(Class<? extends Event> type) {
-
- logger.debug("PE [{}] emits event of type [{}].", this.getType().getName(), type.getCanonicalName());
- StreamMaker stream = new StreamMaker(app, type);
- app.add(this, stream);
- return stream;
- }
-
- public PEMaker withKey(String key) {
-
- return this;
- }
-
- public PEMaker with(String key, Object value) {
-
- properties.addProperty(key, value);
- return this;
- }
-
- /**
- * @return the timerInterval
- */
- long getTimerInterval() {
- return timerInterval;
- }
-
- /**
- * @return the triggerInterval
- */
- long getTriggerInterval() {
- return triggerInterval;
- }
-
- /**
- * @return the triggerEventType
- */
- Class<? extends Event> getTriggerEventType() {
- return triggerEventType;
- }
-
- /**
- * @return the triggerNumEvents
- */
- int getTriggerNumEvents() {
- return triggerNumEvents;
- }
-
- /**
- * @return the cacheMaximumSize
- */
- int getCacheMaximumSize() {
- return cacheMaximumSize;
- }
-
- /**
- * @return the cacheDuration
- */
- long getCacheDuration() {
- return cacheDuration;
- }
-
- /**
- * @return the type
- */
- Class<? extends ProcessingElement> getType() {
- return type;
- }
-
- /**
- * @return the properties
- */
- PropertiesConfiguration getProperties() {
- return properties;
- }
-
- /**
- * @return the pe
- */
- public ProcessingElement getPe() {
- return pe;
- }
-
- /**
- * @param pe
- * the pe to set
- */
- public void setPe(ProcessingElement pe) {
- this.pe = pe;
- }
-
- public class TriggerMaker {
-
- public TriggerMaker fireOn(Class<? extends Event> eventType) {
-
- triggerEventType = eventType;
- return this;
- }
-
- public TriggerMaker ifNumEvents(int numEvents) {
-
- triggerNumEvents = numEvents;
- return this;
- }
-
- public TriggerMaker ifInterval(long interval, TimeUnit timeUnit) {
-
- if (timeUnit != null)
- triggerInterval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
- return this;
- }
- }
-
- public class CacheMaker {
-
- public CacheMaker ofSize(int maxSize) {
- cacheMaximumSize = maxSize;
- return this;
- }
-
- public CacheMaker withDuration(long duration, TimeUnit timeUnit) {
- cacheDuration = timeUnit.convert(duration, TimeUnit.MILLISECONDS);
- return this;
- }
- }
-
- public class TimerMaker {
-
- public TimerMaker withDuration(long duration, TimeUnit timeUnit) {
- timerInterval = TimeUnit.MILLISECONDS.convert(duration, timeUnit);
- timerInterval = duration;
- return this;
- }
- }
-
- public PEMaker asSingleton() {
- this.isSingleton = true;
- return this;
- }
-
- public boolean isSingleton() {
- return isSingleton;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
deleted file mode 100644
index 3d3ea0e..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/StreamMaker.java
+++ /dev/null
@@ -1,168 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.Stream;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Helper class to add a stream to an S4 application.
- *
- * @see example {@link S4Maker}
- *
- */
-public class StreamMaker {
-
- final private AppMaker app;
- final private Class<? extends Event> type;
- private String name = null;
- private KeyFinder<? extends Event> keyFinder;
- private String keyDescriptor = null;
- private String fieldName;
- private Stream<? extends Event> stream = null;
-
- StreamMaker(AppMaker app, Class<? extends Event> type) {
-
- Preconditions.checkNotNull(type);
- this.app = app;
- this.type = type;
- app.add(null, this);
- }
-
- /**
- * Name the stream.
- *
- * @param name
- * the stream name, default is an empty string.
- * @return the stream maker object
- */
- public StreamMaker withName(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Define the key finder for this stream.
- *
- * @param keyFinder
- * a function to lookup the value of the key.
- * @return the stream maker.
- */
- public <T extends Event> StreamMaker onKey(KeyFinder<T> keyFinder) {
- this.keyFinder = keyFinder;
- return this;
- }
-
- /**
- * Define the key finder for this stream using a descriptor.
- *
- * @param keyFinderString
- * a descriptor to lookup the value of the key.
- * @return the stream maker.
- */
- public StreamMaker onKey(String keyDescriptor) {
-
- this.keyDescriptor = keyDescriptor;
- return this;
- }
-
- /**
- * Send events from this stream to a PE.
- *
- * @param pe
- * a target PE.
- *
- * @return the stream maker.
- */
- public StreamMaker to(PEMaker pe) {
- app.add(this, pe);
- return this;
- }
-
- /**
- * Send events from this stream to various PEs.
- *
- * @param pe
- * a target PE array.
- *
- * @return the stream maker.
- */
- public StreamMaker to(PEMaker[] pes) {
- for (int i = 0; i < pes.length; i++)
- app.add(this, pes[i]);
- return this;
- }
-
- /**
- * @return the app
- */
- AppMaker getApp() {
- return app;
- }
-
- /**
- * @return the type
- */
- Class<? extends Event> getType() {
- return type;
- }
-
- /**
- * @return the name
- */
- String getName() {
-
- if (name != null) {
- return name;
- }
-
- if (keyDescriptor != null) {
- return type.getCanonicalName() + "," + keyDescriptor;
- } else if (keyFinder != null) {
- return type.getCanonicalName() + "," + keyFinder.getClass().getCanonicalName();
- } else
- return type.getCanonicalName();
- }
-
- /**
- * @return the keyFinder
- */
- KeyFinder<? extends Event> getKeyFinder() {
- return keyFinder;
- }
-
- /**
- * @return the keyDescriptor
- */
- String getKeyDescriptor() {
- return keyDescriptor;
- }
-
- /**
- * @return the field name
- */
- String getFieldName() {
- return fieldName;
- }
-
- /**
- * @return the stream
- */
- public Stream<? extends Event> getStream() {
- return stream;
- }
-
- /**
- * @param stream
- * the stream to set
- */
- public void setStream(Stream<? extends Event> stream) {
- this.stream = stream;
- }
-
- public StreamMaker withField(String fieldName) {
- this.fieldName = fieldName;
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
deleted file mode 100644
index 742eb7e..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/AppMakerTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.lang.reflect.Field;
-
-import org.junit.Test;
-
-public class AppMakerTest {
-
- @Test
- public void test() throws Exception {
-
- MyApp myApp = new MyApp();
- myApp.setApp(new FluentApp(myApp));
- myApp.configure();
- System.out.println(myApp.toString());
- myApp.make();
- }
-
- @Test
- public void testReflection() {
-
- try {
- Class<?> c = PEY.class;
- Field f = c.getDeclaredField("duration");
- System.out.format("Type: %s%n", f.getType());
- System.out.format("GenericType: %s%n", f.getGenericType());
-
- // production code should handle these exceptions more gracefully
- } catch (NoSuchFieldException x) {
- x.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
deleted file mode 100644
index c1831e5..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/DurationKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class DurationKeyFinder implements KeyFinder<EventA> {
-
- public List<String> get(EventA event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the gender and add it to the list. */
- results.add(Long.toString(event.getDuration()));
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
deleted file mode 100644
index ccf37e2..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventA.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-
-public class EventA extends Event {
-
- private long duration;
- private int height;
-
- /**
- * @return the duration
- */
- public long getDuration() {
- return duration;
- }
-
- /**
- * @param duration
- * the duration to set
- */
- public void setDuration(long duration) {
- this.duration = duration;
- }
-
- /**
- * @return the height
- */
- public int getHeight() {
- return height;
- }
-
- /**
- * @param height the height to set
- */
- public void setHeight(int height) {
- this.height = height;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
deleted file mode 100644
index 3e285e3..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/EventB.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.base.Event;
-
-public class EventB extends Event {
-
- private String query;
-
- /**
- * @return the query
- */
- public String getQuery() {
- return query;
- }
-
- /**
- * @param query
- * the query to set
- */
- public void setQuery(String query) {
- this.query = query;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
deleted file mode 100644
index 4bf4afe..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/HeightKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class HeightKeyFinder implements KeyFinder<EventA> {
-
- public List<String> get(EventA event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the gender and add it to the list. */
- results.add(Integer.toString(event.getHeight()));
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
deleted file mode 100644
index b6678c5..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/MyApp.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.concurrent.TimeUnit;
-
-public class MyApp extends AppMaker {
-
- @Override
- public void configure() {
-
- PEMaker pez, pey, pex;
-
- /* Configure processing element pez. */
- pez = addPE(PEZ.class);
- pez.addTrigger().fireOn(EventA.class).ifInterval(5, TimeUnit.SECONDS);
- pez.addCache().ofSize(1000).withDuration(3, TimeUnit.HOURS);
-
- /* Configure processing element pey. */
- pey = addPE(PEY.class).with("duration", 4).with("height", 99);
- pey.addTimer().withDuration(2, TimeUnit.MINUTES);
-
- /* Configure processing element pex. */
- pex = addPE(PEX.class).with("query", "money").asSingleton();
- pex.addCache().ofSize(100).withDuration(1, TimeUnit.MINUTES);
-
- /* Construct the graph. */
- pey.emit(EventA.class).withField("stream3").onKey(new DurationKeyFinder()).to(pez);
- pey.emit(EventA.class).withField("heightpez").onKey(new HeightKeyFinder()).to(pez);
- pez.emit(EventB.class).to(pex);
- pex.emit(EventB.class).onKey(new QueryKeyFinder()).to(pey).to(pez);
- }
-
- @Override
- public void start() {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
deleted file mode 100644
index 4ebc581..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEX.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEX extends ProcessingElement {
-
- private String query;
- private Stream<EventB>[] someStream;
- @SuppressWarnings("unused")
- private Stream<EventA>[] streams;
-
- public PEX(App app) {
- super(app);
- }
-
- @Override
- public void onCreate() {
-
- }
-
- @Override
- public void onRemove() {
-
- }
-
- /**
- * @return the keyword
- */
- String getKeyword() {
- return query;
- }
-
- /**
- * @param query
- * the keyword to set
- */
- void setKeyword(String query) {
- this.query = query;
- }
-
- /**
- * @return the someStream
- */
- public Stream<EventB>[] getSomeStream() {
- return someStream;
- }
-
- /**
- * @param someStream
- * the someStream to set
- */
- public void setSomeStream(Stream<EventB>[] someStream) {
- this.someStream = someStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
deleted file mode 100644
index 4c03cd9..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEY.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEY extends ProcessingElement {
-
- private Stream<EventA>[] stream3;
- @SuppressWarnings("unused")
- private Stream<EventA>[] heightpez;
-
- private int height;
- private long duration;
-
- public PEY(App app) {
- super(app);
- }
-
- @Override
- public void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * @return the stream3
- */
- Stream<EventA>[] getStream3() {
- return stream3;
- }
-
- /**
- * @param stream3
- * the stream3 to set
- */
- void setStream3(Stream<EventA>[] stream3) {
- this.stream3 = stream3;
- }
-
- /**
- * @return the height
- */
- int getHeight() {
- return height;
- }
-
- /**
- * @param height
- * the height to set
- */
- void setHeight(int height) {
- this.height = height;
- }
-
- /**
- * @return the duration
- */
- long getDuration() {
- return duration;
- }
-
- /**
- * @param duration
- * the duration to set
- */
- void setDuration(long duration) {
- this.duration = duration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
deleted file mode 100644
index 25a08fc..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/PEZ.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.s4.fluent;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-
-public class PEZ extends ProcessingElement {
-
- private Stream<EventA>[] stream1;
- private Stream<EventB>[] stream2;
-
- public PEZ(App app) {
- super(app);
- }
-
- /**
- * @return the stream1
- */
- Stream<EventA>[] getStream1() {
- return stream1;
- }
-
- /**
- * @param stream1
- * the stream1 to set
- */
- void setStream1(Stream<EventA>[] stream1) {
- this.stream1 = stream1;
- }
-
- /**
- * @return the stream2
- */
- Stream<EventB>[] getStream2() {
- return stream2;
- }
-
- /**
- * @param stream2
- * the stream2 to set
- */
- void setStream2(Stream<EventB>[] stream2) {
- this.stream2 = stream2;
- }
-
- @Override
- public void onCreate() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onRemove() {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java b/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
deleted file mode 100644
index 050427b..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fluent/QueryKeyFinder.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.s4.fluent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.s4.base.KeyFinder;
-
-public class QueryKeyFinder implements KeyFinder<EventB> {
-
- public List<String> get(EventB event) {
-
- List<String> results = new ArrayList<String>();
-
- /* Retrieve the gender and add it to the list. */
- results.add(event.getQuery());
-
- return results;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/s4-edsl.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/s4-edsl.gradle b/subprojects/s4-edsl/s4-edsl.gradle
index 4568799..7dc2c07 100644
--- a/subprojects/s4-edsl/s4-edsl.gradle
+++ b/subprojects/s4-edsl/s4-edsl.gradle
@@ -19,6 +19,13 @@ sourceSets {
buildscript {
repositories {
mavenCentral()
+
+ maven {
+ url "http://oss.sonatype.org/content/repositories/snapshots"
+ }
+ maven {
+ url "http://oss.sonatype.org/content/repositories/releases"
+ }
}
dependencies {
classpath libraries.diezel
@@ -39,3 +46,5 @@ task generateSources << {
compileJava.source generateSources.outputs.files, sourceSets.main.java
+eclipseClasspath.dependsOn generateSources
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml b/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
index 4e2cdbe..94e0dde 100644
--- a/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
+++ b/subprojects/s4-edsl/src/main/diezel/s4/s4-impl.xml
@@ -1,83 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<diezelImplementation xmlns="http://diezel.ericaro.net/2.0.0/">
- <package>org.apache.s4.core.edsl</package>
+ <package>org.apache.s4.edsl</package>
<name>Builder</name>
- <implements>org.apache.s4.core.edsl.S4DSL</implements>
+ <extends>AppBuilder</extends>
+ <implements>org.apache.s4.edsl.S4DSL</implements>
<transitions>
<transitionImplementation name="pe">
<body>
- System.out.println("pe");
+ clearPEState();peName = name;logger.debug("PE name: " + peName);
</body>
</transitionImplementation>
<transitionImplementation name="type">
<body>
- System.out.println("type");
+ processingElement = createPE(peType, peName);
+ logger.debug("peType: " + peType);
+ </body>
+ </transitionImplementation>
+ <transitionImplementation name="prop">
+ <body>
+ addProperty(propName, propValue);
+ logger.debug("prop: " + propName + " = " + propValue);
</body>
</transitionImplementation>
<transitionImplementation name="fireOn">
<body>
- System.out.println("fireOn");
+ triggerEventType = eventType;
+ logger.debug("EventType: " + eventType);
+ </body>
+ </transitionImplementation>
+ <transitionImplementation name="afterInterval">
+ <body>
+ triggerInterval = interval; triggerTimeUnit = timeUnit; processingElement.setTrigger(triggerEventType, 1, triggerInterval, triggerTimeUnit); logger.debug("Interval: " + triggerInterval);
</body>
</transitionImplementation>
- <transitionImplementation name="ifInterval">
+ <transitionImplementation name="afterNumEvents">
<body>
- System.out.println("ifInterval");
+ processingElement.setTrigger(triggerEventType, numEvents, triggerInterval, triggerTimeUnit);
+ logger.debug("afterNumeEvents: " + numEvents);
</body>
</transitionImplementation>
<transitionImplementation name="timer">
<body>
- System.out.println("timer");
+ logger.debug("timer on");
</body>
</transitionImplementation>
<transitionImplementation name="withPeriod">
<body>
- System.out.println("withPeriod");
+ processingElement.setTimerInterval(interval, timeUnit);
+ logger.debug("withPeriod: " + interval);
</body>
</transitionImplementation>
<transitionImplementation name="cache">
<body>
- System.out.println("cache");
+ logger.debug("cache");
+ </body>
+ </transitionImplementation>
+ <transitionImplementation name="size">
+ <body>
+ cacheSize = size; processingElement.setPECache(size); logger.debug("cache size: " + cacheSize);
</body>
</transitionImplementation>
<transitionImplementation name="expires">
<body>
- System.out.println("expires");
+ processingElement.setPECache(cacheSize, duration, timeUnit); logger.debug("expires: " + duration);
</body>
</transitionImplementation>
<transitionImplementation name="asSingleton">
<body>
- System.out.println("asSingleton");
+ processingElement.setSingleton(true);
+ logger.debug("asSingleton");
</body>
</transitionImplementation>
- <transitionImplementation name="emitEvent">
+ <transitionImplementation name="emit">
<body>
- System.out.println("emitEvent");
+ streamBuilder = new StreamBuilder(app, event); streamBuilders.add(streamBuilder); addPe2Stream(processingElement, streamBuilder); logger.debug("emit event: " + event);
</body>
</transitionImplementation>
<transitionImplementation name="onField">
<body>
- System.out.println("onField");
+ streamBuilder.setFieldName(fieldName);
+ logger.debug("onField");
</body>
</transitionImplementation>
- <transitionImplementation name="onKey">
+ <transitionImplementation name="withKeyFinder">
<body>
- System.out.println("onKey");
+ streamBuilder.setKeyFinder(keyFinder);
+ logger.debug("withKeyFinder");
</body>
</transitionImplementation>
- <transitionImplementation name="to">
+ <transitionImplementation name="withKey">
<body>
- System.out.println("to");
+ streamBuilder.setKey(key);
+ logger.debug("withKey");
</body>
</transitionImplementation>
- <transitionImplementation name="info">
+ <transitionImplementation name="to">
<body>
- System.out.println("info");
+ streamBuilder.to(targetPeName);
+ logger.debug("to: " + targetPeName);
</body>
</transitionImplementation>
<transitionImplementation name="build">
<body>
- System.out.println("build");
- return "Done!";
+ logger.debug("build");
+ return buildApp();
</body>
</transitionImplementation>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/diezel/s4/s4.xml b/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
index 85d6bcb..008da11 100644
--- a/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
+++ b/subprojects/s4-edsl/src/main/diezel/s4/s4.xml
@@ -1,56 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<diezel xmlns="http://diezel.ericaro.net/2.0.0/">
- <package>org.apache.s4.core.edsl</package>
+ <package>org.apache.s4.edsl</package>
<name>S4DSL</name>
- <expression>(pe , type , (fireOn , ifInterval? )? , (timer, withPeriod)? , (cache, size , expires? )? , asSingleton? , (emitEvent, onField?, onKey?, to )+ )+ , info?, build</expression>
+ <expression>(pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , (cache, size , expires? )? , asSingleton? , (emit, onField?, (withKey|withKeyFinder)?, to+ )* )+ , build</expression>
<transitions>
<transition name="pe">
- <signature>pe()</signature>
+ <signature>pe(java.lang.String name)</signature>
</transition>
<transition name="type">
- <signature>type()</signature>
+ <signature><![CDATA[type(java.lang.Class<? extends org.apache.s4.core.ProcessingElement> peType)]]></signature>
+ </transition>
+ <transition name="prop">
+ <signature>prop(java.lang.String propName, java.lang.String propValue)</signature>
</transition>
<transition name="fireOn">
- <signature>fireOn()</signature>
+ <signature><![CDATA[fireOn(java.lang.Class<? extends org.apache.s4.base.Event> eventType)]]></signature>
+ </transition>
+ <transition name="afterInterval">
+ <signature>afterInterval(long interval, java.util.concurrent.TimeUnit timeUnit)</signature>
</transition>
- <transition name="ifInterval">
- <signature>ifInterval()</signature>
+ <transition name="afterNumEvents">
+ <signature>afterNumEvents(int numEvents)</signature>
</transition>
<transition name="timer">
<signature>timer()</signature>
</transition>
<transition name="withPeriod">
- <signature>withPeriod()</signature>
+ <signature>withPeriod(long interval, java.util.concurrent.TimeUnit timeUnit)</signature>
</transition>
<transition name="cache">
<signature>cache()</signature>
</transition>
<transition name="size">
- <signature>size()</signature>
+ <signature>size(int size)</signature>
</transition>
<transition name="expires">
- <signature>expires()</signature>
+ <signature>expires(long duration, java.util.concurrent.TimeUnit timeUnit)</signature>
</transition>
<transition name="asSingleton">
<signature>asSingleton()</signature>
</transition>
- <transition name="emitEvent">
- <signature>emitEvent()</signature>
+ <transition name="emit">
+ <signature><![CDATA[emit(java.lang.Class<? extends org.apache.s4.base.Event> event)]]></signature>
</transition>
<transition name="onField">
- <signature>onField()</signature>
+ <signature>onField(java.lang.String fieldName)</signature>
</transition>
- <transition name="onKey">
- <signature>onKey()</signature>
+ <transition name="withKeyFinder">
+ <signature><![CDATA[withKeyFinder(org.apache.s4.base.KeyFinder<? extends org.apache.s4.base.Event> keyFinder)]]></signature>
</transition>
+ <transition name="withKey">
+ <signature>withKey(java.lang.String key)</signature>
+ </transition>
<transition name="to">
- <signature>to()</signature>
- </transition>
- <transition name="info">
- <signature>info()</signature>
+ <signature>to(java.lang.String targetPeName)</signature>
</transition>
<transition name="build">
- <return>java.lang.String</return>
+ <return>org.apache.s4.core.App</return>
<signature>build()</signature>
</transition>
</transitions>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/Main.java b/subprojects/s4-edsl/src/main/java/Main.java
deleted file mode 100644
index b2d50ba..0000000
--- a/subprojects/s4-edsl/src/main/java/Main.java
+++ /dev/null
@@ -1,15 +0,0 @@
-import org.apache.s4.core.edsl.BuilderS4DSL;
-
-public class Main {
- public static BuilderS4DSL build() {
- return new BuilderS4DSL();
- }
-
- public static void main(String[] args) {
-
- String app = new BuilderS4DSL().pe().type().fireOn().ifInterval().cache().size().asSingleton().emitEvent()
- .onField().to().build();
- System.out.println(app);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
new file mode 100644
index 0000000..cf7fa69
--- /dev/null
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
@@ -0,0 +1,330 @@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of the S4 embedded domain-specific language (EDSL).
+ *
+ * <p>
+ * To write an app extend this class and define the application graph using a chain of methods as follows:
+ *
+ * <pre>
+ * final public class MyApp extends BuilderS4DSL {
+ *
+ * protected void onInit() {
+ *
+ * pe("Consumer").type(ConsumerPE.class).asSingleton().
+ * pe("Producer").type(ProducerPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS).asSingleton().
+ * emit(SomeEvent.class).withKey("someKey").to("Consumer").
+ * build()
+ * }
+ * </pre>
+ *
+ * <p>
+ * A few things to notice:
+ * <ul>
+ * <li>Applications must extend class {@link BuilderS4DSL}
+ * <li>The graph definition is implemented in the {@link App#onInit} method which is called by the container when the
+ * application is loaded.
+ * <li>PEs are defined using strings because they need to be referenced by other parts of the graph. By doing this, we
+ * can create the whole application in a single chain of methods.
+ * <li>To assign target streams to PE fields additional information may need to be provided using the {@code onField}
+ * grammar token when there is an ambiguity. This will happen when a PE has more than one targetStream field with the
+ * same {@link Event} type. Use the construct {@code emit(SomeEvent.class).onField("streamFieldName")}. If the PE
+ * doesn't have a field named {@code "streamField"} whose stream parameter type is {@code someEvent)} then the parser
+ * will fail to build the app.
+ * <li>To configure a PE, set property values by chaining any number of {@code prop(name, value)} methods. The name
+ * should match a PE field and the value will be parsed using the type of that field.
+ * </ul>
+ * <p>
+ * Grammar:
+ *
+ * <pre>
+ * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
+ * (cache, size , expires? )? , asSingleton? , (emit, onField?,
+ * (withKey|withKeyFinder)?, to+ )* )+ , build
+ * </pre>
+ *
+ * <p>
+ * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
+ *
+ * @author Leo Neumeyer (@leoneu)
+ */
+public class AppBuilder extends App {
+
+ protected App app = this;
+
+ static final Logger logger = LoggerFactory.getLogger(AppBuilder.class);
+
+ private Multimap<ProcessingElement, StreamBuilder<? extends Event>> pe2stream = LinkedListMultimap.create();
+ Set<StreamBuilder<? extends Event>> streamBuilders = Sets.newHashSet();
+
+ /* Variables used to hold values from state to state. */
+ ProcessingElement processingElement;
+ String peName;
+ Class<? extends Event> triggerEventType;
+ long triggerInterval = 0;
+ TimeUnit triggerTimeUnit;
+ int cacheSize;
+ StreamBuilder<? extends Event> streamBuilder;
+ String propertyName, propertyValue;
+
+ public static AppBuilder getAppBuilder() {
+ return new BuilderS4DSL();
+ }
+
+ void addProperty(String name, String value) {
+ propertyName = name;
+ propertyValue = value;
+ setField();
+ }
+
+ void addPe2Stream(ProcessingElement pe, StreamBuilder<? extends Event> st) {
+ pe2stream.put(pe, st);
+ }
+
+ App buildApp() {
+
+ /* Stream to PE writing. */
+ for (StreamBuilder<? extends Event> sb : streamBuilders) {
+ for (String peName : sb.pes) {
+ ProcessingElement pe = getPE(peName);
+ sb.stream.setPE(pe);
+ }
+ }
+
+ /* PE to Stream wiring. */
+ Map<ProcessingElement, Collection<StreamBuilder<? extends Event>>> pe2streamMap = pe2stream.asMap();
+ for (Map.Entry<ProcessingElement, Collection<StreamBuilder<? extends Event>>> entry : pe2streamMap.entrySet()) {
+ ProcessingElement pe = entry.getKey();
+ Collection<StreamBuilder<? extends Event>> streams = entry.getValue();
+
+ if (pe != null && streams != null) {
+ try {
+ setStreamField(pe, streams);
+ } catch (Exception e) {
+ logger.error("Unable to build app.", e);
+ return null;
+ }
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * @param peName
+ * the peName to set
+ */
+ protected void setPeName(String peName) {
+ this.peName = peName;
+ }
+
+ /*
+ * Cannot create an abstract class in Diezel so for now, I just implement the abstract methods here. They need to be
+ * overloaded by the app developer.
+ */
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+ private <T extends ProcessingElement> void setField() {
+
+ logger.debug("Adding property [{}] to PE of type [{}].", propertyName, processingElement.getClass().getName());
+
+ Class<? extends ProcessingElement> type = processingElement.getClass();
+
+ try {
+ Field f = type.getDeclaredField(propertyName);
+ f.setAccessible(true);
+ logger.trace("Type: {}.", f.getType());
+ logger.trace("GenericType: {}.", f.getGenericType());
+
+ /* Set the field. */
+ if (f.getType().getCanonicalName() == "long") {
+ f.setLong(processingElement, Long.parseLong(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "int") {
+ f.setInt(processingElement, Integer.parseInt(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "float") {
+ f.setFloat(processingElement, Float.parseFloat(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "double") {
+ f.setDouble(processingElement, Double.parseDouble(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "short") {
+ f.setShort(processingElement, Short.parseShort(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "byte") {
+ f.setByte(processingElement, Byte.parseByte(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "boolean") {
+ f.setBoolean(processingElement, Boolean.parseBoolean(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "char") {
+ f.setChar(processingElement, (char) Byte.parseByte(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "java.lang.String") {
+ f.set(processingElement, propertyValue);
+ return;
+ }
+
+ logger.error("Unable to set field named [{}] in PE of type [{}].", propertyName, type);
+ throw new IllegalArgumentException();
+
+ // production code should handle these exceptions more gracefully
+ } catch (NoSuchFieldException e) {
+ logger.error("There is no field named [{}] in PE of type [{}].", propertyName, type);
+ } catch (Exception e) {
+ logger.error("Couldn't set value for field [{}] in PE of type [{}].", propertyName, type);
+ }
+ }
+
+ /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
+ private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? extends Event>> streams)
+ throws Exception {
+
+ /*
+ * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
+ * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
+ * more than one field has the same type, then then we need to do more work.
+ */
+ Field[] fields = pe.getClass().getDeclaredFields();
+ Multimap<String, Field> typeMap = LinkedListMultimap.create();
+ logger.debug("Analyzing PE [{}].", pe.getClass().getName());
+ for (Field field : fields) {
+ logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
+
+ if (field.getType() == Stream[].class) {
+ logger.debug("Found stream field: {}", field.getGenericType());
+
+ /* Track what fields have streams with the same event type. */
+ String key = field.getGenericType().toString();
+ typeMap.put(key, field);
+ }
+ }
+
+ /* Assign streams to stream fields. */
+ Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
+ for (StreamBuilder<? extends Event> sm : streams) {
+
+ Stream<? extends Event> stream = sm.stream;
+ Class<? extends Event> eventType = sm.type;
+ String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
+ if (typeMap.containsKey(key)) {
+ String fieldName;
+ Field field;
+ Collection<Field> streamFields = typeMap.get(key);
+ int numStreamFields = streamFields.size();
+ logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
+
+ if (numStreamFields > 1) {
+
+ /*
+ * There is more than one field that can be used for this stream type. To resolve the ambiguity we
+ * need additional information. The app graph should include the name of the field that should be
+ * used to assign this stream. If the name is missing we bail out.
+ */
+ fieldName = sm.fieldName;
+
+ /* Bail out. */
+ if (fieldName == null) {
+ String msg = String
+ .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
+ numStreamFields, pe.getClass().getName(), stream.getName());
+ logger.error(msg);
+ throw new Exception(msg);
+ }
+
+ /* Use the provided field name to choose the PE field. */
+ field = pe.getClass().getDeclaredField(fieldName);
+
+ } else {
+
+ /*
+ * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
+ * field that matches the stream type.
+ */
+ Iterator<Field> iter = streamFields.iterator();
+ field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
+ logger.debug("Using field [{}].", field.getName());
+ }
+
+ /*
+ * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
+ * There may be more than one stream that needs to be assigned to this field. The stream fields must be
+ * arrays by convention and there may be more than one stream assigned to this fields. For now we create
+ * a multimap from field to streams so we can construct the array in the next step.
+ */
+ assignment.put(field, stream);
+
+ } else {
+
+ /* We couldn't find a match. Tell user to fix the EDSL code. */
+ String msg = String.format(
+ "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pe
+ .getClass().getName(), stream.getName());
+ logger.error(msg);
+ throw new Exception(msg);
+
+ }
+ }
+ /* Now we construct the array and do the final assignment. */
+
+ Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
+ for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
+ Field f = entry.getKey();
+
+ int arraySize = entry.getValue().size();
+ @SuppressWarnings("unchecked")
+ Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
+ arraySize);
+ int i = 0;
+ for (Stream<? extends Event> s : entry.getValue()) {
+ streamArray[i++] = s;
+
+ f.setAccessible(true);
+ f.set(pe, streamArray);
+ logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
+ }
+ }
+ }
+
+ void clearPEState() {
+ propertyName = null;
+ propertyValue = null;
+ processingElement = null;
+ peName = null;
+ triggerEventType = null;
+ triggerTimeUnit = null;
+ cacheSize = -1;
+ streamBuilder = null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
new file mode 100644
index 0000000..9412855
--- /dev/null
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/StreamBuilder.java
@@ -0,0 +1,75 @@
+package org.apache.s4.edsl;
+
+import java.util.Set;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Helper class to add a stream to an S4 application. This class and methods are private package. No need for app
+ * developers to see this class.
+ *
+ */
+class StreamBuilder<T extends Event> {
+
+ Class<T> type;
+ String fieldName;
+ Stream<T> stream;
+ Set<String> pes = Sets.newHashSet();
+
+ StreamBuilder(App app, Class<T> type) {
+
+ Preconditions.checkNotNull(type);
+ this.type = type;
+ stream = app.createStream(type);
+ stream.setName(type.getCanonicalName()); // Default name.
+ }
+
+ void setEventType(Class<T> type) {
+ this.type = type;
+ }
+
+ /**
+ * Name the stream.
+ *
+ * @param name
+ * the stream name, default is an empty string.
+ * @return the stream maker object
+ */
+ void setName(String name) {
+ stream.setName(name);
+ }
+
+ /**
+ * Define the key finder for this stream.
+ *
+ * @param keyFinder
+ * a function to lookup the value of the key.
+ */
+ @SuppressWarnings("unchecked")
+ void setKeyFinder(KeyFinder<?> keyFinder) {
+ stream.setKey((KeyFinder<T>) keyFinder);
+ stream.setName(type.getCanonicalName() + "," + keyFinder.getClass().getCanonicalName());
+ }
+
+ void setKey(String keyDescriptor) {
+
+ stream.setKey(keyDescriptor);
+ stream.setName(type.getCanonicalName() + "," + keyDescriptor);
+ }
+
+ // Not all PE may have been created, we use PE Name as a placeholder. The PE prototypes will be assigned in the
+ // buildApp() method in AppBuilder.
+ void to(String peName) {
+ pes.add(peName);
+ }
+
+ void setFieldName(String fieldName) {
+ this.fieldName = fieldName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
new file mode 100644
index 0000000..78bddb0
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/DurationKeyFinder.java
@@ -0,0 +1,19 @@
+package org.apache.s4.edsl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.base.KeyFinder;
+
+public class DurationKeyFinder implements KeyFinder<EventA> {
+
+ public List<String> get(EventA event) {
+
+ List<String> results = new ArrayList<String>();
+
+ /* Retrieve the gender and add it to the list. */
+ results.add(Long.toString(event.getDuration()));
+
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
new file mode 100644
index 0000000..c881ea5
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventA.java
@@ -0,0 +1,40 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.base.Event;
+
+public class EventA extends Event {
+
+ private long duration;
+ private int height;
+
+ /**
+ * @return the duration
+ */
+ public long getDuration() {
+ return duration;
+ }
+
+ /**
+ * @param duration
+ * the duration to set
+ */
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
+
+ /**
+ * @return the height
+ */
+ public int getHeight() {
+ return height;
+ }
+
+ /**
+ * @param height
+ * the height to set
+ */
+ public void setHeight(int height) {
+ this.height = height;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/6e973667/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
new file mode 100644
index 0000000..0ebe91e
--- /dev/null
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/EventB.java
@@ -0,0 +1,24 @@
+package org.apache.s4.edsl;
+
+import org.apache.s4.base.Event;
+
+public class EventB extends Event {
+
+ private String query;
+
+ /**
+ * @return the query
+ */
+ public String getQuery() {
+ return query;
+ }
+
+ /**
+ * @param query
+ * the query to set
+ */
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+}