You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[19/22] git commit: utility scripts + refactored twitter app
- scripts for defining cluster configurations, starting nodes and uploading apps
- removed custom event class for twitter app
utility scripts + refactored twitter app
- scripts for defining cluster configurations, starting nodes and
uploading apps
- removed custom event class for twitter app
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/91a7fff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/91a7fff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/91a7fff8
Branch: refs/heads/piper
Commit: 91a7fff86c1727ae6852d9dd0683d1078a77c86d
Parents: d11f7fb
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Mar 26 15:46:20 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:45:33 2012 +0200
----------------------------------------------------------------------
build.gradle | 3 +-
s4 | 44 ++++
settings.gradle | 5 +
.../java/org/apache/s4/fixtures/CommTestUtils.java | 2 +-
.../src/main/java/org/apache/s4/core/App.java | 2 +
.../src/main/java/org/apache/s4/core/Main.java | 23 ++-
.../org/apache/s4/core/adapter/AdapterMain.java | 8 +-
.../java/org/apache/s4/deploy/util/DeployApp.java | 151 --------------
.../main/java/org/apache/s4/fluent/FluentApp.java | 11 -
.../test/java/org/apache/s4/fixtures/ZKServer.java | 67 ------
.../org/apache/s4/wordcount/WordClassifierPE.java | 19 +-
.../src/test/resources/default.s4.properties | 9 +-
.../resources/org.apache.s4.deploy.s4.properties | 4 +-
subprojects/s4-tools/s4-tools.gradle | 48 +++++
.../java/org/apache/s4/tools/DefineCluster.java | 65 ++++++
.../src/main/java/org/apache/s4/tools/Deploy.java | 162 +++++++++++++++
.../src/main/java/org/apache/s4/tools/Tools.java | 22 ++
.../main/java/org/apache/s4/tools/ZKServer.java | 75 +++++++
.../s4-counter/src/main/java/s4app/ClockApp.java | 25 +--
.../src/main/java/s4app/ShowTimeApp.java | 26 +--
.../main/java/org/apache/s4/deploy/SimplePE.java | 2 +-
.../main/java/org/apache/s4/deploy/TestApp.java | 2 +-
.../main/java/org/apache/s4/deploy/TestApp.java | 2 +-
test-apps/twitter-adapter/README.txt | 1 +
test-apps/twitter-adapter/build.gradle | 3 +
.../s4/example/twitter/TwitterInputAdapter.java | 19 ++-
.../src/main/resources/default.s4.properties | 18 --
.../src/main/resources/s4.properties | 18 ++
.../src/main/resources/twitter4j.properties | 5 -
test-apps/twitter-counter/README.txt | 33 +++
test-apps/twitter-counter/build.gradle | 1 +
.../org/apache/s4/example/twitter/TopNTopicPE.java | 16 +-
.../s4/example/twitter/TopicCountAndReportPE.java | 26 ++-
.../s4/example/twitter/TopicExtractorPE.java | 13 +-
.../apache/s4/example/twitter/TopicSeenEvent.java | 17 --
.../s4/example/twitter/TwitterCounterApp.java | 10 +-
.../src/main/resources/default.s4.properties | 15 ++
37 files changed, 602 insertions(+), 370 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 7db43d7..fd75057 100644
--- a/build.gradle
+++ b/build.gradle
@@ -72,7 +72,8 @@ libraries = [
junit: 'junit:junit:4.10',
zkclient: 'com.github.sgroschupf:zkclient:0.1',
diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
- jcommander: 'com.beust:jcommander:1.23'
+ jcommander: 'com.beust:jcommander:1.23',
+ commons_io: 'commons-io:commons-io:2.1'
]
subprojects {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
new file mode 100755
index 0000000..a958cc2
--- /dev/null
+++ b/s4
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+
+echo $0
+echo $1
+
+GRADLE=`pwd`/gradlew
+
+case "$1" in
+"deploy")
+# examples:
+# ./s4 deploy -appName=twitter-counter -buildFile=<s4-dir>/test-apps/twitter-counter/build.gradle -cluster=s4-test-cluster
+# ./s4 deploy -appName=twitter-adapter -buildFile=<s4-dir>/test-apps/twitter-adapter/build.gradle -cluster=s4-adapter-cluster
+ shift
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
+;;
+"zkServer")
+ shift
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
+;;
+"newCluster")
+# examples:
+#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
+ shift
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
+;;
+"appNode")
+# example:
+# ./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+ shift
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
+;;
+"adapterNode")
+# example:
+# ./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+ shift
+ subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
+;;
+
+
+
+esac
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index a5655c4..33827ba 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,8 +17,13 @@ include 's4-base'
include 's4-core'
include 's4-comm'
include 's4-example'
+include 's4-tools'
+//include 's4-example'
+//include ':test-apps:simple-adapter-1'
include ':test-apps:simple-deployable-app-1'
include ':test-apps:simple-deployable-app-2'
+include ':test-apps:s4-showtime'
+include ':test-apps:s4-counter'
rootProject.name = 's4'
rootProject.children.each {project ->
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index e0a9456..7d7913d 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -38,7 +38,7 @@ public class CommTestUtils {
private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
- public static final int ZK_PORT = 21810;
+ public static final int ZK_PORT = 2181;
public static final int INITIAL_BOOKIE_PORT = 5000;
public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 2575013..dd46439 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -18,6 +18,7 @@ package org.apache.s4.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
@@ -25,6 +26,7 @@ import org.apache.s4.base.KeyFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 71756a4..c18603c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -29,16 +29,19 @@ public class Main {
* @param args
*/
public static void main(String[] args) {
-
- if (args.length == 0) {
- logger.info("Starting S4 node with default configuration");
- startDefaultS4Node();
- } else if (args.length == 1) {
- logger.info("Starting S4 node with custom configuration from file {}", args[0]);
- startCustomS4Node(args[0]);
- } else {
- logger.info("Starting S4 node in development mode");
- startDevelopmentMode(args);
+ try {
+ if (args.length == 0) {
+ logger.info("Starting S4 node with default configuration");
+ startDefaultS4Node();
+ } else if (args.length == 1) {
+ logger.info("Starting S4 node with custom configuration from file {}", args[0]);
+ startCustomS4Node(args[0]);
+ } else {
+ logger.info("Starting S4 node in development mode");
+ startDevelopmentMode(args);
+ }
+ } catch (Exception e) {
+ logger.error("Cannot start S4 node", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
index a1c496c..f027f06 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
@@ -46,13 +46,7 @@ public class AdapterMain {
@Parameters(separators = "=")
static class AdapterArgs {
- @Parameter(names = "-moduleClass", description = "module class name")
- String moduleClass;
-
- @Parameter(names = "-adapterClass", description = "adapter class name")
- String adapterClass;
-
- @Parameter(names = "-s4Properties", description = "s4 properties file path")
+ @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
String s4PropertiesFilePath;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
deleted file mode 100644
index d8b77a4..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.s4.deploy.util;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-
-public class DeployApp {
-
- private static File tmpAppsDir;
-
- /**
- * @param args
- */
- public static void main(String[] args) {
-
- DeployAppArgs appArgs = new DeployAppArgs();
- JCommander jc = new JCommander(appArgs);
- // configure log4j for Zookeeper
- BasicConfigurator.configure();
- Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
- Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
-
- try {
- jc.parse(args);
- } catch (Exception e) {
- jc.usage();
- System.exit(-1);
- }
- try {
- ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
- zkClient.setZkSerializer(new ZNRecordSerializer());
-
- tmpAppsDir = Files.createTempDir();
-
- // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
-
- // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
- // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
- ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
- new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
-
- File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
-
- Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
- + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
-
- final String uri = s4rToDeploy.toURI().toString();
- ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
- record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
-
- } catch (Exception e) {
- LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
- }
-
- }
-
- @Parameters(separators = "=")
- static class DeployAppArgs {
-
- @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
- String gradleExecPath;
-
- @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
- String gradleBuildFilePath;
-
- @Parameter(names = "-appName", description = "name of S4 application", required = true)
- String appName;
-
- @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
- String clusterName;
-
- @Parameter(names = "-zk", description = "zookeeper connection string")
- String zkConnectionString = "localhost:2181";
-
- }
-
- static class ExecGradle {
-
- public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
- throws Exception {
- List<String> cmdList = new ArrayList<String>();
- cmdList.add(gradlewExecPath);
- // cmdList.add("-c");
- // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
- cmdList.add("-b");
- cmdList.add(buildFilePath);
- cmdList.add(taskName);
- if (params.length > 0) {
- for (int i = 0; i < params.length; i++) {
- cmdList.add("-P" + params[i]);
- }
- }
-
- System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
- ProcessBuilder pb = new ProcessBuilder(cmdList);
-
- pb.directory(new File(buildFilePath).getParentFile());
- pb.redirectErrorStream();
-
- final Process process = pb.start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
- String line;
- try {
- line = br.readLine();
- while (line != null) {
- System.out.println(line);
- line = br.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }).start();
- process.waitFor();
-
- // try {
- // int exitValue = process.exitValue();
- // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
- // } catch (IllegalThreadStateException ignored) {
- // }
-
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
index 9f66e0d..670c375 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
@@ -35,15 +35,4 @@ public class FluentApp extends App {
appMaker.close();
}
- public void start() {
- super.start();
- }
-
- public void init() {
- super.init();
- }
-
- public void close() {
- super.close();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
deleted file mode 100644
index b79f1a0..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.util.Arrays;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-
-public class ZKServer {
-
- private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- ZKServerArgs clusterArgs = new ZKServerArgs();
- JCommander jc = new JCommander(clusterArgs);
- try {
- jc.parse(args);
- } catch (Exception e) {
- System.out.println(Arrays.toString(args));
- e.printStackTrace();
- jc.usage();
- System.exit(-1);
- }
- try {
-
- logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
- clusterArgs.nbTasks);
- if (clusterArgs.startZK) {
- CommTestUtils.startZookeeperServer();
- }
- TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
- taskSetup.clean(clusterArgs.clusterName);
- taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
- logger.info("Zookeeper started");
- } catch (Exception e) {
- logger.error("Cannot initialize zookeeper with specified configuration", e);
- }
-
- }
-
- @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
- static class ZKServerArgs {
-
- @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
- String clusterName = "s4-test-cluster";
-
- @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
- int nbTasks = 1;
-
- @Parameter(names = "-zk", description = "Zookeeper connection string")
- String zkConnectionString = "localhost:21810";
-
- @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
- boolean startZK = false;
-
- @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
- int firstListeningPort = -1;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 820dbe5..dffe6cb 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -1,6 +1,5 @@
package org.apache.s4.wordcount;
-
import java.io.File;
import java.io.IOException;
import java.util.Map.Entry;
@@ -18,25 +17,25 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
-
public class WordClassifierPE extends ProcessingElement implements Watcher {
TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
private int counter;
transient private ZooKeeper zk;
- private WordClassifierPE () {}
+ private WordClassifierPE() {
+ }
public WordClassifierPE(App app) {
super(app);
}
-
+
public void onEvent(WordCountEvent event) {
try {
WordCountEvent wcEvent = event;
if (zk == null) {
try {
- zk = new ZooKeeper("localhost:21810", 4000, this);
+ zk = new ZooKeeper("localhost:2181", 4000, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -73,8 +72,8 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
// zookeeper
zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
- System.out.println("wrote classifier iteration ["+counter+"]");
+ Logger.getLogger("s4-ft").debug("wrote classifier iteration [" + counter + "]");
+ System.out.println("wrote classifier iteration [" + counter + "]");
// check if we are allowed to continue
if (null == zk.exists("/continue_" + counter, null)) {
CountDownLatch latch = new CountDownLatch(1);
@@ -96,19 +95,19 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
-
+
}
@Override
protected void onCreate() {
// TODO Auto-generated method stub
-
+
}
@Override
protected void onRemove() {
// TODO Auto-generated method stub
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 62fc7d5..2235032 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -4,6 +4,13 @@ cluster.hosts = localhost
cluster.ports = 5077
cluster.lock_dir = {user.dir}/tmp
cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
+s4.logger_level = DEBUG
+comm.module = org.apache.s4.core.CustomModule
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index 9a5d11a..3a566f2 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -3,9 +3,9 @@ comm.queue_listener_size = 8000
cluster.hosts = localhost
cluster.ports = 5077
cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
comm.module = org.apache.s4.core.adapter.AdapterModule
-s4.logger_level = DEBUG
+s4.logger_level = TRACE
appsDir=/tmp/deploy-test
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
new file mode 100644
index 0000000..68c728e
--- /dev/null
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+description = 'The S4 core platform.'
+
+apply plugin: 'java'
+
+task "create-dirs" << {
+ sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
+ sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
+}
+
+dependencies {
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+ compile libraries.jcommander
+ compile libraries.zkclient
+ compile libraries.commons_io
+}
+
+apply plugin:'application'
+mainClassName = "org.apache.s4.tools.Tools"
+
+run {
+ // run doesn't yet directly accept command line parameters...
+ if ( project.hasProperty('args') ) {
+ args project.args.split('\\s+')
+ print args
+ }
+ }
+
+test {
+ forkEvery=1
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
new file mode 100644
index 0000000..4fc09f0
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -0,0 +1,65 @@
+package org.apache.s4.tools;
+
+import java.util.Arrays;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class DefineCluster {
+
+ static Logger logger = LoggerFactory.getLogger(DefineCluster.class);
+
+ public static void main(String[] args) {
+ // configure log4j for Zookeeper
+ BasicConfigurator.configure();
+ org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+ org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+ ZKServerArgs clusterArgs = new ZKServerArgs();
+ JCommander jc = new JCommander(clusterArgs);
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ System.out.println(Arrays.toString(args));
+ e.printStackTrace();
+ jc.usage();
+ System.exit(-1);
+ }
+ try {
+
+ logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbTasks);
+
+ TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
+ taskSetup.clean(clusterArgs.clusterName);
+ taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
+ logger.info("New cluster configuration uploaded into zookeeper");
+ } catch (Exception e) {
+ logger.error("Cannot initialize zookeeper with specified configuration", e);
+ }
+
+ }
+
+ @Parameters(separators = "=", commandDescription = "Setup new S4 logical cluster")
+ static class ZKServerArgs {
+
+ @Parameter(names = "-name", description = "S4 cluster name", required = true)
+ String clusterName = "s4-test-cluster";
+
+ @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
+ int nbTasks = 1;
+
+ @Parameter(names = "-zk", description = "Zookeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+ int firstListeningPort = -1;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
new file mode 100644
index 0000000..1b2eb7d
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -0,0 +1,162 @@
+package org.apache.s4.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class Deploy {
+
+ private static File tmpAppsDir;
+ static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ DeployAppArgs appArgs = new DeployAppArgs();
+ JCommander jc = new JCommander(appArgs);
+ // configure log4j for Zookeeper
+ BasicConfigurator.configure();
+ Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+ Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ jc.usage();
+ System.exit(-1);
+ }
+ try {
+ ZkClient zkClient = new ZkClient(appArgs.zkConnectionString, appArgs.timeout);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ tmpAppsDir = Files.createTempDir();
+
+ File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+
+ String generatedS4RPath = null;
+
+ ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
+ new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+ generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r";
+
+ Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
+ Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ final String uri = s4rToDeploy.toURI().toString();
+ ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+ zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
+ logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
+ appArgs.appName, appArgs.clusterName, "/" + appArgs.clusterName + "/apps/" + appArgs.appName });
+
+ } catch (Exception e) {
+ LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
+ }
+
+ }
+
+ @Parameters(separators = "=")
+ static class DeployAppArgs {
+
+ @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
+ String gradleExecPath;
+
+ @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
+ String gradleBuildFilePath;
+
+ @Parameter(names = "-appName", description = "name of S4 application", required = true)
+ String appName;
+
+ @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
+ String clusterName;
+
+ @Parameter(names = "-zk", description = "zookeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = "-timeout", description = "connection timeout to Zookeeper, in ms")
+ int timeout = 10000;
+
+ }
+
+ static class ExecGradle {
+
+ public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
+ throws Exception {
+ // Thread.sleep(10000);
+ List<String> cmdList = new ArrayList<String>();
+ // cmdList.add("sleep");
+ // cmdList.add("2");
+ // cmdList.add(";");
+ cmdList.add(gradlewExecPath);
+ // cmdList.add("-c");
+ // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
+ cmdList.add("-b");
+ cmdList.add(buildFilePath);
+ cmdList.add(taskName);
+ cmdList.add("-stacktrace");
+ if (params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ cmdList.add("-P" + params[i]);
+ }
+ }
+ System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+ pb.directory(new File(buildFilePath).getParentFile());
+ pb.redirectErrorStream();
+
+ final Process process = pb.start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ try {
+ line = br.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+
+ process.waitFor();
+
+ // try {
+ // int exitValue = process.exitValue();
+ // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+ // } catch (IllegalThreadStateException ignored) {
+ // }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
new file mode 100644
index 0000000..ae53cb6
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -0,0 +1,22 @@
+package org.apache.s4.tools;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+/**
+ * Generic launcher that dispatches to the <code>main</code> method of another class.
+ * <p>
+ * The first command line argument is the name of the class to load; all remaining arguments are handed over
+ * unchanged to that class' <code>public static void main(String[])</code> method.
+ */
+public class Tools {
+
+    /**
+     * Loads the class named by <code>args[0]</code> and invokes its main method with the remaining arguments.
+     * Failures (unknown class, missing main method, exception raised by the tool) are reported as a stack
+     * trace on standard error; the method then returns normally.
+     *
+     * @param args
+     *            the tool class name followed by the arguments for that tool
+     */
+    public static void main(String[] args) {
+        if (args.length == 0) {
+            // without this guard the only feedback was an ArrayIndexOutOfBoundsException stack trace
+            System.err.println("Usage: " + Tools.class.getName() + " <tool class name> [tool arguments...]");
+            return;
+        }
+        try {
+            Class<?> toolClass = Class.forName(args[0]);
+            Method main = toolClass.getMethod("main", String[].class);
+            // copyOfRange yields an empty array when only the class name was given;
+            // the Object[] wrapper keeps the String[] from being expanded as varargs
+            main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
+        } catch (Exception e) {
+            // best-effort launcher: report the failure and return
+            e.printStackTrace();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
new file mode 100644
index 0000000..cb4c85c
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -0,0 +1,75 @@
+package org.apache.s4.tools;
+
+import java.io.File;
+import java.util.Arrays;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command line tool that starts a Zookeeper server through the zkclient <code>ZkServer</code> wrapper.
+ * <p>
+ * Options are parsed with JCommander, see {@link ZKServerArgs}.
+ */
+public class ZKServer {
+
+    static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+    /**
+     * Parses the command line, optionally wipes the data and log directories, then starts the server.
+     * <p>
+     * Invalid options print the usage and exit the JVM with status -1. Any failure while preparing or starting
+     * the server (including a non numeric port) is logged as an error.
+     *
+     * @param args
+     *            command line options, see {@link ZKServerArgs}
+     */
+    public static void main(String[] args) {
+        ZKServerArgs zkArgs = new ZKServerArgs();
+        JCommander jc = new JCommander(zkArgs);
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            System.out.println(Arrays.toString(args));
+            e.printStackTrace();
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+            // validate the port first, so that a bad value fails before any directory is deleted
+            int port = Integer.parseInt(zkArgs.zkPort);
+
+            logger.info("Starting zookeeper server on port [{}]", zkArgs.zkPort);
+
+            if (zkArgs.clean) {
+                logger.info("cleaning existing data in [{}] and [{}]", zkArgs.dataDir, zkArgs.logDir);
+                FileUtils.deleteDirectory(new File(zkArgs.dataDir));
+                FileUtils.deleteDirectory(new File(zkArgs.logDir));
+            }
+            // no default namespace to create: the callback is intentionally empty
+            IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+
+                }
+            };
+
+            // pass the requested port explicitly: the 3-argument constructor ignores -port and always uses
+            // the library default
+            ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace, port);
+            zkServer.start();
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+    }
+
+    /**
+     * Command line options of the tool; option names and values are separated by "=".
+     */
+    @Parameters(separators = "=", commandDescription = "Start Zookeeper server")
+    static class ZKServerArgs {
+
+        // listening port, kept as a string and parsed in main()
+        @Parameter(names = "-port", description = "Zookeeper port")
+        String zkPort = "2181";
+
+        // defaults to <java.io.tmpdir>/tmp/zookeeper/data
+        @Parameter(names = "-dataDir", description = "data directory", required = false)
+        String dataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "data").getAbsolutePath();
+
+        // NOTE(review): the default is already true and the flag takes no value, so there seems to be no
+        // way to disable cleaning from the command line - confirm this is intended
+        @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
+        boolean clean = true;
+
+        // defaults to <java.io.tmpdir>/tmp/zookeeper/log
+        @Parameter(names = "-logDir", description = "log directory")
+        String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "log").getAbsolutePath();
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
index 316b876..cab4eab 100644
--- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
@@ -12,44 +12,27 @@ public class ClockApp extends App {
private ClockPE clockPE;
@Override
- protected void start() {
+ protected void onStart() {
System.out.println("Starting CounterApp...");
clockPE.getInstanceForKey("single");
}
// generic array due to varargs generates a warning.
@Override
- protected void init() {
+ protected void onInit() {
System.out.println("Initing CounterApp...");
clockPE = new ClockPE(this);
clockPE.setTimerInterval(1, TimeUnit.SECONDS);
- eventSource = new EventSource(this, "I can give you the time!");
+ eventSource = new EventSource(this, "clockStream");
clockPE.setStreams((Streamable) eventSource);
}
@Override
- protected void close() {
+ protected void onClose() {
System.out.println("Closing CounterApp...");
eventSource.close();
}
- @Override
- protected void onStart() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onInit() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onClose() {
- // TODO Auto-generated method stub
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
index 1c92c57..6652f59 100644
--- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
@@ -7,41 +7,23 @@ public class ShowTimeApp extends App {
private ShowPE showPE;
@Override
- protected void start() {
+ protected void onStart() {
System.out.println("Starting ShowTimeApp...");
showPE.getInstanceForKey("single");
}
@Override
- protected void init() {
+ protected void onInit() {
System.out.println("Initing ShowTimeApp...");
showPE = new ShowPE(this);
/* This stream will receive events from another app. */
- createStream("I need the time.", showPE);
- }
-
- @Override
- protected void close() {
- System.out.println("Closing ShowTimeApp...");
- }
-
- @Override
- protected void onStart() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void onInit() {
- // TODO Auto-generated method stub
-
+ createStream("clockStream", showPE);
}
@Override
protected void onClose() {
- // TODO Auto-generated method stub
-
+ System.out.println("Closing ShowTimeApp...");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
index 809cebd..8cb5843 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
@@ -41,7 +41,7 @@ public class SimplePE extends ProcessingElement implements Watcher {
protected void onCreate() {
if (zk == null) {
try {
- zk = new ZooKeeper("localhost:" + 21810, 4000, this);
+ zk = new ZooKeeper("localhost:" + 2181, 4000, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
index 25b98a6..bbfaf17 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -39,7 +39,7 @@ public class TestApp extends App {
} catch (IOException e) {
throw new RuntimeException(e);
}
- zkClient = new ZkClient("localhost:" + 21810);
+ zkClient = new ZkClient("localhost:" + 2181);
if (!zkClient.exists("/s4-test")) {
zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
index 0657ee9..8eb345b 100644
--- a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -17,7 +17,7 @@ public class TestApp extends App {
@Override
protected void onInit() {
try {
- zkClient = new ZkClient("localhost:" + 21810);
+ zkClient = new ZkClient("localhost:" + 2181);
if (!zkClient.exists("/s4-test")) {
zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/README.txt
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/README.txt b/test-apps/twitter-adapter/README.txt
new file mode 100644
index 0000000..f9bda45
--- /dev/null
+++ b/test-apps/twitter-adapter/README.txt
@@ -0,0 +1 @@
+Please refer to README.txt in twitter-counter application
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 669d62b..100837d 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -49,6 +49,9 @@ group = 'org.apache.s4'
apply plugin: 'java'
apply plugin: 'eclipse'
+apply plugin:'application'
+
+mainClassName = "org.apache.s4.core.Main"
/* The app classname is set automatically from the source files. */
def appClassname = ''
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index ee905d9..102ca10 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -1,6 +1,9 @@
package org.apache.s4.example.twitter;
+import java.io.File;
+import java.io.FileInputStream;
import java.net.ServerSocket;
+import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.I0Itec.zkclient.ZkClient;
@@ -15,6 +18,7 @@ import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
public class TwitterInputAdapter extends Adapter {
@@ -49,7 +53,20 @@ public class TwitterInputAdapter extends Adapter {
public void connectAndRead() throws Exception {
- TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ Properties twitterProperties = new Properties();
+ File twitter4jPropsFile = new File(System.getProperty("user.home") + "/twitter4j.properties");
+ if (!twitter4jPropsFile.exists()) {
+ logger.error(
+ "Cannot find twitter4j.properties file in this location :[{}]. Make sure it is available at this place and includes user/password credentials",
+ twitter4jPropsFile.getAbsolutePath());
+ return;
+ }
+ twitterProperties.load(new FileInputStream(twitter4jPropsFile));
+
+ cb.setDebugEnabled(Boolean.valueOf(twitterProperties.getProperty("debug")))
+ .setUser(twitterProperties.getProperty("user")).setPassword(twitterProperties.getProperty("password"));
+ TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
StatusListener statusListener = new StatusListener() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
deleted file mode 100644
index cd36aaa..0000000
--- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.name = s4-adapter-cluster
-cluster.zk_address = localhost:21810
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
-tcp.partition.queue_size=1000
-comm.timeout=100
-comm.retry_delay=100
-comm.retries=10
-
-# specify the name of the remote cluster (there is currently only 1 remote cluster max)
-cluster.remote.name=s4-test-cluster
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/s4.properties b/test-apps/twitter-adapter/src/main/resources/s4.properties
new file mode 100644
index 0000000..fdd1bf6
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/s4.properties
@@ -0,0 +1,18 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
+
+# specify the name of the remote cluster (there is currently only 1 remote cluster max)
+cluster.remote.name=s4-test-cluster
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
deleted file mode 100644
index 7d58c7d..0000000
--- a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-debug=true
-# you need to set those parameters with valid twitter account credentials
-twitter4j.user=????
-twitter4j.password=????
-
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/README.txt
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/README.txt b/test-apps/twitter-counter/README.txt
new file mode 100644
index 0000000..d46b491
--- /dev/null
+++ b/test-apps/twitter-counter/README.txt
@@ -0,0 +1,33 @@
+An application that displays the current top 10 topics, as gathered from the twitter sample stream.
+It was ported and adapted from S4 0.3
+
+Architecture:
+- twitter-adapter app in adapter node connects to the twitter stream, extracts the tweeted text and passes that to the application cluster
+- twitter-counter app in the application cluster receives the text of the tweets, extracts the topics, counts topic occurrences and periodically displays the top 10 topics on the console
+
+How to configure:
+- you need a twitter4j.properties file in your home dir, with the following properties filled:
+debug=true|false
+user=<a twitter user name>
+password=<the matching password>
+
+How to run:
+0/ make sure tools are compiled by running ./gradlew s4-tools:installApp
+
+1/ start zookeeper server
+./s4 zkServer
+
+2/ create adapter cluster configuration
+./s4 newCluster -name=s4-test-cluster -firstListeningPort=10000 -nbTasks=1
+
+3/ create application cluster configuration
+./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=11000 -nbTasks=<number of nodes>
+NOTE: - the name of the downstream cluster is currently hardcoded in <s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties Make sure you use the same name
+
+4/ start application nodes (as many as defined in the cluster configuration, more for failover capabilities)
+./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+
+5/ start adapter node
+./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+
+6/ observe the topic count output (on 1 of the application nodes)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index e737b40..e10eae0 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -49,6 +49,7 @@ group = 'org.apache.s4'
apply plugin: 'java'
apply plugin: 'eclipse'
+apply plugin:'application'
/* The app classname is set automatically from the source files. */
def appClassname = ''
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index 3d9a9fb..6fd68b5 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -4,6 +4,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
+import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.slf4j.Logger;
@@ -14,16 +15,16 @@ import com.google.common.collect.Sets;
public class TopNTopicPE extends ProcessingElement {
+ static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
+ Map<String, Integer> countedTopics = Maps.newHashMap();
+
public TopNTopicPE(App app) {
super(app);
- // TODO Auto-generated constructor stub
+ logger.info("key: [{}]", getId());
}
- Map<String, Integer> countedTopics = Maps.newHashMap();
- static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
-
- public void onEvent(TopicSeenEvent event) {
- countedTopics.put(event.topic, event.count);
+ public void onEvent(Event event) {
+ countedTopics.put(event.get("topic"), event.get("count", Integer.class));
}
public void onTime() {
@@ -32,6 +33,8 @@ public class TopNTopicPE extends ProcessingElement {
sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
}
+ logger.info("\n------------------");
+
int i = 0;
Iterator<TopNEntry> iterator = sortedTopics.iterator();
long time = System.currentTimeMillis();
@@ -39,6 +42,7 @@ public class TopNTopicPE extends ProcessingElement {
TopNEntry entry = iterator.next();
logger.info("{} : topic [{}] count [{}]",
new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+ i++;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
index a6d1478..5a41231 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -1,27 +1,36 @@
package org.apache.s4.example.twitter;
+import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
// keyed by topic name
public class TopicCountAndReportPE extends ProcessingElement {
- Stream<TopicSeenEvent> downStream;
+ Stream<Event> downStream;
int threshold = 10;
int count;
+ boolean firstEvent = true;
+
+ static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
public TopicCountAndReportPE(App app) {
super(app);
- // TODO Auto-generated constructor stub
}
- public void setDownstream(Stream<TopicSeenEvent> stream) {
+ public void setDownstream(Stream<Event> stream) {
this.downStream = stream;
}
- public void onEvent(TopicSeenEvent event) {
- count += event.count;
+ public void onEvent(Event event) {
+ if (firstEvent) {
+ logger.info("Handling new topic [{}]", getId());
+ firstEvent = false;
+ }
+ count += event.get("count", Integer.class);
}
@Override
@@ -34,14 +43,15 @@ public class TopicCountAndReportPE extends ProcessingElement {
if (count < threshold) {
return;
}
- TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(), count);
+ Event topicSeenEvent = new Event();
+ topicSeenEvent.put("topic", String.class, getId());
+ topicSeenEvent.put("count", Integer.class, count);
+ topicSeenEvent.put("aggregationKey", String.class, "aggregationValue");
downStream.put(topicSeenEvent);
}
@Override
protected void onRemove() {
- // TODO Auto-generated method stub
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
index 501d6fd..9936ed0 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
@@ -6,13 +6,16 @@ import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Splitter;
public class TopicExtractorPE extends ProcessingElement {
static private ServerSocket serverSocket;
- Streamable<TopicSeenEvent> downStream;
+ Streamable<Event> downStream;
+ static Logger logger = LoggerFactory.getLogger(TopicExtractorPE.class);
public TopicExtractorPE(App app) {
super(app);
@@ -24,18 +27,22 @@ public class TopicExtractorPE extends ProcessingElement {
}
- public void setDownStream(Streamable<TopicSeenEvent> stream) {
+ public void setDownStream(Streamable<Event> stream) {
this.downStream = stream;
}
public void onEvent(Event event) {
String text = event.get("statusText", String.class);
+ logger.trace("event text [{}]", text);
if (text.contains("#")) {
Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
.split(text.substring(text.indexOf("#") + 1, text.length()));
for (String topic : split) {
String topicOnly = topic.split(" ")[0];
- downStream.put(new TopicSeenEvent(topicOnly, 1));
+ Event event2 = new Event();
+ event2.put("topic", String.class, topicOnly);
+ event2.put("count", Integer.class, 1);
+ downStream.put(event2);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
deleted file mode 100644
index b28d61e..0000000
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.s4.example.twitter;
-
-import org.apache.s4.base.Event;
-
-public class TopicSeenEvent extends Event {
-
- public String topic;
- public int count;
- public String reportKey = "x";
-
- public TopicSeenEvent(String topic, int count) {
- super();
- this.topic = topic;
- this.count = count;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index cf0fb40..f6edd8c 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -29,13 +29,13 @@ public class TwitterCounterApp extends App {
TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
@SuppressWarnings("unchecked")
- Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
+ Stream<Event> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
@Override
public List<String> get(Event arg0) {
return new ArrayList<String>() {
{
- add("x");
+ add("aggregationKey");
}
};
}
@@ -44,13 +44,13 @@ public class TwitterCounterApp extends App {
TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
topicCountAndReportPE.setDownstream(aggregatedTopicStream);
topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
- Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
+ Stream<Event> topicSeenStream = createStream("TopicSeen", new KeyFinder<Event>() {
@Override
- public List<String> get(final TopicSeenEvent arg0) {
+ public List<String> get(final Event arg0) {
return new ArrayList<String>() {
{
- add(arg0.topic);
+ add(arg0.get("topic"));
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/resources/default.s4.properties b/test-apps/twitter-counter/src/main/resources/default.s4.properties
new file mode 100644
index 0000000..d5da3f3
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
@@ -0,0 +1,15 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.core.CustomModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10