You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[22/22] git commit: S4-22 automating tests
S4-22 automating tests
TODO:
- fix hardcoded code in Server.java
- complete tests
- deployment on multiple nodes
- deployment across logical clusters
- fluent API code was temporarily removed (AppMaker)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/403162eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/403162eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/403162eb
Branch: refs/heads/piper
Commit: 403162eb91aa37a78cba646105463453f5de4d21
Parents: 7eb4882
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Jan 15 23:31:47 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sun Jan 15 23:52:06 2012 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/core/App.java | 122 ++++-----
.../main/java/org/apache/s4/core/EventSource.java | 62 ++---
.../src/main/java/org/apache/s4/core/Key.java | 52 ++++
.../java/org/apache/s4/core/ProcessingElement.java | 7 +-
.../src/main/java/org/apache/s4/core/Server.java | 30 ++
.../src/main/java/org/apache/s4/core/Stream.java | 152 +++-------
.../main/java/org/apache/s4/core/Streamable.java | 13 +-
.../main/java/org/apache/s4/fluent/AppMaker.java | 22 +-
.../s4/deploy/prodcon/TestProducerConsumer.java | 173 +++++++++++
test-apps/s4-counter/build.gradle | 223 +++++++++++++++
.../s4-counter/src/main/java/s4app/ClockApp.java | 55 ++++
.../s4-counter/src/main/java/s4app/ClockPE.java | 49 ++++
test-apps/s4-showtime/build.gradle | 223 +++++++++++++++
.../s4-showtime/src/main/java/s4app/ShowPE.java | 33 +++
.../src/main/java/s4app/ShowTimeApp.java | 47 +++
15 files changed, 1026 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 4a562f9..6249306 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -15,9 +15,9 @@
*/
package org.apache.s4.core;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
@@ -25,8 +25,6 @@ import org.apache.s4.base.KeyFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
@@ -39,11 +37,14 @@ public abstract class App {
static final Logger logger = LoggerFactory.getLogger(App.class);
- /* PE prototype to streams relations. */
- final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
+ /* All the PE prototypes in this app. */
+ final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
- /* Stream prototype to PE relations. */
- final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
+ /* All the internal streams in this app. */
+ final private List<Streamable> streams = new ArrayList<Streamable>();
+
+ /* All the event sources exported by this app. */
+ final private List<EventSource> eventSources = new ArrayList<EventSource>();
private ClockType clockType = ClockType.WALL_CLOCK;
private int id = -1;
@@ -85,39 +86,52 @@ public abstract class App {
this.id = id;
}
- /**
- * @return all the pePrototypes
- */
- Collection<ProcessingElement> getPePrototypes() {
- return pe2stream.keySet();
+ /* Should only be used within the core package. */
+ void addPEPrototype(ProcessingElement pePrototype) {
+ pePrototypes.add(pePrototype);
}
- /**
- * @return all the pePrototypes
- */
- <T extends Event> Collection<ProcessingElement> getTargetPEs(Stream<T> stream) {
+ /* Should only be used within the core package. */
+ void addStream(Streamable stream) {
+ streams.add(stream);
+ }
- Map<Streamable<?>, Collection<ProcessingElement>> stream2peMap = stream2pe.asMap();
+ /* Should only be used within the core package. */
+ void addEventSource(EventSource es) {
+ eventSources.add(es);
+ }
- return stream2peMap.get(stream);
+ /* Returns list of PE prototypes. Should only be used within the core package. */
+ List<ProcessingElement> getPePrototypes() {
+ return pePrototypes;
}
- protected abstract void onStart();
+ /* Returns list of internal streams. Should only be used within the core package. */
+ List<Streamable> getStreams() {
+ return streams;
+ }
- protected void start() {
+ /* Returns list of the event sources to be exported. Should only be used within the core package. */
+ List<EventSource> getEventSources() {
+ return eventSources;
+ }
- logger.info("Prepare to start App [{}].", getClass().getName());
+ protected abstract void onStart();
- /* Start all streams. */
- for (Streamable<? extends Event> stream : getStreams()) {
- stream.start();
- }
+ protected void start() {
- /* Allow abstract PE to initialize. */
- for (ProcessingElement pe : getPePrototypes()) {
- logger.info("Init prototype [{}].", pe.getClass().getName());
- pe.initPEPrototypeInternal();
- }
+ // logger.info("Prepare to start App [{}].", getClass().getName());
+ //
+ // /* Start all streams. */
+ // for (Streamable<? extends Event> stream : getStreams()) {
+ // stream.start();
+ // }
+ //
+ // /* Allow abstract PE to initialize. */
+ // for (ProcessingElement pe : getPePrototypes()) {
+ // logger.info("Init prototype [{}].", pe.getClass().getName());
+ // pe.initPEPrototypeInternal();
+ // }
onStart();
}
@@ -156,25 +170,16 @@ public abstract class App {
/* Finally remove the entire app graph. */
logger.trace("Clear app graph.");
- pe2stream.clear();
- stream2pe.clear();
- }
-
- void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
-
- logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
- pe2stream.put(pePrototype, stream);
+ pePrototypes.clear();
+ streams.clear();
}
- void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
- logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
- stream2pe.put(stream, pePrototype);
+ void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
- }
+ // logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
+ pePrototypes.add(pePrototype);
- Collection<Streamable<? extends Event>> getStreams() {
- return stream2pe.keySet();
}
/**
@@ -264,7 +269,7 @@ public abstract class App {
protected <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder,
ProcessingElement... processingElements) {
- return new Stream<T>(this).setName(name).setKey(finder).setPEs(processingElements);
+ return new Stream<T>(this, name, finder, processingElements);
}
/**
@@ -281,30 +286,7 @@ public abstract class App {
*/
protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
- return new Stream<T>(this).setName(name).setPEs(processingElements);
- }
-
- /**
- * Creates stream with default values. Use the builder methods to configure the stream. Example:
- * <p>
- *
- * <pre>
- * s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
- * </pre>
- *
- * <p>
- *
- * @param name
- * the name of the stream
- * @param processingElements
- * the target processing elements
- * @return the stream
- */
- public <T extends Event> Stream<T> createStream(Class<T> type) {
-
- Stream<T> stream = new Stream<T>(this);
- stream.setEventType(type);
- return stream;
+ return new Stream<T>(this, name, processingElements);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
index 21ea23b..edd64ef 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
@@ -9,76 +9,76 @@ import org.slf4j.LoggerFactory;
/**
*
- * A producer app uses one or more EventSource classes to provide events to streams. AT runtime, consumer apps subscribe
- * to an event source by providing a stream object. Each EventSource instance may correspond to a different type of
- * event stream. Each EventSource may have an unlimited number of subscribers.
+ * A producer app uses one or more EventSource classes to provide events to streamables. At runtime, consumer apps
+ * subscribe to an event source by providing a streamable object. Each EventSource instance may correspond to a
+ * different type of event stream. Each EventSource may have an unlimited number of subscribers.
*
*/
-class EventSource<T extends Event> extends Streamable<T> {
+public class EventSource implements Streamable {
/* No need to synchronize this object because we expect a single thread. */
- private Set<Stream<T>> streams = new HashSet<Stream<T>>();
+ private Set<Streamable> streamables = new HashSet<Streamable>();
private static final Logger logger = LoggerFactory.getLogger(EventSource.class);
final private String name;
- EventSource(App app, String name) {
+ public EventSource(App app, String name) {
this.name = name;
- app.addStream(this, null);
+ app.addEventSource(this);
}
/**
- * Subscribe a stream to this event source.
+ * Subscribe a streamable to this event source.
*
- * @param stream
+ * @param aStream
*/
- void subscribeStream(Stream<T> stream) {
- logger.info("Subscribing stream: {} to event source: {}.", stream.getName(), getName());
- streams.add(stream);
+ public void subscribeStream(Streamable aStream) {
+ logger.info("Subscribing stream: {} to event source: {}.", aStream.getName(), getName());
+ streamables.add(aStream);
}
/**
- * Unsubscribe a stream from this event source.
+ * Unsubscribe a streamable from this event source.
*
* @param stream
*/
- void unsubscribeStream(Stream<T> stream) {
+ public void unsubscribeStream(Streamable stream) {
logger.info("Unsubsubscribing stream: {} to event source: {}.", stream.getName(), getName());
- streams.remove(stream);
+ streamables.remove(stream);
}
/**
- * Send an event to all the subscribed streams.
+ * Send an event to all the subscribed streamables.
*
* @param event
*/
@Override
- public void put(T event) {
- for (Stream<T> stream : streams) {
+ public void put(Event event) {
+ for (Streamable stream : streamables) {
stream.put(event);
}
}
/**
*
- * @return the number of streams subscribed to this event source.
+ * @return the number of streamables subscribed to this event source.
*/
- int getNumSubscribers() {
- return streams.size();
+ public int getNumSubscribers() {
+ return streamables.size();
}
/**
* @return the name of this event source.
*/
- String getName() {
+ public String getName() {
return name;
}
/**
- * Close all the streams subscribed to this event source.
+ * Close all the streamables subscribed to this event source.
*/
@Override
- void close() {
- for (Stream<T> stream : streams) {
+ public void close() {
+ for (Streamable stream : streamables) {
logger.info("Closing stream: {} in event source: {}.", stream.getName(), getName());
stream.close();
}
@@ -86,15 +86,9 @@ class EventSource<T extends Event> extends Streamable<T> {
/**
*
- * @return the set of streams subscribed to this event source.
+ * @return the set of streamables subscribed to this event source.
*/
- Set<Stream<T>> getStreams() {
- return streams;
- }
-
- @Override
- void start() {
- // TODO Auto-generated method stub
-
+ public Set<Streamable> getStreamables() {
+ return streamables;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
new file mode 100644
index 0000000..49b3766
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Key.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.core;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+
+/*
+ * The Key class is used to get the value of the key on a specific type of
+ * event. This is done to abstract all the complexity required to get the
+ * value. The method for getting the value is implemented in a method of
+ * an object of type KeyFinder<T>.
+ *
+ * The application programmer provides the events and the corresponding
+ * finders. The framework will use it to key on events.
+ */
+public class Key<T extends Event> {
+
+ final private KeyFinder<T> finder;
+ final private String separator;
+
+ public Key(KeyFinder<T> finder, String separator) {
+ this.finder = finder;
+ this.separator = separator;
+ }
+
+ public List<String> getList(T event) {
+ return finder.get(event);
+ }
+
+ public String get(T event) {
+ List<String> keys = getList(event);
+
+ return StringUtils.join(keys, separator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index befe94f..ee4c72f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -98,7 +98,7 @@ import com.google.common.collect.Maps;
*
*
*/
-abstract public class ProcessingElement implements Cloneable {
+public abstract class ProcessingElement implements Cloneable {
private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
private static final String SINGLETON = "singleton";
@@ -109,7 +109,7 @@ abstract public class ProcessingElement implements Cloneable {
* This maps holds all the instances. We make it package private to prevent concrete classes from updating the
* collection.
*/
- private Cache<String, ProcessingElement> peInstances;
+ Cache<String, ProcessingElement> peInstances;
/* This map is initialized in the prototype and cloned to instances. */
Map<Class<? extends Event>, Trigger> triggers;
@@ -336,7 +336,6 @@ abstract public class ProcessingElement implements Cloneable {
* in timeUnit
* @param timeUnit
* the timeUnit of interval
- * @return the PE prototype
*/
public ProcessingElement setTimerInterval(long interval, TimeUnit timeUnit) {
timerIntervalInMilliseconds = TimeUnit.MILLISECONDS.convert(interval, timeUnit);
@@ -353,7 +352,7 @@ abstract public class ProcessingElement implements Cloneable {
timer = new Timer();
logger.info("Created timer for PE prototype [{}] with interval [{}].", this.getClass().getName(),
timerIntervalInMilliseconds);
-
+ timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 31513d1..35328aa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -85,6 +85,36 @@ public class Server {
startApp(app);
}
+ EventSource savedES = null;
+ App consumerApp = null;
+ for (App app : apps) {
+ logger.info("Resolving dependencies for " + app.getClass().getName());
+ List<EventSource> eventSources = app.getEventSources();
+ if (eventSources.size() > 0) {
+ EventSource es = eventSources.get(0);
+ logger.info("App [{}] exports event source [{}].", app.getClass().getName(), es.getName());
+ savedES = es; // hardcoded
+ } else {
+
+ // hardcoded (one app has an event source, the other one doesn't).
+ consumerApp = app;
+ }
+ }
+ // hardcoded: make the consumer app's stream subscribe to savedES
+ logger.info("The consumer app is [{}].", consumerApp.getClass().getName());
+ // get the list of streams and find the one we are looking for that has name: "I need the time."
+ List<Streamable> streams = consumerApp.getStreams();
+ for (Streamable aStream : streams) {
+
+ String streamName = aStream.getName();
+
+ if (streamName.contentEquals("I need the time.")) {
+ logger.info("Subscribing stream [{}] from app [{}] to event source.", streamName, consumerApp
+ .getClass().getName());
+ savedES.subscribeStream(aStream);
+ }
+ }
+
logger.info("Completed local applications startup.");
if (deploymentManager != null) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index c05d81f..bd1d351 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -16,20 +16,14 @@
package org.apache.s4.core;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.s4.base.Event;
-import org.apache.s4.base.GenericKeyFinder;
-import org.apache.s4.base.Key;
import org.apache.s4.base.KeyFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
/**
* {@link Stream} and {@link ProcessingElement} objects represent the links and nodes in the application graph. A stream
* sends an {@link Event} object to {@link ProcessingElement} instances located anywhere in a cluster.
@@ -38,23 +32,22 @@ import com.google.common.collect.Sets;
* <p>
* To build an application create stream objects using use the {@link StreamFactory} class.
*/
-public class Stream<T extends Event> extends Streamable<T> implements Runnable {
+public class Stream<T extends Event> implements Runnable, Streamable {
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
final static private String DEFAULT_SEPARATOR = "^";
final static private int CAPACITY = 1000;
private static int idCounter = 0;
- private String name = "";
- private Key<T> key = null;
- private ProcessingElement[] targetPEs = null;
- final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
- private Thread thread;
+ final private String name;
+ final private Key<T> key;
+ final private ProcessingElement[] targetPEs;
+ final private BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
+ final private Thread thread;
final private Sender sender;
final private Receiver receiver;
final private int id;
final private App app;
- private Class<T> eventType = null;
/**
* Send events using a {@link KeyFinder<T>}. The key finder extracts the value of the key which is used to determine
@@ -69,24 +62,22 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
* @param processingElements
* the target PE prototypes for this stream.
*/
- public Stream(App app) {
+ public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
synchronized (Stream.class) {
id = idCounter++;
}
this.app = app;
- app.addStream(this, null);
+ app.addStream(this);
+ this.name = name;
+ if (finder == null) {
+ this.key = null;
+ } else {
+ this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
+ }
this.sender = app.getSender();
this.receiver = app.getReceiver();
- }
-
- void start() {
-
- /* Get target PE prototypes for this stream. Remove null key. */
- Set<? extends ProcessingElement> pes = Sets.newHashSet(app.getTargetPEs(this));
- pes.remove(null);
- targetPEs = new ProcessingElement[pes.size()];
- pes.toArray(targetPEs);
+ this.targetPEs = processingElements;
/* Start streaming. */
thread = new Thread(this, name);
@@ -95,78 +86,18 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
}
/**
- * Stop and close this stream.
- */
- void close() {
- thread.interrupt();
- }
-
- /**
- * Name the stream.
+ * Send events to all available {@link ProcessingElement} instances contained by the {@link ProcessingElement}
+ * prototypes passed to this constructor.
*
+ * @param app
+ * we always register streams with the parent application.
* @param name
- * the stream name, default is an empty string.
- * @return the stream object
- */
- public Stream<T> setName(String name) {
- this.name = name;
- return this;
- }
-
- /**
- * Define the key finder for this stream.
- *
- * @param keyFinder
- * a function to lookup the value of the key.
- * @return the stream object
- */
- public Stream<T> setKey(KeyFinder<T> keyFinder) {
- this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
- return this;
- }
-
- /**
- * Define the key finder for this stream using a descriptor.
- *
- * @param keyFinderString
- * a descriptor to lookup up the value of the key.
- * @return the stream object
- */
- public Stream<T> setKey(String keyName) {
-
- Preconditions.checkNotNull(eventType);
-
- KeyFinder<T> kf = new GenericKeyFinder<T>(keyName, eventType);
- setKey(kf);
-
- return this;
- }
-
- /**
- * Send events from this stream to a PE.
- *
- * @param pe
- * a target PE.
- *
- * @return the stream object
- */
- public Stream<T> setPE(ProcessingElement pe) {
- app.addStream(this, pe);
- return this;
- }
-
- /**
- * Send events from this stream to various PEs.
- *
- * @param pe
- * a target PE array.
- *
- * @return the stream object
+ * give this stream a meaningful name in the context of your application.
+ * @param processingElements
+ * the target PE prototypes for this stream.
*/
- public Stream<T> setPEs(ProcessingElement[] pes) {
- for (int i = 0; i < pes.length; i++)
- app.addStream(this, pes[i]);
- return this;
+ public Stream(App app, String name, ProcessingElement... processingElements) {
+ this(app, name, null, processingElements);
}
/**
@@ -174,7 +105,8 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
*
* @param event
*/
- public void put(T event) {
+ @SuppressWarnings("unchecked")
+ public void put(Event event) {
try {
event.setStreamId(getId());
event.setAppId(app.getId());
@@ -188,7 +120,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
* We send to a specific PE instance using the key but we don't know if the target partition is remote
* or local. We need to ask the sender.
*/
- if (sender.sendAndCheckIfLocal(key.get(event), event)) {
+ if (sender.sendAndCheckIfLocal(key.get((T) event), event)) {
/*
* Sender checked and decided that the target is local so we simply put the event in the queue and
@@ -208,11 +140,9 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
queue.put(event);
}
} catch (InterruptedException e) {
- if (logger.isTraceEnabled()) {
- e.printStackTrace();
- }
- logger.debug("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- // System.exit(-1);
+ e.printStackTrace();
+ logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
+ System.exit(-1);
}
}
@@ -226,8 +156,9 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
try {
queue.put((T) event);
} catch (InterruptedException e) {
- logger.debug("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- // System.exit(-1);
+ e.printStackTrace();
+ logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
+ System.exit(-1);
}
}
@@ -267,6 +198,13 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
}
/**
+ * Stop and close this stream.
+ */
+ public void close() {
+ thread.interrupt();
+ }
+
+ /**
* @return the sender object
*/
public Sender getSender() {
@@ -280,16 +218,13 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
return receiver;
}
- void setEventType(Class<T> type) {
- this.eventType = type;
- }
-
@Override
public void run() {
while (true) {
try {
/* Get oldest event in queue. */
- T event = queue.take();
+ @SuppressWarnings("unchecked")
+ T event = (T) queue.take();
/* Send event to each target PE. */
for (int i = 0; i < targetPEs.length; i++) {
@@ -313,8 +248,7 @@ public class Stream<T extends Event> extends Streamable<T> implements Runnable {
/* We have a key, send to target PE. */
/* STEP 1: find the PE instance for key. */
- ProcessingElement pe;
- pe = targetPEs[i].getInstanceForKey(key.get(event));
+ ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));
/* STEP 2: pass event to PE instance. */
pe.handleInputEvent(event);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
index 68b4bcf..ecbae5e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
@@ -7,27 +7,22 @@ import org.apache.s4.base.Event;
*
* @param <T>
*/
-abstract class Streamable<T extends Event> {
+public interface Streamable<T extends Event> {
/**
* Put an event into the streams.
*
* @param event
*/
- abstract void put(T event);
+ public void put(Event event);
/**
* Stop and close all the streams.
*/
- abstract void close();
+ public void close();
/**
* @return the name of this streamable object.
*/
- abstract String getName();
-
- /**
- * Start all streams;
- */
- abstract void start();
+ public String getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
index bfeabd1..436ca4c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/AppMaker.java
@@ -9,7 +9,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
@@ -152,7 +151,7 @@ abstract public class AppMaker {
StreamMaker sm = entry.getKey();
for (PEMaker pm : entry.getValue()) {
if (pm != null && sm != null) {
- sm.getStream().setPE(pm.getPe());
+ // sm.getStream().setPE(pm.getPe());
}
}
}
@@ -164,15 +163,16 @@ abstract public class AppMaker {
@SuppressWarnings("unchecked")
private <T extends Event> Stream<T> makeStream(StreamMaker sm, Class<T> type) {
- Stream<T> stream = app.createStream(type);
- stream.setName(sm.getName());
-
- if (sm.getKeyFinder() != null)
- stream.setKey((KeyFinder<T>) sm.getKeyFinder());
- else if (sm.getKeyDescriptor() != null)
- stream.setKey(sm.getKeyDescriptor());
-
- return stream;
+ // Stream<T> stream = app.createStream(type);
+ // stream.setName(sm.getName());
+ //
+ // if (sm.getKeyFinder() != null)
+ // stream.setKey((KeyFinder<T>) sm.getKeyFinder());
+ // else if (sm.getKeyDescriptor() != null)
+ // stream.setKey(sm.getKeyDescriptor());
+ //
+ // return stream;
+ return null;
}
/* Do the magic to create a PE from a PEMaker. */
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
new file mode 100644
index 0000000..1418396
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -0,0 +1,173 @@
+package org.apache.s4.deploy.prodcon;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.sun.net.httpserver.HttpServer;
+
+public class TestProducerConsumer {
+
+ private Factory zookeeperServerConnectionFactory;
+ private Process forkedNode;
+ private ZkClient zkClient;
+ private String clusterName;
+ private HttpServer httpServer;
+ private static File tmpAppsDir;
+
+ @BeforeClass
+ public static void createS4RFiles() throws Exception {
+ tmpAppsDir = Files.createTempDir();
+
+ Assert.assertTrue(tmpAppsDir.exists());
+ File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+ CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/s4-showtime/build.gradle"), "installS4R",
+ new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+
+ CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ + "/test-apps/s4-counter/build.gradle"), "installS4R",
+ new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+ }
+
+ @Before
+ public void cleanLocalAppsDir() throws ConfigurationException {
+ PropertiesConfiguration config = loadConfig();
+
+ if (!new File(config.getString("appsDir")).exists()) {
+ Assert.assertTrue(new File(config.getString("appsDir")).mkdirs());
+ } else {
+ if (!config.getString("appsDir").startsWith("/tmp")) {
+ Assert.fail("apps dir should a subdir of /tmp for safety");
+ }
+ CommTestUtils.deleteDirectoryContents(new File(config.getString("appsDir")));
+ }
+ }
+
+ @Before
+ public void prepare() throws Exception {
+ CommTestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+ final ZooKeeper zk = CommTestUtils.createZkClient();
+ try {
+ zk.delete("/simpleAppCreated", -1);
+ } catch (Exception ignored) {
+ }
+
+ zk.close();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ CommTestUtils.killS4App(forkedNode);
+ }
+
+ private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException {
+ InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.load(is);
+ return config;
+ }
+
+ @Test
+ public void testInitialDeploymentFromFileSystem() throws Exception {
+
+ File showtimeS4R = new File(loadConfig().getString("appsDir") + File.separator + "showtime"
+ + System.currentTimeMillis() + ".s4r");
+ System.out.println(tmpAppsDir.getAbsolutePath());
+ Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/s4-showtime-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(showtimeS4R)) > 0);
+ String uriShowtime = showtimeS4R.toURI().toString();
+
+ File counterS4R = new File(loadConfig().getString("appsDir") + File.separator + "counter"
+ + System.currentTimeMillis() + ".s4r");
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/s4-counter-0.0.0-SNAPSHOT.s4r")),
+ Files.newOutputStreamSupplier(counterS4R)) > 0);
+
+ String uriCounter = counterS4R.toURI().toString();
+
+ initializeS4Node();
+
+ ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
+ zkClient.create("/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+
+ ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
+ zkClient.create("/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+
+ // TODO validate test through some Zookeeper notifications
+ Thread.sleep(10000);
+ }
+
+ private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+ // 0. package s4 app
+ // TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
+ // current package .
+
+ // 1. start s4 nodes. Check that no app is deployed.
+ InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.load(is);
+
+ clusterName = config.getString("cluster.name");
+ TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 1);
+
+ zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+ Assert.assertTrue(processes.size() == 0);
+ final CountDownLatch signalProcessesReady = new CountDownLatch(1);
+
+ zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ if (currentChilds.size() == 2) {
+ signalProcessesReady.countDown();
+ }
+
+ }
+ });
+
+ File tmpConfig = File.createTempFile("tmp", "config");
+ Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+ Files.newOutputStreamSupplier(tmpConfig)) > 0);
+ forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+
+ // TODO synchro with ready state from zk
+ Thread.sleep(10000);
+ // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/build.gradle b/test-apps/s4-counter/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/s4-counter/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+ json: 'org.json:json:20090211',
+ guava: 'com.google.guava:guava:10.0.1',
+ gson: 'com.google.code.gson:gson:1.6',
+ guice: 'com.google.inject:guice:3.0',
+ guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
+ guice_grapher: 'com.google.inject:guice-grapher:3.0',
+ flexjson: 'net.sf.flexjson:flexjson:2.1',
+ bcel: 'org.apache.bcel:bcel:5.2',
+ jakarta_regexp: 'jakarta-regexp:jakarta-regexp:1.4',
+ kryo: 'com.googlecode:kryo:1.04',
+ netty: 'org.jboss.netty:netty:3.2.5.Final',
+ reflectasm: 'com.esotericsoftware:reflectasm:0.8',
+ minlog: 'com.esotericsoftware:minlog:1.2',
+ asm: 'asm:asm:3.2',
+ commons_io: 'commons-io:commons-io:2.0.1',
+ commons_config: 'commons-configuration:commons-configuration:1.6',
+ commons_codec: 'commons-codec:commons-codec:1.4',
+ commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+ commons_coll: 'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+ slf4j: 'org.slf4j:slf4j-api:1.6.1',
+ logback_core: 'ch.qos.logback:logback-core:0.9.29',
+ logback_classic: 'ch.qos.logback:logback-classic:0.9.29',
+ zk: 'org.apache.zookeeper:zookeeper:3.3.1',
+ jcip: 'net.jcip:jcip-annotations:1.0',
+ junit: 'junit:junit:4.10',
+ ]
+
+
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+
+ /* Logging. */
+ compile( libraries.slf4j )
+ compile( libraries.logback_core )
+ compile( libraries.logback_classic )
+
+ /* Commons. */
+ compile( libraries.commons_io )
+ compile( libraries.commons_config )
+ compile( libraries.commons_coll )
+
+ /* Misc. */
+ compile( libraries.jcip )
+
+ /* Testing. */
+ testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+ dependsOn jar
+ from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ manifest = project.manifest
+ extension = 's4r'
+
+ /* Set class name in manifest. Parse source files until we find a class that extends App.
+ * Get fully qualified Java class name and set attribute in Manifest.
+ */
+ sourceSets.main.allSource.files.each { File file ->
+ if (appClassname =="" || appClassname == "UNKNOWN") {
+ // only execute the closure for this file if we haven't already found the app class name
+ appClassname = getAppClassname(file)
+ if(appClassname != "") {
+ manifest.mainAttributes('S4-App-Class': appClassname)
+ }
+ }
+ }
+
+ if (appClassname == "UNKNOWN") {
+
+ println "Couldn't find App class in source files...aborting."
+ exit(1)
+ }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+ appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+ /* This is for debugging. */
+ //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+ //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+ // more debugging statements.
+ //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ dependsOn s4r
+ from s4r.archivePath
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+ def classname = "UNKNOWN"
+ lines= file.readLines()
+ for(line in lines) {
+
+ def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+ if(pn) {
+ packageName = pn[0][1] + "."
+ }
+
+ def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+ if (an) {
+ classname = packageName + an[0][1]
+ println "Found app class name: " + classname
+ break
+ }
+ }
+ classname
+}
+
+class Version {
+ int major
+ int minor
+ int bugfix
+ boolean isRelease
+
+ String toString() {
+ "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
new file mode 100644
index 0000000..316b876
--- /dev/null
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
@@ -0,0 +1,55 @@
+package s4app;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.EventSource;
+import org.apache.s4.core.Streamable;
+
+public class ClockApp extends App {
+
+ private EventSource eventSource;
+ private ClockPE clockPE;
+
+ @Override
+ protected void start() {
+ System.out.println("Starting CounterApp...");
+ clockPE.getInstanceForKey("single");
+ }
+
+ // generic array due to varargs generates a warning.
+ @Override
+ protected void init() {
+ System.out.println("Initing CounterApp...");
+
+ clockPE = new ClockPE(this);
+ clockPE.setTimerInterval(1, TimeUnit.SECONDS);
+
+ eventSource = new EventSource(this, "I can give you the time!");
+ clockPE.setStreams((Streamable) eventSource);
+ }
+
+ @Override
+ protected void close() {
+ System.out.println("Closing CounterApp...");
+ eventSource.close();
+ }
+
+ @Override
+ protected void onStart() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onInit() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockPE.java b/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
new file mode 100644
index 0000000..cc81672
--- /dev/null
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockPE.java
@@ -0,0 +1,49 @@
+package s4app;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClockPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClockPE.class);
+
+ private Streamable[] targetStreams;
+ private long tick = 0;
+
+ public ClockPE(App app) {
+ super(app);
+ }
+
+ /**
+ * @param targetStreams
+ * the streams into which {@link #onTime()} puts each tick event.
+ */
+ public void setStreams(Streamable... targetStreams) {
+ this.targetStreams = targetStreams;
+ }
+
+ public void onTime() {
+ Event event = new Event();
+ event.put("tick", Long.class, tick++);
+
+ logger.info("Sending event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
+ for (int i = 0; i < targetStreams.length; i++) {
+ targetStreams[i].put(event);
+ }
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/build.gradle b/test-apps/s4-showtime/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/s4-showtime/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here. */
+libraries = [
+ json: 'org.json:json:20090211',
+ guava: 'com.google.guava:guava:10.0.1',
+ gson: 'com.google.code.gson:gson:1.6',
+ guice: 'com.google.inject:guice:3.0',
+ guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
+ guice_grapher: 'com.google.inject:guice-grapher:3.0',
+ flexjson: 'net.sf.flexjson:flexjson:2.1',
+ bcel: 'org.apache.bcel:bcel:5.2',
+ jakarta_regexp: 'jakarta-regexp:jakarta-regexp:1.4',
+ kryo: 'com.googlecode:kryo:1.04',
+ netty: 'org.jboss.netty:netty:3.2.5.Final',
+ reflectasm: 'com.esotericsoftware:reflectasm:0.8',
+ minlog: 'com.esotericsoftware:minlog:1.2',
+ asm: 'asm:asm:3.2',
+ commons_io: 'commons-io:commons-io:2.0.1',
+ commons_config: 'commons-configuration:commons-configuration:1.6',
+ commons_codec: 'commons-codec:commons-codec:1.4',
+ commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+ commons_coll: 'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+ slf4j: 'org.slf4j:slf4j-api:1.6.1',
+ logback_core: 'ch.qos.logback:logback-core:0.9.29',
+ logback_classic: 'ch.qos.logback:logback-classic:0.9.29',
+ zk: 'org.apache.zookeeper:zookeeper:3.3.1',
+ jcip: 'net.jcip:jcip-annotations:1.0',
+ junit: 'junit:junit:4.10',
+ ]
+
+
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+
+ /* Logging. */
+ compile( libraries.slf4j )
+ compile( libraries.logback_core )
+ compile( libraries.logback_classic )
+
+ /* Commons. */
+ compile( libraries.commons_io )
+ compile( libraries.commons_config )
+ compile( libraries.commons_coll )
+
+ /* Misc. */
+ compile( libraries.jcip )
+
+ /* Testing. */
+ testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+ dependsOn jar
+ from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ manifest = project.manifest
+ extension = 's4r'
+
+ /* Set class name in manifest. Parse source files until we find a class that extends App.
+ * Get fully qualified Java class name and set attribute in Manifest.
+ */
+ sourceSets.main.allSource.files.each { File file ->
+ if (appClassname =="" || appClassname == "UNKNOWN") {
+ // only execute the closure for this file if we haven't already found the app class name
+ appClassname = getAppClassname(file)
+ if(appClassname != "") {
+ manifest.mainAttributes('S4-App-Class': appClassname)
+ }
+ }
+ }
+
+ if (appClassname == "UNKNOWN") {
+
+ println "Couldn't find App class in source files...aborting."
+ exit(1)
+ }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+ appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+ /* This is for debugging. */
+ //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+ //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+ // more debugging statements.
+ //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ dependsOn s4r
+ from s4r.archivePath
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+ def classname = "UNKNOWN"
+ lines= file.readLines()
+ for(line in lines) {
+
+ def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+ if(pn) {
+ packageName = pn[0][1] + "."
+ }
+
+ def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+ if (an) {
+ classname = packageName + an[0][1]
+ println "Found app class name: " + classname
+ break
+ }
+ }
+ classname
+}
+
+class Version {
+ int major
+ int minor
+ int bugfix
+ boolean isRelease
+
+ String toString() {
+ "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java b/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
new file mode 100644
index 0000000..f68516c
--- /dev/null
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowPE.java
@@ -0,0 +1,33 @@
+package s4app;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ShowPE extends ProcessingElement {
+
+ private static final Logger logger = LoggerFactory.getLogger(ShowPE.class);
+
+ public ShowPE(App app) {
+ super(app);
+ }
+
+ public void onEvent(Event event) {
+
+ logger.info("Received event with tick {} and time {}.", event.get("tick", Long.class), event.getTime());
+
+ }
+
+ @Override
+ protected void onRemove() {
+
+ }
+
+ @Override
+ protected void onCreate() {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/403162eb/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
new file mode 100644
index 0000000..1c92c57
--- /dev/null
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
@@ -0,0 +1,47 @@
+package s4app;
+
+import org.apache.s4.core.App;
+
+public class ShowTimeApp extends App {
+
+ private ShowPE showPE;
+
+ @Override
+ protected void start() {
+ System.out.println("Starting ShowTimeApp...");
+ showPE.getInstanceForKey("single");
+ }
+
+ @Override
+ protected void init() {
+ System.out.println("Initing ShowTimeApp...");
+
+ showPE = new ShowPE(this);
+
+ /* This stream will receive events from another app. */
+ createStream("I need the time.", showPE);
+ }
+
+ @Override
+ protected void close() {
+ System.out.println("Closing ShowTimeApp...");
+ }
+
+ @Override
+ protected void onStart() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onInit() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+}