You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[2/22] Merge branch 'S4-22' into piper
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6446a7c,935d4a5..fe4ceed
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@@ -15,9 -15,8 +15,9 @@@
*/
package org.apache.s4.core;
- import java.util.Arrays;
- import java.util.Collection;
+ import java.util.ArrayList;
+ import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
@@@ -26,32 -29,28 +30,34 @@@ import org.apache.s4.core.App.ClockType
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
- import com.google.common.collect.Multimap;
- import com.google.inject.AbstractModule;
- import com.google.inject.Guice;
import com.google.inject.Inject;
- import com.google.inject.Injector;
+ import com.google.inject.name.Named;
/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+ * Container base class to hold all processing elements. We will implement administrative methods here.
*/
public abstract class App {
static final Logger logger = LoggerFactory.getLogger(App.class);
- /* PE prototype to streams relations. */
- final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
+ /* All the PE prototypes in this app. */
+ final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
- /* Stream to PE prototype relations. */
- final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
++ // /* Stream to PE prototype relations. */
++ // final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
+ /* All the internal streams in this app. */
+ final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
+
+ /* All the the event sources exported by this app. */
+ final private List<EventSource> eventSources = new ArrayList<EventSource>();
+ /* Pes indexed by name. */
+ Map<String, ProcessingElement> peByName = Maps.newHashMap();
+
private ClockType clockType = ClockType.WALL_CLOCK;
private int id = -1;
+
@Inject
private Sender sender;
@Inject
@@@ -90,33 -103,44 +110,59 @@@
this.id = id;
}
- /**
- * @return all the pePrototypes
- */
- Collection<ProcessingElement> getPePrototypes() {
- return pe2stream.keySet();
+ /* Should only be used within the core package. */
+ void addPEPrototype(ProcessingElement pePrototype) {
+ pePrototypes.add(pePrototype);
}
- /**
- * @return all the pePrototypes
- */
- <T extends Event> Collection<ProcessingElement> getTargetPEs(Stream<T> stream) {
++ public ProcessingElement getPE(String name) {
++
++ return peByName.get(name);
++ }
++
+ /* Should only be used within the core package. */
+ public void addStream(Streamable stream) {
+ streams.add(stream);
+ }
+
+ /* Should only be used within the core package. */
+ void addEventSource(EventSource es) {
+ eventSources.add(es);
+ }
+
+ /* Returns list of PE prototypes. Should only be used within the core package. */
+ List<ProcessingElement> getPePrototypes() {
+ return pePrototypes;
+ }
- Map<Streamable<?>, Collection<ProcessingElement>> stream2peMap = stream2pe.asMap();
++ // void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
++ // logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
++ // stream2pe.put(stream, pePrototype);
++ //
++ // }
++
+ /* Returns list of internal streams. Should only be used within the core package. */
+ // TODO visibility
+ public List<Streamable<Event>> getStreams() {
+ return streams;
+ }
- return stream2peMap.get(stream);
+ /* Returns list of the event sources to be exported. Should only be used within the core package. */
+ // TODO visibility
+ public List<EventSource> getEventSources() {
+ return eventSources;
}
protected abstract void onStart();
+ /**
+ * This method is called by the container after initialization. Once this method is called, threads get started and
+ * events start flowing.
+ */
- protected void start() {
-
- logger.info("Prepare to start App [{}].", getClass().getName());
+ public final void start() {
+ // logger.info("Prepare to start App [{}].", getClass().getName());
+ //
/* Start all streams. */
for (Streamable<? extends Event> stream : getStreams()) {
stream.start();
@@@ -131,22 -155,16 +177,22 @@@
onStart();
}
+ /**
+ * This method is called by the container to initialize applications.
+ */
protected abstract void onInit();
- protected void init() {
+ public final void init() {
onInit();
}
+ /**
+ * This method is called by the container before unloading the application.
+ */
protected abstract void onClose();
- protected void close() {
+ public final void close() {
onClose();
removeAll();
@@@ -169,31 -181,24 +215,18 @@@
}
- /* Get the set of streams and close them. */
- for (Streamable<?> stream : getStreams()) {
- logger.trace("Closing stream [{}].", stream.getName());
- stream.close();
- }
-
/* Finally remove the entire app graph. */
logger.trace("Clear app graph.");
- pe2stream.clear();
- stream2pe.clear();
- }
-
- void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
- logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
- pe2stream.put(pePrototype, stream);
+ pePrototypes.clear();
+ streams.clear();
}
- public ProcessingElement getPE(String name) {
-
- return peByName.get(name);
- }
-
- void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
- logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
- stream2pe.put(stream, pePrototype);
+ void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
- }
+ // logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
+ pePrototypes.add(pePrototype);
- Collection<Streamable<? extends Event>> getStreams() {
- return stream2pe.keySet();
}
/**
@@@ -300,32 -309,30 +337,53 @@@
*/
protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
- return new Stream<T>(this, name, processingElements);
+ return new Stream<T>(this).setName(name).setPEs(processingElements);
+ }
+
+ /**
+ * Creates stream with default values. Use the builder methods to configure the stream. Example:
+ * <p>
+ *
+ * <pre>
+ * s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
+ * </pre>
+ *
+ * <p>
+ *
+ * @param name
+ * the name of the stream
+ * @param processingElements
+ * the target processing elements
+ * @return the stream
+ */
+ public <T extends Event> Stream<T> createStream(Class<T> type) {
+
+ Stream<T> stream = new Stream<T>(this);
+ stream.setEventType(type);
+ return stream;
}
+ protected <T extends Event> RemoteStream createOutputStream(String name) {
+ return createOutputStream(name, null);
+ }
+
+ protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+ return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
+ }
+
+ protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
+ ProcessingElement... processingElements) {
+ remoteStreams.addInputStream(getId(), clusterName, streamName);
+ return createStream(streamName, finder, processingElements);
+
+ }
+
+ protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
+ remoteStreams.addInputStream(getId(), clusterName, streamName);
+ return createStream(streamName, processingElements);
+
+ }
+
/**
* Creates a {@link ProcessingElement} prototype.
*
@@@ -340,29 -345,34 +398,49 @@@
try {
// TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
Class<?>[] types = new Class<?>[] { App.class };
- T pe = type.getDeclaredConstructor(types).newInstance(this);
- pe.setName(name);
- return pe;
-
+ try {
+ T pe = type.getDeclaredConstructor(types).newInstance(this);
++ pe.setName(name);
+ return pe;
+ } catch (NoSuchMethodException e) {
+ // no such constructor. Use the setter
+ T pe = type.getDeclaredConstructor(new Class[] {}).newInstance();
+ pe.setApp(this);
++ pe.setName(name);
+ return pe;
+ }
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
+ /**
+ * Creates a {@link ProcessingElement} prototype.
+ *
+ * @param type
+ * the processing element type.
+ * @return the processing element prototype.
+ */
+ public <T extends ProcessingElement> T createPE(Class<T> type) {
+
+ return createPE(type, null);
+
+ }
+
+ public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
+ int numSlots) {
+ try {
+ Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+ T pe = type.getDeclaredConstructor(types).newInstance(
+ new Object[] { this, slotDuration, timeUnit, numSlots });
+ return pe;
+ } catch (Exception e) {
+ logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
+ return null;
+ }
+ }
+
static private String toString(ProcessingElement pe) {
return pe != null ? pe.getClass().getName() + " " : "null ";
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 152f24e,1576044..df1b720
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@@ -39,43 -39,40 +39,39 @@@ import com.google.common.collect.MapMak
import com.google.common.collect.Maps;
/**
- * @author Leo Neumeyer
- * @author Matthieu Morel
- * <p>
- * Base class for implementing processing in S4. All instances are organized as follows:
- * <ul>
- * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
- * application graph.
- * <li>PE prototypes manage the creation and destruction of PE instances.
- * <li>All PE instances are clones of a PE prototype.
- * <li>PE instances are associated with a unique key.
- * <li>PE instances do the actual work by processing any number of input events of various types and emit output
- * events of various types.
- * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method.
- * See {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
- * <ul>
- * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be
- * dispatched to this method.
- * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
- * dispatched to this method when certain conditions are met. See
- * {@link #setTrigger(Class, int, long, TimeUnit)}.
- * </ul>
- * <li>
- * A PE implementation must not create threads. A periodic task can be implemented by overloading the
- * {@link #onTime()} method. See {@link #setTimerInterval(long, TimeUnit)}
- * <li>If a reference in the PE prototype shared by the PE instances, the object must be thread safe.
- * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
- * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there
- * may be several event processing methods that can safely run concurrently. To enable concurrency, annotate the
- * implementation of {@code ProcessingElement} with {@link ThreadSafe}.
- * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()}
- * method.
- * <li>PE class fields are cloned from the prototype. References are also copied which means that if the
- * prototype creates a collection object, all instances will be sharing the same collection object which is
- * usually <em>NOT</em> what the programmer intended . The application developer is responsible for initializing
- * objects in the {@link #onCreate()} method. For example, if each instance requires a
- * <tt>List<tt/> object the PE should implement the following:
+ * <p>
+ * Base class for implementing processing in S4. All instances are organized as follows:
+ * <ul>
+ * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
+ * application graph.
+ * <li>PE prototypes manage the creation and destruction of PE instances.
+ * <li>All PE instances are clones of a PE prototype.
+ * <li>PE instances are associated with a unique key.
+ * <li>PE instances do the actual work by processing any number of input events of various types and emit output events
+ * of various types.
+ * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method. See
+ * {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
+ * <ul>
+ * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be dispatched to
+ * this method.
+ * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
+ * dispatched to this method when certain conditions are met. See {@link #setTrigger(Class, int, long, TimeUnit)}.
+ * </ul>
+ * <li>
+ * A PE implementation must not create threads. A periodic task can be implemented by overloading the {@link #onTime()}
+ * method. See {@link #setTimerInterval(long, TimeUnit)}
+ * <li>If a reference in the PE prototype shared by the PE instances, the object must be thread safe.
+ * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
+ * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there may be
+ * several event processing methods that can safely run concurrently. To enable concurrency, annotate the implementation
+ * of {@code ProcessingElement} with {@link ThreadSafe}.
+ * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()} method.
+ * <li>PE class fields are cloned from the prototype. References are also copied which means that if the prototype
+ * creates a collection object, all instances will be sharing the same collection object which is usually <em>NOT</em>
+ * what the programmer intended . The application developer is responsible for initializing objects in the
+ * {@link #onCreate()} method. For example, if each instance requires a
+ * <tt>List<tt/> object the PE should implement the following:
* <pre>
- * {@code
* public class MyPE extends ProcessingElement {
*
* private Map<String, Integer> wordCount;
@@@ -377,9 -356,6 +382,9 @@@ public abstract class ProcessingElemen
return this;
timer = new Timer();
+ logger.info("Created timer for PE prototype [{}] with interval [{}].", this.getClass().getName(),
+ timerIntervalInMilliseconds);
-
++ timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 1790e44,8e04fc4..96b7b62
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@@ -21,15 -5,11 +5,14 @@@ import java.util.concurrent.ArrayBlocki
import java.util.concurrent.BlockingQueue;
import org.apache.s4.base.Event;
+ import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.GenericKeyFinder;
- import org.apache.s4.base.Key;
import org.apache.s4.base.KeyFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
- import com.google.common.collect.Sets;
+
/**
* {@link Stream} and {@link ProcessingElement} objects represent the links and nodes in the application graph. A stream
* sends an {@link Event} object to {@link ProcessingElement} instances located anywhere in a cluster.
@@@ -45,16 -25,15 +28,16 @@@ public class Stream<T extends Event> im
final static private String DEFAULT_SEPARATOR = "^";
final static private int CAPACITY = 1000;
private static int idCounter = 0;
- private String name = "";
- private Key<T> key = null;
- private ProcessingElement[] targetPEs = null;
- final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
- final private String name;
- final protected Key<T> key;
- final private ProcessingElement[] targetPEs;
++ private String name;
++ protected Key<T> key;
++ private ProcessingElement[] targetPEs;
+ protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
private Thread thread;
final private Sender sender;
final private Receiver receiver;
- final private int id;
+ // final private int id;
final private App app;
+ private Class<T> eventType = null;
/**
* Send events using a {@link KeyFinder<T>}. The key finder extracts the value of the key which is used to determine
@@@ -62,38 -41,33 +45,27 @@@
*
* @param app
* we always register streams with the parent application.
-- * @param name
-- * give this stream a meaningful name in the context of your application.
-- * @param finder
-- * the finder object to find the value of the key in an event.
-- * @param processingElements
-- * the target PE prototypes for this stream.
*/
- public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
+ public Stream(App app) {
- synchronized (Stream.class) {
- id = idCounter++;
- }
+ // synchronized (Stream.class) {
+ // id = idCounter++;
+ // }
this.app = app;
- app.addStream(this, null);
-
+ app.addStream(this);
- this.name = name;
-
- if (finder == null) {
- this.key = null;
- } else {
- this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
- }
this.sender = app.getSender();
this.receiver = app.getReceiver();
- this.targetPEs = processingElements;
}
- void start() {
-
- /* Get target PE prototypes for this stream. Remove null key. */
- Set<? extends ProcessingElement> pes = Sets.newHashSet(app.getTargetPEs(this));
- pes.remove(null);
- targetPEs = new ProcessingElement[pes.size()];
- pes.toArray(targetPEs);
+ public void start() {
+ if (logger.isTraceEnabled()) {
- for (ProcessingElement pe : pes) {
- logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
++ if (targetPEs != null) {
++ for (ProcessingElement pe : targetPEs) {
++ logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
++ }
+ }
+ }
+
/* Start streaming. */
thread = new Thread(this, name);
thread.start();
@@@ -101,78 -75,18 +73,74 @@@
}
/**
- * Stop and close this stream.
- */
- void close() {
- thread.interrupt();
- }
-
- /**
- * Send events to all available {@link ProcessingElement} instances contained by the {@link ProcessingElement}
- * prototypes passed to this constructor.
+ * Name the stream.
*
- * @param app
- * we always register streams with the parent application.
* @param name
- * give this stream a meaningful name in the context of your application.
- * @param processingElements
- * the target PE prototypes for this stream.
+ * the stream name, default is an empty string.
+ * @return the stream object
+ */
+ public Stream<T> setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Define the key finder for this stream.
+ *
+ * @param keyFinder
+ * a function to lookup the value of the key.
+ * @return the stream object
+ */
+ public Stream<T> setKey(KeyFinder<T> keyFinder) {
+ this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
+ return this;
+ }
+
++ void setEventType(Class<T> type) {
++ this.eventType = type;
++ }
++
+ /**
+ * Define the key finder for this stream using a descriptor.
+ *
+ * @param keyFinderString
+ * a descriptor to lookup up the value of the key.
+ * @return the stream object
+ */
+ public Stream<T> setKey(String keyName) {
+
+ Preconditions.checkNotNull(eventType);
+
+ KeyFinder<T> kf = new GenericKeyFinder<T>(keyName, eventType);
+ setKey(kf);
+
+ return this;
+ }
+
+ /**
+ * Send events from this stream to a PE.
+ *
+ * @param pe
+ * a target PE.
+ *
+ * @return the stream object
+ */
+ public Stream<T> setPE(ProcessingElement pe) {
- app.addStream(this, pe);
++ app.addStream(this);
+ return this;
+ }
+
+ /**
+ * Send events from this stream to various PEs.
+ *
+ * @param pe
+ * a target PE array.
+ *
+ * @return the stream object
*/
- public Stream(App app, String name, ProcessingElement... processingElements) {
- this(app, name, null, processingElements);
+ public Stream<T> setPEs(ProcessingElement[] pes) {
- for (int i = 0; i < pes.length; i++)
- app.addStream(this, pes[i]);
++ this.targetPEs = pes;
+ return this;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1e26512,4eae82b..8601d91
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@@ -141,11 -121,11 +121,11 @@@ public class TestAutomaticDeployment
if (!initial) {
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
}
-- Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
-- Assert.assertTrue(signalAppStarted.await(10, TimeUnit.SECONDS));
++ Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
++ Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
String time1 = String.valueOf(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
index 011f203,0000000..ff3fa02
mode 100644,000000..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
@@@ -1,68 -1,0 +1,67 @@@
+package org.apache.s4.deploy;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
- import org.apache.s4.comm.topology.Topology;
- import org.apache.s4.comm.topology.TopologyFromZK;
++import org.apache.s4.comm.topology.ClusterFromZK;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public class TestModule extends AbstractModule {
+
+ private PropertiesConfiguration config;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ bind(Cluster.class);
+ bind(Hasher.class).to(DefaultHasher.class);
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Topology.class).to(TopologyFromZK.class);
++ bind(Cluster.class).to(ClusterFromZK.class);
+ bind(Emitter.class).to(TCPEmitter.class);
+ bind(Listener.class).to(TCPListener.class);
+
+ bind(Integer.class).annotatedWith(Names.named("comm.retries")).toInstance(10);
+ bind(Integer.class).annotatedWith(Names.named("comm.retry_delay")).toInstance(10);
+ bind(Integer.class).annotatedWith(Names.named("comm.timeout")).toInstance(1000);
+
+ bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/s4-edsl.gradle
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/s4-edsl.gradle
index 208c9d4,0000000..71f232e
mode 100644,000000..100644
--- a/subprojects/s4-edsl/s4-edsl.gradle
+++ b/subprojects/s4-edsl/s4-edsl.gradle
@@@ -1,51 -1,0 +1,51 @@@
+// Use the Diezel Maven plugin to build the project.
+// Gradle doesn't support Maven plugin as of version 1.0
+// Useful article: http://forums.gradle.org/gradle/topics/how_to_download_and_evoke_a_maven_plugin
+
+def generatedSourceDir = "${buildDir}/generated-src/java"
+def diezelSrcDir = "${projectDir}/src/main/diezel";
+
+dependencies {
+ compile project(":s4-core")
- testCompile project(path: ':s4-core', configuration: 'tests')
++ //testCompile project(path: ':s4-core', configuration: 'tests')
+}
+
+
+sourceSets {
+ generated {
+ java { srcDir generatedSourceDir }
+ }
+}
+
+buildscript {
+ repositories {
+ mavenCentral()
+
+ maven {
+ url "http://oss.sonatype.org/content/repositories/snapshots"
+ }
+ maven {
+ url "http://oss.sonatype.org/content/repositories/releases"
+ }
+ }
+ dependencies {
+ classpath libraries.diezel
+ }
+}
+task generateSources << {
+ outputs.dir generatedSourceDir
+ def mojo = new net.ericaro.diezel.plugin.DiezelMojo()
+ mojo.sourceDirectory = new File(diezelSrcDir);
+ mojo.outputDirectory = new File(generatedSourceDir);
+ mojo.staleMillis = 0;
+ mojo.project = new org.apache.maven.project.MavenProject();
+ org.apache.maven.model.Build build = new org.apache.maven.model.Build();
+ build.setDirectory(mojo.sourceDirectory.getAbsolutePath());
+ mojo.project.setBuild(build);
+ mojo.execute();
+}
+
+compileJava.source generateSources.outputs.files, sourceSets.main.java
+
+eclipseClasspath.dependsOn generateSources
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
index 181b0f8,0000000..e823101
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
@@@ -1,330 -1,0 +1,330 @@@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of the S4 embedded domain-specific language (EDSL).
+ *
+ * <p>
+ * To write an app extend this class and define the application graph using a chain of methods as follows:
+ *
+ * <pre>
+ * final public class MyApp extends BuilderS4DSL {
+ *
+ * protected void onInit() {
+ *
+ * pe("Consumer").type(ConsumerPE.class).asSingleton().
+ * pe("Producer").type(ProducerPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS).asSingleton().
+ * emit(SomeEvent.class).withKey("someKey").to("Consumer").
+ * build()
+ * }
+ * </pre>
+ *
+ * <p>
+ * A few things to notice:
+ * <ul>
+ * <li>Applications must extend class {@link BuilderS4DSL}
+ * <li>The graph definition is implemented in the {@link App#onInit} method which is called by the container when the
+ * application is loaded.
+ * <li>PEs are defined using strings because they need to be referenced by other parts of the graph. By doing this, we
+ * can create the whole application in a single chain of methods.
+ * <li>To assign target streams to PE fields additional information may need to be provided using the {@code onField}
+ * grammar token when there is an ambiguity. This will happen when a PE has more than one targetStream field with the
+ * same {@link Event} type. Use the construct {@code emit(SomeEvent.class).onField("streamFieldName")}. If the PE
+ * doesn't have a field named {@code "streamField"} whose stream parameter type is {@code someEvent)} then the parser
+ * will fail to build the app.
+ * <li>To configure a PE, set property values by chaining any number of {@code prop(name, value)} methods. The name
+ * should match a PE field and the value will be parsed using the type of that field.
+ * </ul>
+ * <p>
+ * Grammar:
+ *
+ * <pre>
- * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
- * (cache, size , expires? )? , asSingleton? , (emit, onField?,
++ * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
++ * (cache, size , expires? )? , asSingleton? , (emit, onField?,
+ * (withKey|withKeyFinder)?, to )* )+ , build
+ * </pre>
+ *
+ * <p>
+ * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
+ *
+ * @author Leo Neumeyer (@leoneu)
+ */
+public class AppBuilder extends App {
+
+ protected App app = this;
+
+ static final Logger logger = LoggerFactory.getLogger(AppBuilder.class);
+
+ private Multimap<ProcessingElement, StreamBuilder<? extends Event>> pe2stream = LinkedListMultimap.create();
+ Set<StreamBuilder<? extends Event>> streamBuilders = Sets.newHashSet();
+
+ /* Variables used to hold values from state to state. */
+ ProcessingElement processingElement;
+ String peName;
+ Class<? extends Event> triggerEventType;
+ long triggerInterval = 0;
+ TimeUnit triggerTimeUnit;
+ int cacheSize;
+ StreamBuilder<? extends Event> streamBuilder;
+ String propertyName, propertyValue;
+
+ public static AppBuilder getAppBuilder() {
+ return new BuilderS4DSL();
+ }
+
+ void addProperty(String name, String value) {
+ propertyName = name;
+ propertyValue = value;
+ setField();
+ }
+
+ void addPe2Stream(ProcessingElement pe, StreamBuilder<? extends Event> st) {
+ pe2stream.put(pe, st);
+ }
+
+ App buildApp() {
+
+ /* Stream to PE writing. */
+ for (StreamBuilder<? extends Event> sb : streamBuilders) {
+ for (String peName : sb.pes) {
+ ProcessingElement pe = getPE(peName);
+ sb.stream.setPE(pe);
+ }
+ }
+
+ /* PE to Stream wiring. */
+ Map<ProcessingElement, Collection<StreamBuilder<? extends Event>>> pe2streamMap = pe2stream.asMap();
+ for (Map.Entry<ProcessingElement, Collection<StreamBuilder<? extends Event>>> entry : pe2streamMap.entrySet()) {
+ ProcessingElement pe = entry.getKey();
+ Collection<StreamBuilder<? extends Event>> streams = entry.getValue();
+
+ if (pe != null && streams != null) {
+ try {
+ setStreamField(pe, streams);
+ } catch (Exception e) {
+ logger.error("Unable to build app.", e);
+ return null;
+ }
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * @param peName
+ * the peName to set
+ */
+ protected void setPeName(String peName) {
+ this.peName = peName;
+ }
+
+ /*
+ * Cannot create an abstract class in Diezel so for now, I just implement the abstract methods here. They need to be
+ * overloaded by the app developer.
+ */
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+ private <T extends ProcessingElement> void setField() {
+
+ logger.debug("Adding property [{}] to PE of type [{}].", propertyName, processingElement.getClass().getName());
+
+ Class<? extends ProcessingElement> type = processingElement.getClass();
+
+ try {
+ Field f = type.getDeclaredField(propertyName);
+ f.setAccessible(true);
+ logger.trace("Type: {}.", f.getType());
+ logger.trace("GenericType: {}.", f.getGenericType());
+
+ /* Set the field. */
+ if (f.getType().getCanonicalName() == "long") {
+ f.setLong(processingElement, Long.parseLong(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "int") {
+ f.setInt(processingElement, Integer.parseInt(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "float") {
+ f.setFloat(processingElement, Float.parseFloat(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "double") {
+ f.setDouble(processingElement, Double.parseDouble(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "short") {
+ f.setShort(processingElement, Short.parseShort(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "byte") {
+ f.setByte(processingElement, Byte.parseByte(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "boolean") {
+ f.setBoolean(processingElement, Boolean.parseBoolean(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "char") {
+ f.setChar(processingElement, (char) Byte.parseByte(propertyValue));
+ return;
+ } else if (f.getType().getCanonicalName() == "java.lang.String") {
+ f.set(processingElement, propertyValue);
+ return;
+ }
+
+ logger.error("Unable to set field named [{}] in PE of type [{}].", propertyName, type);
+ throw new IllegalArgumentException();
+
+ // production code should handle these exceptions more gracefully
+ } catch (NoSuchFieldException e) {
+ logger.error("There is no field named [{}] in PE of type [{}].", propertyName, type);
+ } catch (Exception e) {
+ logger.error("Couldn't set value for field [{}] in PE of type [{}].", propertyName, type);
+ }
+ }
+
+ /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
+ private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? extends Event>> streams)
+ throws Exception {
+
+ /*
+ * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
+ * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
+ * more than one field has the same type, then then we need to do more work.
+ */
+ Field[] fields = pe.getClass().getDeclaredFields();
+ Multimap<String, Field> typeMap = LinkedListMultimap.create();
+ logger.debug("Analyzing PE [{}].", pe.getClass().getName());
+ for (Field field : fields) {
+ logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
+
+ if (field.getType() == Stream[].class) {
+ logger.debug("Found stream field: {}", field.getGenericType());
+
+ /* Track what fields have streams with the same event type. */
+ String key = field.getGenericType().toString();
+ typeMap.put(key, field);
+ }
+ }
+
+ /* Assign streams to stream fields. */
+ Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
+ for (StreamBuilder<? extends Event> sm : streams) {
+
+ Stream<? extends Event> stream = sm.stream;
+ Class<? extends Event> eventType = sm.type;
+ String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
+ if (typeMap.containsKey(key)) {
+ String fieldName;
+ Field field;
+ Collection<Field> streamFields = typeMap.get(key);
+ int numStreamFields = streamFields.size();
+ logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
+
+ if (numStreamFields > 1) {
+
+ /*
+ * There is more than one field that can be used for this stream type. To resolve the ambiguity we
+ * need additional information. The app graph should include the name of the field that should be
+ * used to assign this stream. If the name is missing we bail out.
+ */
+ fieldName = sm.fieldName;
+
+ /* Bail out. */
+ if (fieldName == null) {
+ String msg = String
+ .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
+ numStreamFields, pe.getClass().getName(), stream.getName());
+ logger.error(msg);
+ throw new Exception(msg);
+ }
+
+ /* Use the provided field name to choose the PE field. */
+ field = pe.getClass().getDeclaredField(fieldName);
+
+ } else {
+
+ /*
+ * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
+ * field that matches the stream type.
+ */
+ Iterator<Field> iter = streamFields.iterator();
+ field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
+ logger.debug("Using field [{}].", field.getName());
+ }
+
+ /*
+ * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
+ * There may be more than one stream that needs to be assigned to this field. The stream fields must be
+ * arrays by convention and there may be more than one stream assigned to this fields. For now we create
+ * a multimap from field to streams so we can construct the array in the next step.
+ */
+ assignment.put(field, stream);
+
+ } else {
+
+ /* We couldn't find a match. Tell user to fix the EDSL code. */
+ String msg = String.format(
+ "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pe
+ .getClass().getName(), stream.getName());
+ logger.error(msg);
+ throw new Exception(msg);
+
+ }
+ }
+ /* Now we construct the array and do the final assignment. */
+
+ Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
+ for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
+ Field f = entry.getKey();
+
+ int arraySize = entry.getValue().size();
+ @SuppressWarnings("unchecked")
+ Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
+ arraySize);
+ int i = 0;
+ for (Stream<? extends Event> s : entry.getValue()) {
+ streamArray[i++] = s;
+
+ f.setAccessible(true);
+ f.set(pe, streamArray);
+ logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
+ }
+ }
+ }
+
+ void clearPEState() {
+ propertyName = null;
+ propertyValue = null;
+ processingElement = null;
+ peName = null;
+ triggerEventType = null;
+ triggerTimeUnit = null;
+ cacheSize = -1;
+ streamBuilder = null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
index 75fea88,0000000..ad40e86
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
@@@ -1,37 -1,0 +1,24 @@@
+package org.apache.s4.edsl;
+
+import java.util.concurrent.TimeUnit;
+
+public class MyApp extends BuilderS4DSL {
+
+ @Override
+ public void onInit() {
+
+ pe("PEZ").type(PEZ.class).fireOn(EventA.class).afterInterval(5, TimeUnit.SECONDS).cache().size(1000)
+ .expires(3, TimeUnit.HOURS).emit(EventB.class).to("PEX").
+
+ pe("PEY").type(PEY.class).prop("duration", "4").prop("height", "99").timer()
+ .withPeriod(2, TimeUnit.MINUTES).emit(EventA.class).onField("stream3")
+ .withKeyFinder(DurationKeyFinder.class).to("PEZ").emit(EventA.class).onField("heightpez")
+ .withKeyFinder(HeightKeyFinder.class).to("PEZ").
+
+ pe("PEX").type(PEX.class).prop("query", "money").cache().size(100).expires(1, TimeUnit.MINUTES)
+ .asSingleton().emit(EventB.class).withKeyFinder(QueryKeyFinder.class).to("PEY", "PEZ").
+
+ build();
+ }
+
- // Make hooks public for testing. Normally this is handled by the container.
- public void init() {
- super.init();
- }
-
- public void start() {
- super.start();
- }
-
- public void close() {
- super.close();
- }
-
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
index dbff5f5,0000000..8225ee4
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@@ -1,38 -1,0 +1,46 @@@
+package org.apache.s4.edsl;
+
+import java.lang.reflect.Field;
+
++import org.apache.s4.comm.DefaultCommModule;
++import org.apache.s4.core.DefaultCoreModule;
++import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
++import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
- public class TestEDSL {
++public class TestEDSL extends ZkBasedTest {
++
++ public final static String CLUSTER_NAME = "cluster1";
+
+ @Test
+ public void test() throws Exception {
- Injector injector = Guice.createInjector(new Module());
++ Injector injector = Guice.createInjector(
++ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
++ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ MyApp myApp = injector.getInstance(MyApp.class);
+
+ /* Normally. the container will handle this but this is just a test. */
+ myApp.init();
+ myApp.start();
+ myApp.close();
+ }
+
+ @Test
+ public void testReflection() {
+
+ try {
+ Class<?> c = PEY.class;
+ Field f = c.getDeclaredField("duration");
+ System.out.format("Type: %s%n", f.getType());
+ System.out.format("GenericType: %s%n", f.getGenericType());
+
+ // production code should handle these exceptions more gracefully
+ } catch (NoSuchFieldException x) {
+ x.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
----------------------------------------------------------------------
diff --cc subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
index 2019a1e,0000000..ef5cd94
mode 100644,000000..100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
@@@ -1,99 -1,0 +1,100 @@@
- package org.apache.s4.example.edsl.counter;
-
- import java.util.concurrent.TimeUnit;
-
- import org.apache.s4.base.Event;
- import org.apache.s4.edsl.BuilderS4DSL;
-
- import com.google.inject.Guice;
- import com.google.inject.Injector;
-
- /**
- * This is a sample application to test the S4 embedded domain-specific language (EDSL).
- *
- * <p>
- * Grammar:
- *
- * <pre>
- * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
- * (cache, size , expires? )? , asSingleton? , (emit, onField?,
- * (withKey|withKeyFinder)?, to )* )+ , build
- * </pre>
- *
- * <p>
- * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
- *
- */
- final public class CounterApp extends BuilderS4DSL {
-
- public static void main(String[] args) {
- Injector injector = Guice.createInjector(new Module());
- CounterApp myApp = injector.getInstance(CounterApp.class);
-
- /* Normally. the container will handle this but this is just a test. */
- myApp.init();
- myApp.start();
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- myApp.close();
- }
-
- @Override
- protected void onInit() {
-
- pe("Print").type(PrintPE.class).asSingleton().
-
- pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
- .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
-
- pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
- .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
-
- pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
- .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
-
- pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
- .asSingleton().
-
- emit(UserEvent.class).withKeyFinder(UserIDKeyFinder.class).to("User Count").
-
- emit(UserEvent.class).withKey("gender").to("Gender Count").
-
- emit(UserEvent.class).withKeyFinder(AgeKeyFinder.class).to("Age Count").
-
- build();
- }
-
- /*
- * Create and send 200 dummy events of type UserEvent.
- *
- * @see io.s4.App#start()
- */
- @Override
- protected void onStart() {
-
- }
-
- @Override
- protected void onClose() {
- System.out.println("Bye.");
- }
-
- // Make hooks public for testing. Normally this is handled by the container.
- public void init() {
- super.init();
- }
-
- public void start() {
- super.start();
- }
-
- public void close() {
- super.close();
- }
-
- }
++// NOTE: this is commented until we fix the dependency to the classes generated by the edsl subproject
++
++//package org.apache.s4.example.edsl.counter;
++//
++//import java.util.concurrent.TimeUnit;
++//
++//import org.apache.s4.base.Event;
++//
++//import com.google.inject.Guice;
++//import com.google.inject.Injector;
++//
++///**
++// * This is a sample application to test the S4 embedded domain-specific language (EDSL).
++// *
++// * <p>
++// * Grammar:
++// *
++// * <pre>
++// * (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
++// * (cache, size , expires? )? , asSingleton? , (emit, onField?,
++// * (withKey|withKeyFinder)?, to )* )+ , build
++// * </pre>
++// *
++// * <p>
++// * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
++// *
++// */
++//final public class CounterApp extends BuilderS4DSL {
++//
++// public static void main(String[] args) {
++// Injector injector = Guice.createInjector(new Module());
++// CounterApp myApp = injector.getInstance(CounterApp.class);
++//
++// /* Normally. the container will handle this but this is just a test. */
++// myApp.init();
++// myApp.start();
++//
++// try {
++// Thread.sleep(1000);
++// } catch (InterruptedException e) {
++// e.printStackTrace();
++// }
++// myApp.close();
++// }
++//
++// @Override
++// protected void onInit() {
++//
++// pe("Print").type(PrintPE.class).asSingleton().
++//
++// pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++// .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++// pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++// .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++// pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++// .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++// pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
++// .asSingleton().
++//
++// emit(UserEvent.class).withKeyFinder(UserIDKeyFinder.class).to("User Count").
++//
++// emit(UserEvent.class).withKey("gender").to("Gender Count").
++//
++// emit(UserEvent.class).withKeyFinder(AgeKeyFinder.class).to("Age Count").
++//
++// build();
++// }
++//
++// /*
++// * Create and send 200 dummy events of type UserEvent.
++// *
++// * @see io.s4.App#start()
++// */
++// @Override
++// protected void onStart() {
++//
++// }
++//
++// @Override
++// protected void onClose() {
++// System.out.println("Bye.");
++// }
++//
++// // Make hooks public for testing. Normally this is handled by the container.
++// public void init() {
++// super.init();
++// }
++//
++// public void start() {
++// super.start();
++// }
++//
++// public void close() {
++// super.close();
++// }
++//
++// }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/test-apps/twitter-counter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --cc test-apps/twitter-counter/src/main/resources/default.s4.properties
index 0000000,d5da3f3..16fc4ba
mode 000000,100644..100644
--- a/test-apps/twitter-counter/src/main/resources/default.s4.properties
+++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
@@@ -1,0 -1,15 +1,19 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
++comm.retries=10
++comm.retry_delay=10
++comm.timeout=1000
++tcp.partition.queue_size=256
+ cluster.hosts = localhost
+ cluster.ports = 5077
+ cluster.name = s4-adapter-cluster
+ cluster.zk_address = localhost:21810
+ cluster.zk_session_timeout = 10000
+ cluster.zk_connection_timeout = 10000
+ comm.module = org.apache.s4.core.CustomModule
+ s4.logger_level = DEBUG
+ appsDir=/tmp/deploy-test
+ tcp.partition.queue_size=1000
+ comm.timeout=100
+ comm.retry_delay=100
+ comm.retries=10