You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[20/22] git commit: inter-cluster communication - added classes for
interacting with remote cluster: remote topology,
remote emitter - added Twitter topic count example
inter-cluster communication
- added classes for interacting with remote cluster: remote topology, remote emitter
- added Twitter topic count example
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d11f7fbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d11f7fbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d11f7fbc
Branch: refs/heads/piper
Commit: d11f7fbc5b331a09465677f721fa8fab7bab0654
Parents: 17c5ab8
Author: Matthieu Morel <mm...@apache.org>
Authored: Sun Mar 25 15:53:05 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:25:53 2012 +0200
----------------------------------------------------------------------
build.gradle | 10 +-
.../src/main/java/org/apache/s4/base/Event.java | 10 +-
.../java/org/apache/s4/base/RemoteEmitter.java | 5 +
.../org/apache/s4/base/util/MultiClassLoader.java | 31 +--
.../org/apache/s4/comm/tcp/TCPRemoteEmitter.java | 15 +
.../java/org/apache/s4/comm/tools/TaskSetup.java | 6 +-
.../apache/s4/comm/topology/RemoteTopology.java | 5 +
.../s4/comm/topology/RemoteTopologyFromZK.java | 16 ++
.../s4/comm/topology/AssignmentFromZKTest.java | 5 +-
.../s4/comm/topology/TopologyFromZKTest.java | 5 +-
.../java/org/apache/s4/fixtures/ZkBasedTest.java | 12 +-
subprojects/s4-core/s4-core.gradle | 2 +
.../src/main/java/org/apache/s4/core/App.java | 10 +-
.../src/main/java/org/apache/s4/core/Main.java | 2 +-
.../src/main/java/org/apache/s4/core/Receiver.java | 12 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 18 ++
.../src/main/java/org/apache/s4/core/Server.java | 65 ++----
.../src/main/java/org/apache/s4/core/Stream.java | 16 +-
.../java/org/apache/s4/core/adapter/Adapter.java | 26 ++
.../org/apache/s4/core/adapter/AdapterMain.java | 59 +++++
.../org/apache/s4/core/adapter/AdapterModule.java | 97 +++++++
.../org/apache/s4/core/adapter/RemoteStream.java | 60 +++++
.../apache/s4/deploy/TestAutomaticDeployment.java | 2 +-
.../test/java/org/apache/s4/deploy/TestModule.java | 62 -----
.../s4/deploy/prodcon/TestProducerConsumer.java | 2 +-
.../test/java/org/apache/s4/fixtures/ZKServer.java | 24 ++-
.../resources/org.apache.s4.deploy.s4.properties | 6 +-
test-apps/twitter-adapter/build.gradle | 200 +++++++++++++++
.../s4/example/twitter/TwitterInputAdapter.java | 113 ++++++++
.../src/main/resources/default.s4.properties | 18 ++
.../src/main/resources/twitter4j.properties | 5 +
test-apps/twitter-counter/build.gradle | 200 +++++++++++++++
.../org/apache/s4/example/twitter/TopNTopicPE.java | 79 ++++++
.../s4/example/twitter/TopicCountAndReportPE.java | 47 ++++
.../s4/example/twitter/TopicExtractorPE.java | 49 ++++
.../apache/s4/example/twitter/TopicSeenEvent.java | 17 ++
.../s4/example/twitter/TwitterCounterApp.java | 77 ++++++
37 files changed, 1213 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1bafbcf..7db43d7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -70,7 +70,9 @@ libraries = [
zk: 'org.apache.zookeeper:zookeeper:3.3.1',
jcip: 'net.jcip:jcip-annotations:1.0',
junit: 'junit:junit:4.10',
- zkclient: 'com.github.sgroschupf:zkclient:0.1'
+ zkclient: 'com.github.sgroschupf:zkclient:0.1',
+ diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
+ jcommander: 'com.beust:jcommander:1.23'
]
subprojects {
@@ -84,10 +86,10 @@ subprojects {
targetCompatibility = 1.6
checkstyleConfigFileName = "$rootDir/config/checkstyle/s4-checkstyle.xml"
-
+
dependencies {
-
+
/* Google. */
compile( libraries.guava )
compile( libraries.guice )
@@ -105,7 +107,7 @@ subprojects {
/* Misc. */
compile( libraries.jcip )
compile( libraries.zk )
-
+
/* Testing. */
testCompile( libraries.junit )
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 2e023cb..ca6f607 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -27,7 +27,7 @@ public class Event {
private static final Logger logger = LoggerFactory.getLogger(Event.class);
final private long time;
- private int streamId;
+ private String streamName;
private int appId;
private Map<String, Data<?>> map;
@@ -56,8 +56,8 @@ public class Event {
*
* @return the target stream id
*/
- public int getStreamId() {
- return streamId;
+ public String getStreamName() {
+ return streamName;
}
/**
@@ -66,8 +66,8 @@ public class Event {
*
* @param targetStreamId
*/
- public void setStreamId(int streamId) {
- this.streamId = streamId;
+ public void setStreamId(String streamName) {
+ this.streamName = streamName;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
new file mode 100644
index 0000000..59d9164
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/RemoteEmitter.java
@@ -0,0 +1,5 @@
+package org.apache.s4.base;
+
+public interface RemoteEmitter extends Emitter {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
index 8c702df..0e942b6 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/MultiClassLoader.java
@@ -17,16 +17,14 @@ import com.google.common.collect.MapMaker;
* The source code for this class was derived from <a href=
* "http://code.google.com/p/db4o-om/source/browse/trunk/objectmanager-api/src/com/db4o/objectmanager/configuration/MultiClassLoader.java"
* >this project</a> which was derived from this <a href=
- * "http://www.javaworld.com/javaworld/jw-10-1996/jw-10-indepth.html?page=1"
- * >article by Chuck McManis</a>.
+ * "http://www.javaworld.com/javaworld/jw-10-1996/jw-10-indepth.html?page=1" >article by Chuck McManis</a>.
*
*
* Thank you to the authors!
*/
abstract public class MultiClassLoader extends ClassLoader {
- private static final Logger logger = LoggerFactory
- .getLogger(MultiClassLoader.class);
+ private static final Logger logger = LoggerFactory.getLogger(MultiClassLoader.class);
private final Map<String, Class<?>> classes;
private char classNameReplacementChar;
@@ -37,8 +35,8 @@ abstract public class MultiClassLoader extends ClassLoader {
// ---------- Superclass Overrides ------------------------
/**
- * This is a simple version for external clients since they will always want
- * the class resolved before it is returned to them.
+ * This is a simple version for external clients since they will always want the class resolved before it is
+ * returned to them.
*/
@Override
public Class<?> loadClass(String className) throws ClassNotFoundException {
@@ -46,28 +44,26 @@ abstract public class MultiClassLoader extends ClassLoader {
}
@Override
- public synchronized Class<?> loadClass(String className, boolean resolveIt)
- throws ClassNotFoundException {
+ public synchronized Class<?> loadClass(String className, boolean resolveIt) throws ClassNotFoundException {
Class<?> result;
byte[] classBytes;
- logger.debug("MultiClassLoader loadClass - className: " + className
- + ", resolveIt: " + resolveIt);
+ logger.trace("MultiClassLoader loadClass - className: " + className + ", resolveIt: " + resolveIt);
/* Check our local cache of classes. */
result = classes.get(className);
if (result != null) {
- logger.debug("Returning cached result for class [{}]", className);
+ logger.trace("Returning cached result for class [{}]", className);
return result;
}
/* Check with the primordial class loader. */
try {
result = super.findSystemClass(className);
- logger.debug("Returning system class (in CLASSPATH) [{}]", className);
+ logger.trace("Returning system class (in CLASSPATH) [{}]", className);
return result;
} catch (ClassNotFoundException e) {
- logger.debug("Not a system class [{}]", className);
+ logger.trace("Not a system class [{}]", className);
}
classBytes = loadClassBytes(className);
@@ -94,15 +90,14 @@ abstract public class MultiClassLoader extends ClassLoader {
if (result == null)
return null;
classes.put(className, result);
- logger.debug("Returning newly loaded class [{}]", className);
+ logger.trace("Returning newly loaded class [{}]", className);
return result;
}
/**
- * This optional call allows a class name such as "COM.test.Hello" to be
- * changed to "COM_test_Hello", which is useful for storing classes from
- * different packages in the same retrieval directory. In the above example
- * the char would be '_'.
+ * This optional call allows a class name such as "COM.test.Hello" to be changed to "COM_test_Hello", which is
+ * useful for storing classes from different packages in the same retrieval directory. In the above example the char
+ * would be '_'.
*/
public void setClassNameReplacementChar(char replacement) {
classNameReplacementChar = replacement;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
new file mode 100644
index 0000000..0fa663f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -0,0 +1,15 @@
+package org.apache.s4.comm.tcp;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.topology.RemoteTopology;
+
+import com.google.inject.Inject;
+
+public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
+
+ @Inject
+ public TCPRemoteEmitter(RemoteTopology topology) throws InterruptedException {
+ super(topology);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 193daff..0466993 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -22,7 +22,7 @@ public class TaskSetup {
zkclient.deleteRecursive("/" + clusterName);
}
- public void setup(String clusterName, int tasks) {
+ public void setup(String clusterName, int tasks, int initialPort) {
zkclient.createPersistent("/" + clusterName + "/tasks", true);
zkclient.createPersistent("/" + clusterName + "/process", true);
zkclient.createPersistent("/" + clusterName + "/apps", true);
@@ -30,7 +30,7 @@ public class TaskSetup {
String taskId = "Task-" + i;
ZNRecord record = new ZNRecord(taskId);
record.putSimpleField("taskId", taskId);
- record.putSimpleField("port", String.valueOf(1300 + i));
+ record.putSimpleField("port", String.valueOf(initialPort + i));
record.putSimpleField("partition", String.valueOf(i));
record.putSimpleField("cluster", clusterName);
zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
@@ -42,7 +42,7 @@ public class TaskSetup {
TaskSetup taskSetup = new TaskSetup("localhost:2181");
String clusterName = "test-s4-cluster";
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10);
+ taskSetup.setup(clusterName, 10, 1300);
String zookeeperAddress = "localhost:2181";
for (int i = 0; i < 10; i++) {
AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
new file mode 100644
index 0000000..8f7cc1a
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopology.java
@@ -0,0 +1,5 @@
+package org.apache.s4.comm.topology;
+
+public interface RemoteTopology extends Topology {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
new file mode 100644
index 0000000..39ede90
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteTopologyFromZK.java
@@ -0,0 +1,16 @@
+package org.apache.s4.comm.topology;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class RemoteTopologyFromZK extends TopologyFromZK implements RemoteTopology {
+
+ @Inject
+ public RemoteTopologyFromZK(@Named("cluster.remote.name") String remoteClusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ super(remoteClusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
index d733e9b..e709d68 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentFromZKTest.java
@@ -1,8 +1,9 @@
package org.apache.s4.comm.topology;
+import static org.junit.Assert.assertEquals;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
import org.apache.s4.comm.tools.TaskSetup;
import org.junit.Test;
@@ -14,7 +15,7 @@ public class AssignmentFromZKTest extends ZKBaseTest {
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
final String clusterName = "test-s4-cluster";
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10);
+ taskSetup.setup(clusterName, 10, 1300);
final CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
index eb5f42c..65eee28 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/TopologyFromZKTest.java
@@ -1,11 +1,12 @@
package org.apache.s4.comm.topology;
+import static org.junit.Assert.assertEquals;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.junit.Assert.*;
import org.apache.s4.comm.tools.TaskSetup;
import org.junit.Test;
@@ -17,7 +18,7 @@ public class TopologyFromZKTest extends ZKBaseTest {
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
final String clusterName = "test-s4-cluster";
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10);
+ taskSetup.setup(clusterName, 10, 1300);
final TopologyFromZK topologyFromZK = new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
final Lock lock = new ReentrantLock();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
index 02c5467..bda3bd8 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java
@@ -6,6 +6,7 @@ import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
import org.apache.s4.comm.tools.TaskSetup;
+import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,15 @@ public abstract class ZkBasedTest {
TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
final String clusterName = "s4-test-cluster";
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1);
+ taskSetup.setup(clusterName, 1, 1300);
+ }
+
+ @After
+ public void cleanupZkBasedTest() {
+ if (zkServer != null) {
+ zkServer.shutdown();
+ zkServer = null;
+ }
+ CommTestUtils.cleanupTmpDirs();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index 1bdbb6c..afa85c9 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -19,6 +19,8 @@ description = 'The S4 core platform.'
dependencies {
compile project(":s4-base")
compile project(":s4-comm")
+ compile project(path: ':s4-comm', configuration: 'tests')
+ compile libraries.jcommander
testCompile project(path: ':s4-comm', configuration: 'tests')
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0939d8e..2575013 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -92,7 +92,7 @@ public abstract class App {
}
/* Should only be used within the core package. */
- void addStream(Streamable stream) {
+ public void addStream(Streamable stream) {
streams.add(stream);
}
@@ -130,10 +130,10 @@ public abstract class App {
}
//
// /* Allow abstract PE to initialize. */
- // for (ProcessingElement pe : getPePrototypes()) {
- // logger.info("Init prototype [{}].", pe.getClass().getName());
- // pe.initPEPrototypeInternal();
- // }
+ for (ProcessingElement pe : getPePrototypes()) {
+ logger.info("Init prototype [{}].", pe.getClass().getName());
+ pe.initPEPrototypeInternal();
+ }
onStart();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index a55511c..71756a4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -63,7 +63,7 @@ public class Main {
private static void startServer(final Logger logger, Injector injector) {
Server server = injector.getInstance(Server.class);
try {
- server.start();
+ server.start(injector);
} catch (Exception e) {
logger.error("Failed to start the controller.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 90ae0bf..259496b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -31,7 +31,7 @@ public class Receiver implements Runnable {
final private Listener listener;
final private SerializerDeserializer serDeser;
- private Map<Integer, Map<Integer, Stream<? extends Event>>> streams;
+ private Map<Integer, Map<String, Stream<? extends Event>>> streams;
private Thread thread;
@Inject
@@ -52,23 +52,23 @@ public class Receiver implements Runnable {
/** Save stream keyed by app id and stream id. */
void addStream(Stream<? extends Event> stream) {
int appId = stream.getApp().getId();
- Map<Integer, Stream<? extends Event>> appMap = streams.get(appId);
+ Map<String, Stream<? extends Event>> appMap = streams.get(appId);
if (appMap == null) {
appMap = new MapMaker().makeMap();
streams.put(appId, appMap);
}
- appMap.put(stream.getId(), stream);
+ appMap.put(stream.getName(), stream);
}
/** Remove stream when it is no longer needed. */
void removeStream(Stream<? extends Event> stream) {
int appId = stream.getApp().getId();
- Map<Integer, Stream<? extends Event>> appMap = streams.get(appId);
+ Map<String, Stream<? extends Event>> appMap = streams.get(appId);
if (appMap == null) {
logger.error("Tried to remove a stream that is not registered in the receiver.");
return;
}
- appMap.remove(stream.getId());
+ appMap.remove(stream.getName());
}
public void run() {
@@ -79,7 +79,7 @@ public class Receiver implements Runnable {
Event event = (Event) serDeser.deserialize(message);
int appId = event.getAppId();
- int streamId = event.getStreamId();
+ String streamId = event.getStreamName();
/*
* Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
new file mode 100644
index 0000000..48b980b
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -0,0 +1,18 @@
+package org.apache.s4.core;
+
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class RemoteSender extends Sender {
+
+ @Inject
+ public RemoteSender(RemoteEmitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
+ super(emitter, serDeser, hasher);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index dd80e51..0d7711b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -8,14 +8,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.jar.Attributes.Name;
import java.util.jar.JarFile;
-import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkException;
import org.apache.s4.base.Event;
import org.apache.s4.base.util.S4RLoader;
import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.core.adapter.Adapter;
import org.apache.s4.deploy.DeploymentManager;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,7 +22,6 @@ import ch.qos.logback.classic.Level;
import com.google.common.collect.Maps;
import com.google.common.io.PatternFilenameFilter;
import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.name.Named;
@@ -72,44 +69,11 @@ public class Server {
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
- initializeZkStreams(clusterName);
- watchZkStreams();
}
- private void watchZkStreams() {
- zkClient.subscribeChildChanges("/" + clusterName + "/streams/producers", new IZkChildListener() {
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
- // synchronized (streams) {
- // for (String child : currentChilds) {
- // if (apps.containsKey(child)) {
- // eventSources.put(paramK, paramV)
- // }
- // }
- // }
-
- }
- });
-
- }
-
- private void initializeZkStreams(String clusterName) {
- try {
- zkClient.createPersistent("/" + clusterName + "/streams");
- zkClient.createPersistent("/" + clusterName + "/streams/producers");
- zkClient.createPersistent("/" + clusterName + "/streams/consumers");
- } catch (ZkException e) {
- if (e.getCause() instanceof KeeperException.NodeExistsException) {
- // ignore: this stream already exists
- } else {
- throw e;
- }
- }
- }
-
- public void start() throws Exception {
+ public void start(Injector injector) throws Exception {
+ this.injector = injector;
/* Set up logger basic configuration. */
ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
.getLogger(Logger.ROOT_LOGGER_NAME);
@@ -118,14 +82,15 @@ public class Server {
AbstractModule module = null;
/* Initialize communication layer module. */
- try {
- module = (AbstractModule) Class.forName(commModuleName).newInstance();
- } catch (Exception e) {
- logger.error("Unable to instantiate communication layer module.", e);
- }
-
- /* After some indirection we get the injector. */
- injector = Guice.createInjector(module);
+ // TODO do we need a separate comm layer?
+ // try {
+ // module = (AbstractModule) Class.forName(commModuleName).newInstance();
+ // } catch (Exception e) {
+ // logger.error("Unable to instantiate communication layer module.", e);
+ // }
+ //
+ // /* After some indirection we get the injector. */
+ // injector = Guice.createInjector(module);
File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
for (File s4rFile : s4rFiles) {
@@ -241,6 +206,12 @@ public class Server {
}
app.setCommLayer(sender, receiver);
+
+ if (app instanceof Adapter) {
+ RemoteSender remoteSender = injector.getInstance(RemoteSender.class);
+ ((Adapter) app).setRemoteSender(remoteSender);
+ }
+
App previous = apps.put(appName, app);
logger.info("Loaded application from file {}", s4r.getAbsolutePath());
signalOneAppLoaded.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index a328941..6f23fde 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -46,7 +46,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
private Thread thread;
final private Sender sender;
final private Receiver receiver;
- final private int id;
+ // final private int id;
final private App app;
/**
@@ -63,9 +63,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
* the target PE prototypes for this stream.
*/
public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
- synchronized (Stream.class) {
- id = idCounter++;
- }
+ // synchronized (Stream.class) {
+ // id = idCounter++;
+ // }
this.app = app;
app.addStream(this);
this.name = name;
@@ -111,7 +111,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
@SuppressWarnings("unchecked")
public void put(Event event) {
try {
- event.setStreamId(getId());
+ event.setStreamId(getName());
event.setAppId(app.getId());
/*
@@ -182,9 +182,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
/**
* @return the stream id
*/
- int getId() {
- return id;
- }
+ // int getId() {
+ // return id;
+ // }
/**
* @return the app
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
new file mode 100644
index 0000000..25ffc59
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
@@ -0,0 +1,26 @@
+package org.apache.s4.core.adapter;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteSender;
+
+import com.google.inject.Inject;
+
+public abstract class Adapter extends App {
+
+ @Inject
+ RemoteSender remoteSender;
+
+ public RemoteSender getRemoteSender() {
+ return remoteSender;
+ }
+
+ public void setRemoteSender(RemoteSender remoteSender) {
+ this.remoteSender = remoteSender;
+ }
+
+ protected <T extends Event> RemoteStream createRemoteStream(String name) {
+
+ return new RemoteStream(this, name);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
new file mode 100644
index 0000000..a1c496c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
@@ -0,0 +1,59 @@
+package org.apache.s4.core.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+
+import org.apache.s4.core.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class AdapterMain {
+ private static final Logger logger = LoggerFactory.getLogger(AdapterMain.class);
+
+ public static void main(String[] args) {
+
+ AdapterArgs adapterArgs = new AdapterArgs();
+ JCommander jc = new JCommander(adapterArgs);
+
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ jc.usage();
+ }
+
+ try {
+ Injector injector = Guice.createInjector(new AdapterModule(new FileInputStream(new File(
+ adapterArgs.s4PropertiesFilePath))));
+ Server server = injector.getInstance(Server.class);
+ try {
+ server.start(injector);
+ } catch (Exception e) {
+ logger.error("Failed to start the controller.", e);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Parameters(separators = "=")
+ static class AdapterArgs {
+
+ @Parameter(names = "-moduleClass", description = "module class name")
+ String moduleClass;
+
+ @Parameter(names = "-adapterClass", description = "adapter class name")
+ String adapterClass;
+
+ @Parameter(names = "-s4Properties", description = "s4 properties file path")
+ String s4PropertiesFilePath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
new file mode 100644
index 0000000..6e5c4a0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
@@ -0,0 +1,97 @@
+package org.apache.s4.core.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.tcp.TCPRemoteEmitter;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.RemoteTopology;
+import org.apache.s4.comm.topology.RemoteTopologyFromZK;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.deploy.DeploymentManager;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public class AdapterModule extends AbstractModule {
+
+ InputStream configFileInputStream;
+ private PropertiesConfiguration config;
+
+ public AdapterModule(InputStream configFileInputStream) {
+ this.configFileInputStream = configFileInputStream;
+ }
+
+    /**
+     * Loads the configuration if it has not been loaded yet, closes the configuration stream, then
+     * binds the "isCluster" flag and the implementations used by the adapter node (assignment and
+     * topologies from ZooKeeper, TCP emitters and listener, default hasher, Kryo serialization,
+     * distributed deployment manager).
+     */
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        if (configFileInputStream != null) {
+            try {
+                configFileInputStream.close();
+            } catch (IOException ignored) {
+                // best effort: nothing is read from the stream after this point
+            }
+        }
+
+        // more than one host listed in "cluster.hosts" means we are running as a cluster
+        boolean isCluster = config.getList("cluster.hosts").size() > 1;
+        bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(isCluster);
+
+        bind(Cluster.class);
+
+        bind(Assignment.class).to(AssignmentFromZK.class);
+
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(RemoteTopology.class).to(RemoteTopologyFromZK.class);
+
+        bind(RemoteEmitter.class).to(TCPRemoteEmitter.class);
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+
+        // TODO downstream hasher
+
+        /* The hashing function to map keys to partitions. */
+        bind(Hasher.class).to(DefaultHasher.class);
+
+        /* Use Kryo to serialize events. */
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+    }
+
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(configFileInputStream);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
new file mode 100644
index 0000000..a83feee
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
@@ -0,0 +1,60 @@
+package org.apache.s4.core.adapter;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.RemoteSender;
+import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream that dispatches events to a remote cluster
+ *
+ */
+public class RemoteStream implements Streamable<Event> {
+
+ private Thread thread;
+ String name;
+
+ RemoteSender remoteSender;
+ int id;
+ private Adapter adapter;
+ private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
+
+ private static AtomicInteger remoteStreamCounter = new AtomicInteger();
+
+ public RemoteStream(Adapter adapter, String name) {
+ this.name = name;
+ this.adapter = adapter;
+ adapter.addStream(this);
+ remoteSender = adapter.getRemoteSender();
+ this.id = remoteStreamCounter.addAndGet(1);
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void put(Event event) {
+ event.setStreamId(name);
+ event.setAppId(adapter.getId());
+
+ // TODO specify partitioning?
+ remoteSender.sendToRemotePartitions(event);
+
+ }
+
+    /**
+     * Closes this stream by interrupting its thread, when there is one.
+     */
+    @Override
+    public void close() {
+        // no code in this class assigns the thread field, so an unguarded call would always
+        // fail with a NullPointerException
+        if (thread != null) {
+            thread.interrupt();
+        }
+    }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 279ccaa..4e0a3e7 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -296,7 +296,7 @@ public class TestAutomaticDeployment {
clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1);
+ taskSetup.setup(clusterName, 1, 1300);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
deleted file mode 100644
index f5f3912..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.s4.deploy;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public class TestModule extends AbstractModule {
-
- private PropertiesConfiguration config;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(Cluster.class);
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Topology.class).to(TopologyFromZK.class);
- bind(Emitter.class).to(TCPEmitter.class);
- bind(Listener.class).to(TCPListener.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index 1418396..e853df6 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -140,7 +140,7 @@ public class TestProducerConsumer {
clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 1);
+ taskSetup.setup(clusterName, 1, 1300);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
index 3a33219..b79f1a0 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
@@ -1,5 +1,7 @@
package org.apache.s4.fixtures;
+import java.util.Arrays;
+
import org.apache.s4.comm.tools.TaskSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,6 +23,8 @@ public class ZKServer {
try {
jc.parse(args);
} catch (Exception e) {
+ System.out.println(Arrays.toString(args));
+ e.printStackTrace();
jc.usage();
System.exit(-1);
}
@@ -28,10 +32,12 @@ public class ZKServer {
logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
clusterArgs.nbTasks);
- CommTestUtils.startZookeeperServer();
+ if (clusterArgs.startZK) {
+ CommTestUtils.startZookeeperServer();
+ }
TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
taskSetup.clean(clusterArgs.clusterName);
- taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks);
+ taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
logger.info("Zookeeper started");
} catch (Exception e) {
logger.error("Cannot initialize zookeeper with specified configuration", e);
@@ -42,14 +48,20 @@ public class ZKServer {
@Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
static class ZKServerArgs {
- @Parameter(names = "cluster", description = "S4 cluster name", required = true)
+ @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
String clusterName = "s4-test-cluster";
- @Parameter(names = "nbTasks", description = "number of tasks for the cluster", required = true)
+ @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
int nbTasks = 1;
- @Parameter(names = "zk", description = "Zookeeper connection string")
- String zkConnectionString = "localhost:2181";
+ @Parameter(names = "-zk", description = "Zookeeper connection string")
+ String zkConnectionString = "localhost:21810";
+
+ @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
+ boolean startZK = false;
+
+ @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+ int firstListeningPort = -1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index d74927f..9a5d11a 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -6,6 +6,6 @@ cluster.name = s4-test-cluster
cluster.zk_address = localhost:21810
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
\ No newline at end of file
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
new file mode 100644
index 0000000..669d62b
--- /dev/null
+++ b/test-apps/twitter-adapter/build.gradle
@@ -0,0 +1,200 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+ mavenLocal()
+ mavenCentral()
+ mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+ /* Add lib dir as a repo. Some jar files that are not available
+ in a public repo are distributed in the lib dir. */
+ flatDir name: 'libDir', dirs: "$rootDir/lib"
+}
+
+/* All project libraries must be defined here. */
+libraries = [
+ twitter4j_core: 'org.twitter4j:twitter4j-core:2.2.5',
+ twitter4j_stream: 'org.twitter4j:twitter4j-stream:2.2.5',
+ s4_base: 'org.apache.s4:s4-base:0.5.0-SNAPSHOT',
+ s4_comm: 'org.apache.s4:s4-comm:0.5.0-SNAPSHOT',
+ s4_core: 'org.apache.s4:s4-core:0.5.0-SNAPSHOT'
+ ]
+
+
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ compile (libraries.s4_base)
+ compile (libraries.s4_comm)
+ compile (libraries.s4_core)
+ compile (libraries.twitter4j_core)
+ compile (libraries.twitter4j_stream)
+
+
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+ dependsOn jar
+ from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ manifest = project.manifest
+ extension = 's4r'
+
+ /* Set class name in manifest. Parse source files until we find a class that extends App.
+ * Get fully qualified Java class name and set attribute in Manifest.
+ */
+ sourceSets.main.allSource.files.each { File file ->
+ if (appClassname =="" || appClassname == "UNKNOWN") {
+ // only execute the closure for this file if we haven't already found the app class name
+ appClassname = getAppClassname(file)
+ if(appClassname != "") {
+ manifest.mainAttributes('S4-App-Class': appClassname)
+ }
+ }
+ }
+
+ if (appClassname == "UNKNOWN") {
+
+ println "Couldn't find App class in source files...aborting."
+ exit(1)
+ }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+ appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+ /* This is for debugging. */
+ //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+ //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+ // more debugging statements.
+ //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ dependsOn s4r
+ from s4r.archivePath
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* Returns the fully qualified name of the first public class extending an Adapter,
+* or "UNKNOWN" when the file declares none.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+    def classname = "UNKNOWN"
+    // Declared locally: a package name found in one file must not leak into the next parsed file,
+    // and a class without a preceding package line must not fail on an undefined variable.
+    def packageName = ""
+    def lines = file.readLines()
+    for (line in lines) {
+
+        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+        if (pn) {
+            packageName = pn[0][1] + "."
+        }
+
+        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+Adapter.*\{/
+        if (an) {
+            classname = packageName + an[0][1]
+            println "Found adapter class name: " + classname
+            break
+        }
+
+    }
+    classname
+}
+
+class Version {
+ int major
+ int minor
+ int bugfix
+ boolean isRelease
+
+ String toString() {
+ "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
new file mode 100644
index 0000000..ee905d9
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -0,0 +1,113 @@
+package org.apache.s4.example.twitter;
+
+import java.net.ServerSocket;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.core.adapter.Adapter;
+import org.apache.s4.core.adapter.RemoteStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+
+public class TwitterInputAdapter extends Adapter {
+
+ private ZkClient zkClient;
+ private static Logger logger = LoggerFactory.getLogger(TwitterInputAdapter.class);
+ private String urlString = "https://stream.twitter.com/1/statuses/sample.json";
+
+ public TwitterInputAdapter() {
+ }
+
+ private LinkedBlockingQueue<Status> messageQueue = new LinkedBlockingQueue<Status>();
+
+ protected ServerSocket serverSocket;
+
+ private Thread t;
+
+ private int messageCount;
+
+ private RemoteStream remoteStream;
+
+ @Override
+ protected void onClose() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ protected void onInit() {
+ remoteStream = createRemoteStream("RawStatus");
+ t = new Thread(new Dequeuer());
+ }
+
+ public void connectAndRead() throws Exception {
+
+ TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
+ StatusListener statusListener = new StatusListener() {
+
+ @Override
+ public void onException(Exception ex) {
+ logger.error("error", ex);
+ }
+
+ @Override
+ public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+ logger.error("error");
+ }
+
+ @Override
+ public void onStatus(Status status) {
+ messageQueue.add(status);
+
+ }
+
+ @Override
+ public void onScrubGeo(long userId, long upToStatusId) {
+ logger.error("error");
+ }
+
+ @Override
+ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+ logger.error("error");
+ }
+ };
+ twitterStream.addListener(statusListener);
+ twitterStream.sample();
+
+ }
+
+ @Override
+ protected void onStart() {
+ try {
+ t.start();
+ connectAndRead();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+    /**
+     * Takes statuses from the message queue and puts an event holding the status text on the
+     * remote stream, until the running thread is interrupted.
+     */
+    class Dequeuer implements Runnable {
+
+        @Override
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    Status status = messageQueue.take();
+                    Event event = new Event();
+                    event.put("statusText", String.class, status.getText());
+                    remoteStream.put(event);
+                } catch (InterruptedException e) {
+                    // take() clears the interrupt flag when it throws: restore it so that the
+                    // loop condition sees the interruption and the thread terminates
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    // keep processing the following statuses, but do not hide the failure
+                    logger.error("Cannot send status to remote stream", e);
+                }
+            }
+
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
new file mode 100644
index 0000000..cd36aaa
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
@@ -0,0 +1,18 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+# org.apache.s4.deploy.TestModule was removed from the s4-core tests; use the adapter module instead
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = TRACE
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
+
+# specify the name of the remote cluster (there is currently only 1 remote cluster max)
+cluster.remote.name=s4-test-cluster
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
new file mode 100644
index 0000000..7d58c7d
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
@@ -0,0 +1,5 @@
+debug=true
+# you need to set those parameters with valid twitter account credentials
+twitter4j.user=????
+twitter4j.password=????
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
new file mode 100644
index 0000000..e737b40
--- /dev/null
+++ b/test-apps/twitter-counter/build.gradle
@@ -0,0 +1,200 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+ mavenLocal()
+ mavenCentral()
+ mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+ /* Add lib dir as a repo. Some jar files that are not available
+ in a public repo are distributed in the lib dir. */
+ flatDir name: 'libDir', dirs: "$rootDir/lib"
+}
+
+/* All project libraries must be defined here. */
+libraries = [
+ twitter4j_core: 'org.twitter4j:twitter4j-core:2.2.5',
+ twitter4j_stream: 'org.twitter4j:twitter4j-stream:2.2.5',
+ s4_base: 'org.apache.s4:s4-base:0.5.0-SNAPSHOT',
+ s4_comm: 'org.apache.s4:s4-comm:0.5.0-SNAPSHOT',
+ s4_core: 'org.apache.s4:s4-core:0.5.0-SNAPSHOT'
+ ]
+
+
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ compile (libraries.s4_base)
+ compile (libraries.s4_comm)
+ compile (libraries.s4_core)
+ compile (libraries.twitter4j_core)
+ compile (libraries.twitter4j_stream)
+
+
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+ dependsOn jar
+ from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ manifest = project.manifest
+ extension = 's4r'
+
+ /* Set class name in manifest. Parse source files until we find a class that extends App.
+ * Get fully qualified Java class name and set attribute in Manifest.
+ */
+ sourceSets.main.allSource.files.each { File file ->
+ if (appClassname =="" || appClassname == "UNKNOWN") {
+ // only execute the closure for this file if we haven't already found the app class name
+ appClassname = getAppClassname(file)
+ if(appClassname != "") {
+ manifest.mainAttributes('S4-App-Class': appClassname)
+ }
+ }
+ }
+
+ if (appClassname == "UNKNOWN") {
+
+ println "Couldn't find App class in source files...aborting."
+ exit(1)
+ }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+ appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+
+ /* This is for debugging. */
+ //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+ //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+ // more debugging statements.
+ //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ dependsOn s4r
+ from s4r.archivePath
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* Returns the fully qualified name of the first public class extending an App,
+* or "UNKNOWN" when the file declares none.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+    def classname = "UNKNOWN"
+    // Declared locally: a package name found in one file must not leak into the next parsed file,
+    // and a class without a preceding package line must not fail on an undefined variable.
+    def packageName = ""
+    def lines = file.readLines()
+    for (line in lines) {
+
+        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+        if (pn) {
+            packageName = pn[0][1] + "."
+        }
+
+        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+        if (an) {
+            classname = packageName + an[0][1]
+            println "Found app class name: " + classname
+            break
+        }
+
+    }
+    classname
+}
+
+class Version {
+ int major
+ int minor
+ int bugfix
+ boolean isRelease
+
+ String toString() {
+ "$major.$minor.$bugfix${isRelease ? '' : '-SNAPSHOT'}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
new file mode 100644
index 0000000..3d9a9fb
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -0,0 +1,79 @@
+package org.apache.s4.example.twitter;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Keeps the latest count received for each topic and, on every timer tick, logs
+ * the topics with the highest counts (at most {@link #TOP_N} of them).
+ */
+public class TopNTopicPE extends ProcessingElement {
+
+    /** Maximum number of topics logged per timer tick. */
+    private static final int TOP_N = 10;
+
+    public TopNTopicPE(App app) {
+        super(app);
+    }
+
+    Map<String, Integer> countedTopics = Maps.newHashMap();
+    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
+
+    /**
+     * Records the count carried by the event, replacing any previous value for the same topic.
+     *
+     * @param event topic/count pair to record
+     */
+    public void onEvent(TopicSeenEvent event) {
+        countedTopics.put(event.topic, event.count);
+    }
+
+    /**
+     * Sorts the recorded topics by descending count and logs the first {@link #TOP_N}.
+     */
+    public void onTime() {
+        TreeSet<TopNEntry> sortedTopics = Sets.newTreeSet();
+        for (Map.Entry<String, Integer> topicCount : countedTopics.entrySet()) {
+            sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
+        }
+
+        int i = 0;
+        Iterator<TopNEntry> iterator = sortedTopics.iterator();
+        long time = System.currentTimeMillis();
+        while (iterator.hasNext() && i < TOP_N) {
+            TopNEntry entry = iterator.next();
+            logger.info("{} : topic [{}] count [{}]",
+                    new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+            // Without this increment the TOP_N bound never takes effect.
+            i++;
+        }
+    }
+
+    @Override
+    protected void onCreate() {
+        // nothing to initialize
+    }
+
+    @Override
+    protected void onRemove() {
+        // nothing to clean up
+    }
+
+    /**
+     * Topic/count pair ordered by descending count, then by topic name.
+     */
+    class TopNEntry implements Comparable<TopNEntry> {
+        String topic = null;
+        int count = 0;
+
+        public TopNEntry(String topic, int count) {
+            this.topic = topic;
+            this.count = count;
+        }
+
+        /**
+         * Orders higher counts first. Ties are broken on the topic name so that a
+         * sorted set does not treat two different topics with the same count as
+         * duplicates and drop one of them.
+         */
+        public int compareTo(TopNEntry topNEntry) {
+            if (topNEntry.count < this.count) {
+                return -1;
+            } else if (topNEntry.count > this.count) {
+                return 1;
+            }
+            if (this.topic == null) {
+                return topNEntry.topic == null ? 0 : -1;
+            }
+            if (topNEntry.topic == null) {
+                return 1;
+            }
+            return this.topic.compareTo(topNEntry.topic);
+        }
+
+        public String toString() {
+            return "topic:" + topic + " count:" + count;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
new file mode 100644
index 0000000..a6d1478
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -0,0 +1,47 @@
+package org.apache.s4.example.twitter;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+
+/**
+ * Accumulates the counts of the events it receives and, on each timer tick,
+ * forwards the running total downstream once it has reached the threshold.
+ * Keyed by topic name.
+ */
+public class TopicCountAndReportPE extends ProcessingElement {
+
+    Stream<TopicSeenEvent> downStream;
+    int threshold = 10;
+    int count;
+
+    public TopicCountAndReportPE(App app) {
+        super(app);
+    }
+
+    /**
+     * @param stream stream that receives the aggregated count events
+     */
+    public void setDownstream(Stream<TopicSeenEvent> stream) {
+        downStream = stream;
+    }
+
+    @Override
+    protected void onCreate() {
+        // nothing to initialize
+    }
+
+    @Override
+    protected void onRemove() {
+        // nothing to clean up
+    }
+
+    /** Adds the event's count to the running total. */
+    public void onEvent(TopicSeenEvent event) {
+        count = count + event.count;
+    }
+
+    /** Emits the running total, tagged with this PE's id, when it is at or above the threshold. */
+    public void onTime() {
+        if (count >= threshold) {
+            downStream.put(new TopicSeenEvent(getId(), count));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
new file mode 100644
index 0000000..501d6fd
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
@@ -0,0 +1,49 @@
+package org.apache.s4.example.twitter;
+
+import java.net.ServerSocket;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Streamable;
+
+import com.google.common.base.Splitter;
+
+public class TopicExtractorPE extends ProcessingElement {
+
+ static private ServerSocket serverSocket;
+ Streamable<TopicSeenEvent> downStream;
+
+ public TopicExtractorPE(App app) {
+ super(app);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ protected void onCreate() {
+
+ }
+
+ public void setDownStream(Streamable<TopicSeenEvent> stream) {
+ this.downStream = stream;
+ }
+
+ public void onEvent(Event event) {
+ String text = event.get("statusText", String.class);
+ if (text.contains("#")) {
+ Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
+ .split(text.substring(text.indexOf("#") + 1, text.length()));
+ for (String topic : split) {
+ String topicOnly = topic.split(" ")[0];
+ downStream.put(new TopicSeenEvent(topicOnly, 1));
+ }
+ }
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
new file mode 100644
index 0000000..b28d61e
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
@@ -0,0 +1,17 @@
+package org.apache.s4.example.twitter;
+
+import org.apache.s4.base.Event;
+
+/**
+ * Event carrying a topic name together with a count for that topic.
+ */
+public class TopicSeenEvent extends Event {
+
+    /** Topic name. */
+    public String topic;
+    /** Count associated with the topic. */
+    public int count;
+    /** Constant key, initialized to "x"; NOTE(review): its consumer is not visible here. */
+    public String reportKey = "x";
+
+    /**
+     * @param topic topic name
+     * @param count count to report for the topic
+     */
+    public TopicSeenEvent(String topic, int count) {
+        this.topic = topic;
+        this.count = count;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d11f7fbc/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
new file mode 100644
index 0000000..cf0fb40
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -0,0 +1,77 @@
+package org.apache.s4.example.twitter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+
+/**
+ * Wires the twitter topic-count example: raw statuses go to a singleton topic
+ * extractor, extracted topics are counted per topic, and the per-topic counts are
+ * aggregated under one constant key for the top-N report.
+ */
+public class TwitterCounterApp extends App {
+
+    @Override
+    protected void onClose() {
+        // nothing to release
+    }
+
+    /**
+     * Creates the processing elements and the streams connecting them.
+     * Any failure during setup is rethrown as a {@link RuntimeException} with the cause preserved.
+     */
+    @Override
+    protected void onInit() {
+        try {
+
+            TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
+            topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
+            // Every aggregated event gets the same key "x".
+            Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen",
+                    new KeyFinder<TopicSeenEvent>() {
+
+                        @Override
+                        public List<String> get(TopicSeenEvent event) {
+                            return singleKey("x");
+                        }
+                    }, topNTopicPE);
+
+            TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
+            topicCountAndReportPE.setDownstream(aggregatedTopicStream);
+            topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
+            // Topic events are keyed by the topic name.
+            Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
+
+                @Override
+                public List<String> get(TopicSeenEvent event) {
+                    return singleKey(event.topic);
+                }
+            }, topicCountAndReportPE);
+
+            TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
+            topicExtractorPE.setDownStream(topicSeenStream);
+            topicExtractorPE.setSingleton(true);
+            createStream("RawStatus", topicExtractorPE);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void onStart() {
+        // no additional threads or resources to start
+    }
+
+    /**
+     * Builds a one-element key list. Replaces the double-brace initialization,
+     * which created a new anonymous ArrayList subclass instance holding a
+     * reference to its enclosing object on every key lookup.
+     *
+     * @param key the single key value
+     * @return a new mutable list containing only {@code key}
+     */
+    private static List<String> singleKey(String key) {
+        List<String> keys = new ArrayList<String>(1);
+        keys.add(key);
+        return keys;
+    }
+}