You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[19/22] git commit: utility scripts + refactored twitter app - scripts for defining cluster configurations, starting nodes and uploading apps - removed custom event class for twitter app

utility scripts + refactored twitter app
- scripts for defining cluster configurations, starting nodes and
uploading apps
- removed custom event class for twitter app


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/91a7fff8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/91a7fff8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/91a7fff8

Branch: refs/heads/piper
Commit: 91a7fff86c1727ae6852d9dd0683d1078a77c86d
Parents: d11f7fb
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Mar 26 15:46:20 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:45:33 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |    3 +-
 s4                                                 |   44 ++++
 settings.gradle                                    |    5 +
 .../java/org/apache/s4/fixtures/CommTestUtils.java |    2 +-
 .../src/main/java/org/apache/s4/core/App.java      |    2 +
 .../src/main/java/org/apache/s4/core/Main.java     |   23 ++-
 .../org/apache/s4/core/adapter/AdapterMain.java    |    8 +-
 .../java/org/apache/s4/deploy/util/DeployApp.java  |  151 --------------
 .../main/java/org/apache/s4/fluent/FluentApp.java  |   11 -
 .../test/java/org/apache/s4/fixtures/ZKServer.java |   67 ------
 .../org/apache/s4/wordcount/WordClassifierPE.java  |   19 +-
 .../src/test/resources/default.s4.properties       |    9 +-
 .../resources/org.apache.s4.deploy.s4.properties   |    4 +-
 subprojects/s4-tools/s4-tools.gradle               |   48 +++++
 .../java/org/apache/s4/tools/DefineCluster.java    |   65 ++++++
 .../src/main/java/org/apache/s4/tools/Deploy.java  |  162 +++++++++++++++
 .../src/main/java/org/apache/s4/tools/Tools.java   |   22 ++
 .../main/java/org/apache/s4/tools/ZKServer.java    |   75 +++++++
 .../s4-counter/src/main/java/s4app/ClockApp.java   |   25 +--
 .../src/main/java/s4app/ShowTimeApp.java           |   26 +--
 .../main/java/org/apache/s4/deploy/SimplePE.java   |    2 +-
 .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
 .../main/java/org/apache/s4/deploy/TestApp.java    |    2 +-
 test-apps/twitter-adapter/README.txt               |    1 +
 test-apps/twitter-adapter/build.gradle             |    3 +
 .../s4/example/twitter/TwitterInputAdapter.java    |   19 ++-
 .../src/main/resources/default.s4.properties       |   18 --
 .../src/main/resources/s4.properties               |   18 ++
 .../src/main/resources/twitter4j.properties        |    5 -
 test-apps/twitter-counter/README.txt               |   33 +++
 test-apps/twitter-counter/build.gradle             |    1 +
 .../org/apache/s4/example/twitter/TopNTopicPE.java |   16 +-
 .../s4/example/twitter/TopicCountAndReportPE.java  |   26 ++-
 .../s4/example/twitter/TopicExtractorPE.java       |   13 +-
 .../apache/s4/example/twitter/TopicSeenEvent.java  |   17 --
 .../s4/example/twitter/TwitterCounterApp.java      |   10 +-
 .../src/main/resources/default.s4.properties       |   15 ++
 37 files changed, 602 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 7db43d7..fd75057 100644
--- a/build.gradle
+++ b/build.gradle
@@ -72,7 +72,8 @@ libraries = [
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
-    jcommander:         'com.beust:jcommander:1.23'
+    jcommander:         'com.beust:jcommander:1.23',
+    commons_io:         'commons-io:commons-io:2.1'
 ]
 
 subprojects {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/s4
----------------------------------------------------------------------
diff --git a/s4 b/s4
new file mode 100755
index 0000000..a958cc2
--- /dev/null
+++ b/s4
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+
+echo $0
+echo $1
+
+GRADLE=`pwd`/gradlew
+
+case "$1" in
+"deploy")
+# examples:
+# ./s4 deploy -appName=twitter-counter -buildFile=<s4-dir>/test-apps/twitter-counter/build.gradle -cluster=s4-test-cluster
+# ./s4 deploy -appName=twitter-adapter -buildFile=<s4-dir>/test-apps/twitter-adapter/build.gradle -cluster=s4-adapter-cluster
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
+;;
+"zkServer")
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
+;;
+"newCluster")
+# examples:
+#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
+;;
+"appNode")
+# example:
+# ./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+	shift
+  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
+;;
+"adapterNode")
+# example:
+# ./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+	shift
+  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
+;;
+
+
+
+esac
+

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index a5655c4..33827ba 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,8 +17,13 @@ include 's4-base'
 include 's4-core'
 include 's4-comm'
 include 's4-example'
+include 's4-tools'
+//include 's4-example'
+//include ':test-apps:simple-adapter-1'
 include ':test-apps:simple-deployable-app-1'
 include ':test-apps:simple-deployable-app-2'
+include ':test-apps:s4-showtime'
+include ':test-apps:s4-counter'
 
 rootProject.name = 's4'
 rootProject.children.each {project ->

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index e0a9456..7d7913d 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -38,7 +38,7 @@ public class CommTestUtils {
 
     private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
 
-    public static final int ZK_PORT = 21810;
+    public static final int ZK_PORT = 2181;
     public static final int INITIAL_BOOKIE_PORT = 5000;
     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 2575013..dd46439 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -18,6 +18,7 @@ package org.apache.s4.core;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
@@ -25,6 +26,7 @@ import org.apache.s4.base.KeyFinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 71756a4..c18603c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -29,16 +29,19 @@ public class Main {
      * @param args
      */
     public static void main(String[] args) {
-
-        if (args.length == 0) {
-            logger.info("Starting S4 node with default configuration");
-            startDefaultS4Node();
-        } else if (args.length == 1) {
-            logger.info("Starting S4 node with custom configuration from file {}", args[0]);
-            startCustomS4Node(args[0]);
-        } else {
-            logger.info("Starting S4 node in development mode");
-            startDevelopmentMode(args);
+        try {
+            if (args.length == 0) {
+                logger.info("Starting S4 node with default configuration");
+                startDefaultS4Node();
+            } else if (args.length == 1) {
+                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
+                startCustomS4Node(args[0]);
+            } else {
+                logger.info("Starting S4 node in development mode");
+                startDevelopmentMode(args);
+            }
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
index a1c496c..f027f06 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
@@ -46,13 +46,7 @@ public class AdapterMain {
     @Parameters(separators = "=")
     static class AdapterArgs {
 
-        @Parameter(names = "-moduleClass", description = "module class name")
-        String moduleClass;
-
-        @Parameter(names = "-adapterClass", description = "adapter class name")
-        String adapterClass;
-
-        @Parameter(names = "-s4Properties", description = "s4 properties file path")
+        @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
         String s4PropertiesFilePath;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
deleted file mode 100644
index d8b77a4..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
+++ /dev/null
@@ -1,151 +0,0 @@
-package org.apache.s4.deploy.util;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.s4.comm.topology.ZNRecord;
-import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-
-public class DeployApp {
-
-    private static File tmpAppsDir;
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-
-        DeployAppArgs appArgs = new DeployAppArgs();
-        JCommander jc = new JCommander(appArgs);
-        // configure log4j for Zookeeper
-        BasicConfigurator.configure();
-        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
-        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
-
-        try {
-            jc.parse(args);
-        } catch (Exception e) {
-            jc.usage();
-            System.exit(-1);
-        }
-        try {
-            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
-            zkClient.setZkSerializer(new ZNRecordSerializer());
-
-            tmpAppsDir = Files.createTempDir();
-
-            // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
-
-            // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
-            // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
-            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
-                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
-
-            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
-
-            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
-                    + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
-
-            final String uri = s4rToDeploy.toURI().toString();
-            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
-            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
-
-        } catch (Exception e) {
-            LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
-        }
-
-    }
-
-    @Parameters(separators = "=")
-    static class DeployAppArgs {
-
-        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
-        String gradleExecPath;
-
-        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
-        String gradleBuildFilePath;
-
-        @Parameter(names = "-appName", description = "name of S4 application", required = true)
-        String appName;
-
-        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
-        String clusterName;
-
-        @Parameter(names = "-zk", description = "zookeeper connection string")
-        String zkConnectionString = "localhost:2181";
-
-    }
-
-    static class ExecGradle {
-
-        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
-                throws Exception {
-            List<String> cmdList = new ArrayList<String>();
-            cmdList.add(gradlewExecPath);
-            // cmdList.add("-c");
-            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
-            cmdList.add("-b");
-            cmdList.add(buildFilePath);
-            cmdList.add(taskName);
-            if (params.length > 0) {
-                for (int i = 0; i < params.length; i++) {
-                    cmdList.add("-P" + params[i]);
-                }
-            }
-
-            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
-            ProcessBuilder pb = new ProcessBuilder(cmdList);
-
-            pb.directory(new File(buildFilePath).getParentFile());
-            pb.redirectErrorStream();
-
-            final Process process = pb.start();
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-                    String line;
-                    try {
-                        line = br.readLine();
-                        while (line != null) {
-                            System.out.println(line);
-                            line = br.readLine();
-                        }
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }).start();
-            process.waitFor();
-
-            // try {
-            // int exitValue = process.exitValue();
-            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
-            // } catch (IllegalThreadStateException ignored) {
-            // }
-
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
index 9f66e0d..670c375 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
@@ -35,15 +35,4 @@ public class FluentApp extends App {
         appMaker.close();
     }
 
-    public void start() {
-        super.start();
-    }
-
-    public void init() {
-        super.init();
-    }
-
-    public void close() {
-        super.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
deleted file mode 100644
index b79f1a0..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.util.Arrays;
-
-import org.apache.s4.comm.tools.TaskSetup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-
-public class ZKServer {
-
-    private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        ZKServerArgs clusterArgs = new ZKServerArgs();
-        JCommander jc = new JCommander(clusterArgs);
-        try {
-            jc.parse(args);
-        } catch (Exception e) {
-            System.out.println(Arrays.toString(args));
-            e.printStackTrace();
-            jc.usage();
-            System.exit(-1);
-        }
-        try {
-
-            logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
-                    clusterArgs.nbTasks);
-            if (clusterArgs.startZK) {
-                CommTestUtils.startZookeeperServer();
-            }
-            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
-            taskSetup.clean(clusterArgs.clusterName);
-            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
-            logger.info("Zookeeper started");
-        } catch (Exception e) {
-            logger.error("Cannot initialize zookeeper with specified configuration", e);
-        }
-
-    }
-
-    @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
-    static class ZKServerArgs {
-
-        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
-        String clusterName = "s4-test-cluster";
-
-        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
-        int nbTasks = 1;
-
-        @Parameter(names = "-zk", description = "Zookeeper connection string")
-        String zkConnectionString = "localhost:21810";
-
-        @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
-        boolean startZK = false;
-
-        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
-        int firstListeningPort = -1;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 820dbe5..dffe6cb 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -1,6 +1,5 @@
 package org.apache.s4.wordcount;
 
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Map.Entry;
@@ -18,25 +17,25 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public class WordClassifierPE extends ProcessingElement implements Watcher {
 
     TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
     private int counter;
     transient private ZooKeeper zk;
 
-    private WordClassifierPE () {}
+    private WordClassifierPE() {
+    }
 
     public WordClassifierPE(App app) {
         super(app);
     }
-    
+
     public void onEvent(WordCountEvent event) {
         try {
             WordCountEvent wcEvent = event;
             if (zk == null) {
                 try {
-                    zk = new ZooKeeper("localhost:21810", 4000, this);
+                    zk = new ZooKeeper("localhost:2181", 4000, this);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -73,8 +72,8 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
                 // zookeeper
                 zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
-                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
-                System.out.println("wrote classifier iteration ["+counter+"]");
+                Logger.getLogger("s4-ft").debug("wrote classifier iteration [" + counter + "]");
+                System.out.println("wrote classifier iteration [" + counter + "]");
                 // check if we are allowed to continue
                 if (null == zk.exists("/continue_" + counter, null)) {
                     CountDownLatch latch = new CountDownLatch(1);
@@ -96,19 +95,19 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
     @Override
     public void process(WatchedEvent event) {
         // TODO Auto-generated method stub
- 
+
     }
 
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
-        
+
     }
 
     @Override
     protected void onRemove() {
         // TODO Auto-generated method stub
-        
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 62fc7d5..2235032 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -4,6 +4,13 @@ cluster.hosts = localhost
 cluster.ports = 5077
 cluster.lock_dir = {user.dir}/tmp
 cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
+s4.logger_level = DEBUG
+comm.module = org.apache.s4.core.CustomModule
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index 9a5d11a..3a566f2 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -3,9 +3,9 @@ comm.queue_listener_size = 8000
 cluster.hosts = localhost
 cluster.ports = 5077
 cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
 comm.module = org.apache.s4.core.adapter.AdapterModule
-s4.logger_level = DEBUG
+s4.logger_level = TRACE
 appsDir=/tmp/deploy-test

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
new file mode 100644
index 0000000..68c728e
--- /dev/null
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+description = 'The S4 core platform.'
+
+apply plugin: 'java'
+
+task "create-dirs" << {
+   sourceSets.all*.java.srcDirs*.each { it.mkdirs() }
+   sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }
+}
+
+dependencies {
+    compile project(":s4-base")
+    compile project(":s4-comm")
+    compile project(":s4-core")
+    compile libraries.jcommander
+    compile libraries.zkclient
+    compile libraries.commons_io
+}
+
+apply plugin:'application'
+mainClassName = "org.apache.s4.tools.Tools"
+
+run {
+    // run doesn't yet directly accept command line parameters...
+    if ( project.hasProperty('args') ) {
+        args project.args.split('\\s+')
+        print args
+    }
+ }
+
+test {
+    forkEvery=1
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
new file mode 100644
index 0000000..4fc09f0
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -0,0 +1,65 @@
+package org.apache.s4.tools;
+
+import java.util.Arrays;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class DefineCluster {
+
+    static Logger logger = LoggerFactory.getLogger(DefineCluster.class);
+
+    public static void main(String[] args) {
+        // configure log4j for Zookeeper
+        BasicConfigurator.configure();
+        org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+        org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+        ZKServerArgs clusterArgs = new ZKServerArgs();
+        JCommander jc = new JCommander(clusterArgs);
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            System.out.println(Arrays.toString(args));
+            e.printStackTrace();
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+
+            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbTasks);
+
+            TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
+            taskSetup.clean(clusterArgs.clusterName);
+            taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
+            logger.info("New cluster configuration uploaded into zookeeper");
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+
+    }
+
+    @Parameters(separators = "=", commandDescription = "Setup new S4 logical cluster")
+    static class ZKServerArgs {
+
+        @Parameter(names = "-name", description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
+        int nbTasks = 1;
+
+        @Parameter(names = "-zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+        int firstListeningPort = -1;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
new file mode 100644
index 0000000..1b2eb7d
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -0,0 +1,162 @@
+package org.apache.s4.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class Deploy {
+
+    private static File tmpAppsDir;
+    static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+
+        DeployAppArgs appArgs = new DeployAppArgs();
+        JCommander jc = new JCommander(appArgs);
+        // configure log4j for Zookeeper
+        BasicConfigurator.configure();
+        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            e.printStackTrace();
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString, appArgs.timeout);
+            zkClient.setZkSerializer(new ZNRecordSerializer());
+
+            tmpAppsDir = Files.createTempDir();
+
+            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+
+            String generatedS4RPath = null;
+
+            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
+                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+            generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r";
+
+            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
+                    Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+            final String uri = s4rToDeploy.toURI().toString();
+            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
+            logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
+                    appArgs.appName, appArgs.clusterName, "/" + appArgs.clusterName + "/apps/" + appArgs.appName });
+
+        } catch (Exception e) {
+            LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
+        }
+
+    }
+
+    @Parameters(separators = "=")
+    static class DeployAppArgs {
+
+        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
+        String gradleExecPath;
+
+        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
+        String gradleBuildFilePath;
+
+        @Parameter(names = "-appName", description = "name of S4 application", required = true)
+        String appName;
+
+        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = "-zk", description = "zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-timeout", description = "connection timeout to Zookeeper, in ms")
+        int timeout = 10000;
+
+    }
+
+    static class ExecGradle {
+
+        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
+                throws Exception {
+            // Thread.sleep(10000);
+            List<String> cmdList = new ArrayList<String>();
+            // cmdList.add("sleep");
+            // cmdList.add("2");
+            // cmdList.add(";");
+            cmdList.add(gradlewExecPath);
+            // cmdList.add("-c");
+            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
+            cmdList.add("-b");
+            cmdList.add(buildFilePath);
+            cmdList.add(taskName);
+            cmdList.add("-stacktrace");
+            if (params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    cmdList.add("-P" + params[i]);
+                }
+            }
+            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+            ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+            pb.directory(new File(buildFilePath).getParentFile());
+            pb.redirectErrorStream();
+
+            final Process process = pb.start();
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                    String line;
+                    try {
+                        line = br.readLine();
+                        while (line != null) {
+                            System.out.println(line);
+                            line = br.readLine();
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).start();
+
+            process.waitFor();
+
+            // try {
+            // int exitValue = process.exitValue();
+            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+            // } catch (IllegalThreadStateException ignored) {
+            // }
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
new file mode 100644
index 0000000..ae53cb6
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -0,0 +1,22 @@
+package org.apache.s4.tools;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+public class Tools {
+
+    public static void main(String[] args) {
+        try {
+            Class<?> toolClass = Class.forName(args[0]);
+            Method main = toolClass.getMethod("main", String[].class);
+            if (args.length > 1) {
+                main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
+            } else {
+                main.invoke(null, new Object[] { new String[0] });
+            }
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
new file mode 100644
index 0000000..cb4c85c
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -0,0 +1,75 @@
+package org.apache.s4.tools;
+
+import java.io.File;
+import java.util.Arrays;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class ZKServer {
+
+    static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+    public static void main(String[] args) {
+        ZKServerArgs zkArgs = new ZKServerArgs();
+        JCommander jc = new JCommander(zkArgs);
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            System.out.println(Arrays.toString(args));
+            e.printStackTrace();
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+
+            logger.info("Starting zookeeper server on port [{}]", zkArgs.zkPort);
+
+            if (zkArgs.clean) {
+                logger.info("cleaning existing data in [{}] and [{}]", zkArgs.dataDir, zkArgs.logDir);
+                FileUtils.deleteDirectory(new File(zkArgs.dataDir));
+                FileUtils.deleteDirectory(new File(zkArgs.logDir));
+            }
+            IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+
+                }
+            };
+
+            ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace);
+            zkServer.start();
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+    }
+
+    @Parameters(separators = "=", commandDescription = "Start Zookeeper server")
+    static class ZKServerArgs {
+
+        @Parameter(names = "-port", description = "Zookeeper port")
+        String zkPort = "2181";
+
+        @Parameter(names = "-dataDir", description = "data directory", required = false)
+        String dataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "data").getAbsolutePath();
+
+        @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
+        boolean clean = true;
+
+        @Parameter(names = "-logDir", description = "log directory")
+        String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "log").getAbsolutePath();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
index 316b876..cab4eab 100644
--- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
@@ -12,44 +12,27 @@ public class ClockApp extends App {
     private ClockPE clockPE;
 
     @Override
-    protected void start() {
+    protected void onStart() {
         System.out.println("Starting CounterApp...");
         clockPE.getInstanceForKey("single");
     }
 
     // generic array due to varargs generates a warning.
     @Override
-    protected void init() {
+    protected void onInit() {
         System.out.println("Initing CounterApp...");
 
         clockPE = new ClockPE(this);
         clockPE.setTimerInterval(1, TimeUnit.SECONDS);
 
-        eventSource = new EventSource(this, "I can give you the time!");
+        eventSource = new EventSource(this, "clockStream");
         clockPE.setStreams((Streamable) eventSource);
     }
 
     @Override
-    protected void close() {
+    protected void onClose() {
         System.out.println("Closing CounterApp...");
         eventSource.close();
     }
 
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onInit() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onClose() {
-        // TODO Auto-generated method stub
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
index 1c92c57..6652f59 100644
--- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
@@ -7,41 +7,23 @@ public class ShowTimeApp extends App {
     private ShowPE showPE;
 
     @Override
-    protected void start() {
+    protected void onStart() {
         System.out.println("Starting ShowTimeApp...");
         showPE.getInstanceForKey("single");
     }
 
     @Override
-    protected void init() {
+    protected void onInit() {
         System.out.println("Initing ShowTimeApp...");
 
         showPE = new ShowPE(this);
 
         /* This stream will receive events from another app. */
-        createStream("I need the time.", showPE);
-    }
-
-    @Override
-    protected void close() {
-        System.out.println("Closing ShowTimeApp...");
-    }
-
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onInit() {
-        // TODO Auto-generated method stub
-
+        createStream("clockStream", showPE);
     }
 
     @Override
     protected void onClose() {
-        // TODO Auto-generated method stub
-
+        System.out.println("Closing ShowTimeApp...");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
index 809cebd..8cb5843 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
@@ -41,7 +41,7 @@ public class SimplePE extends ProcessingElement implements Watcher {
     protected void onCreate() {
         if (zk == null) {
             try {
-                zk = new ZooKeeper("localhost:" + 21810, 4000, this);
+                zk = new ZooKeeper("localhost:" + 2181, 4000, this);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
index 25b98a6..bbfaf17 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -39,7 +39,7 @@ public class TestApp extends App {
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            zkClient = new ZkClient("localhost:" + 21810);
+            zkClient = new ZkClient("localhost:" + 2181);
             if (!zkClient.exists("/s4-test")) {
                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
index 0657ee9..8eb345b 100644
--- a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -17,7 +17,7 @@ public class TestApp extends App {
     @Override
     protected void onInit() {
         try {
-            zkClient = new ZkClient("localhost:" + 21810);
+            zkClient = new ZkClient("localhost:" + 2181);
             if (!zkClient.exists("/s4-test")) {
                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/README.txt
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/README.txt b/test-apps/twitter-adapter/README.txt
new file mode 100644
index 0000000..f9bda45
--- /dev/null
+++ b/test-apps/twitter-adapter/README.txt
@@ -0,0 +1 @@
+Please refer to README.txt in twitter-counter application
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 669d62b..100837d 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -49,6 +49,9 @@ group = 'org.apache.s4'
 
 apply plugin: 'java'
 apply plugin: 'eclipse'
+apply plugin:'application'
+
+mainClassName = "org.apache.s4.core.Main"
 
 /* The app classname is set automatically from the source files. */
 def appClassname = ''

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index ee905d9..102ca10 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -1,6 +1,9 @@
 package org.apache.s4.example.twitter;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.net.ServerSocket;
+import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.I0Itec.zkclient.ZkClient;
@@ -15,6 +18,7 @@ import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
 
 public class TwitterInputAdapter extends Adapter {
 
@@ -49,7 +53,20 @@ public class TwitterInputAdapter extends Adapter {
 
     public void connectAndRead() throws Exception {
 
-        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        Properties twitterProperties = new Properties();
+        File twitter4jPropsFile = new File(System.getProperty("user.home") + "/twitter4j.properties");
+        if (!twitter4jPropsFile.exists()) {
+            logger.error(
+                    "Cannot find twitter4j.properties file in this location :[{}]. Make sure it is available at this place and includes user/password credentials",
+                    twitter4jPropsFile.getAbsolutePath());
+            return;
+        }
+        twitterProperties.load(new FileInputStream(twitter4jPropsFile));
+
+        cb.setDebugEnabled(Boolean.valueOf(twitterProperties.getProperty("debug")))
+                .setUser(twitterProperties.getProperty("user")).setPassword(twitterProperties.getProperty("password"));
+        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         StatusListener statusListener = new StatusListener() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/default.s4.properties
deleted file mode 100644
index cd36aaa..0000000
--- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.name = s4-adapter-cluster
-cluster.zk_address = localhost:21810
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
-tcp.partition.queue_size=1000
-comm.timeout=100
-comm.retry_delay=100
-comm.retries=10
-
-# specify the name of the remote cluster (there is currently only 1 remote cluster max)
-cluster.remote.name=s4-test-cluster

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/s4.properties b/test-apps/twitter-adapter/src/main/resources/s4.properties
new file mode 100644
index 0000000..fdd1bf6
--- /dev/null
+++ b/test-apps/twitter-adapter/src/main/resources/s4.properties
@@ -0,0 +1,18 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:2181
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
+
+# specify the name of the remote cluster (there is currently only 1 remote cluster max)
+cluster.remote.name=s4-test-cluster

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
deleted file mode 100644
index 7d58c7d..0000000
--- a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-debug=true
-# you need to set those parameters with valid twitter account credentials
-twitter4j.user=????
-twitter4j.password=????
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/README.txt
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/README.txt b/test-apps/twitter-counter/README.txt
new file mode 100644
index 0000000..d46b491
--- /dev/null
+++ b/test-apps/twitter-counter/README.txt
@@ -0,0 +1,33 @@
+An application that displays the current top 10 topics, as gathered from the twitter sample stream.
+It was ported and adapted from S4 0.3
+
+Architecture:
+- twitter-adapter app in adapter node connects to the twitter stream, extracts the twitted text and passes that to the application cluster
+- twitter-counter app in the application cluster receives the text of the tweets, extracts the topics, counts topic occurences and periodically displays the top 10 topics on the console
+
+How to configure:
+- you need a twitter4j.properties file in your home dir, with the following properties filled:
+debug=true|false
+user=<a twitter user name>
+password=<the matching password>
+
+How to run:
+0/ make sure tools are compiled by running ./gradlew s4-tools:installApp
+
+1/ start zookeeper server
+./s4 zkServer
+
+2/ create adapter cluster configuration
+./s4 newCluster -name=s4-test-cluster -firstListeningPort=10000 -nbTasks=1
+
+3/ create application cluster configuration
+./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=11000 -nbTasks=<number of nodes>
+NOTE: - the name of the downstream cluster is currently hardcoded in <s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties Make sure you use the same name
+
+4/ start application nodes (as many as defined in the cluster configuration, more for failover capabilities)
+./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+
+5/ start adapter node
+./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+
+6/ observe the topic count output (on 1 of the application nodes)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index e737b40..e10eae0 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -49,6 +49,7 @@ group = 'org.apache.s4'
 
 apply plugin: 'java'
 apply plugin: 'eclipse'
+apply plugin:'application'
 
 /* The app classname is set automatically from the source files. */
 def appClassname = ''

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index 3d9a9fb..6fd68b5 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -4,6 +4,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.slf4j.Logger;
@@ -14,16 +15,16 @@ import com.google.common.collect.Sets;
 
 public class TopNTopicPE extends ProcessingElement {
 
+    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
+    Map<String, Integer> countedTopics = Maps.newHashMap();
+
     public TopNTopicPE(App app) {
         super(app);
-        // TODO Auto-generated constructor stub
+        logger.info("key: [{}]", getId());
     }
 
-    Map<String, Integer> countedTopics = Maps.newHashMap();
-    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
-
-    public void onEvent(TopicSeenEvent event) {
-        countedTopics.put(event.topic, event.count);
+    public void onEvent(Event event) {
+        countedTopics.put(event.get("topic"), event.get("count", Integer.class));
     }
 
     public void onTime() {
@@ -32,6 +33,8 @@ public class TopNTopicPE extends ProcessingElement {
             sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
         }
 
+        logger.info("\n------------------");
+
         int i = 0;
         Iterator<TopNEntry> iterator = sortedTopics.iterator();
         long time = System.currentTimeMillis();
@@ -39,6 +42,7 @@ public class TopNTopicPE extends ProcessingElement {
             TopNEntry entry = iterator.next();
             logger.info("{} : topic [{}] count [{}]",
                     new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+            i++;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
index a6d1478..5a41231 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -1,27 +1,36 @@
 package org.apache.s4.example.twitter;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // keyed by topic name
 public class TopicCountAndReportPE extends ProcessingElement {
 
-    Stream<TopicSeenEvent> downStream;
+    Stream<Event> downStream;
     int threshold = 10;
     int count;
+    boolean firstEvent = true;
+
+    static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
 
     public TopicCountAndReportPE(App app) {
         super(app);
-        // TODO Auto-generated constructor stub
     }
 
-    public void setDownstream(Stream<TopicSeenEvent> stream) {
+    public void setDownstream(Stream<Event> stream) {
         this.downStream = stream;
     }
 
-    public void onEvent(TopicSeenEvent event) {
-        count += event.count;
+    public void onEvent(Event event) {
+        if (firstEvent) {
+            logger.info("Handling new topic [{}]", getId());
+            firstEvent = false;
+        }
+        count += event.get("count", Integer.class);
     }
 
     @Override
@@ -34,14 +43,15 @@ public class TopicCountAndReportPE extends ProcessingElement {
         if (count < threshold) {
             return;
         }
-        TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(), count);
+        Event topicSeenEvent = new Event();
+        topicSeenEvent.put("topic", String.class, getId());
+        topicSeenEvent.put("count", Integer.class, count);
+        topicSeenEvent.put("aggregationKey", String.class, "aggregationValue");
         downStream.put(topicSeenEvent);
     }
 
     @Override
     protected void onRemove() {
-        // TODO Auto-generated method stub
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
index 501d6fd..9936ed0 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
@@ -6,13 +6,16 @@ import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
 
 public class TopicExtractorPE extends ProcessingElement {
 
     static private ServerSocket serverSocket;
-    Streamable<TopicSeenEvent> downStream;
+    Streamable<Event> downStream;
+    static Logger logger = LoggerFactory.getLogger(TopicExtractorPE.class);
 
     public TopicExtractorPE(App app) {
         super(app);
@@ -24,18 +27,22 @@ public class TopicExtractorPE extends ProcessingElement {
 
     }
 
-    public void setDownStream(Streamable<TopicSeenEvent> stream) {
+    public void setDownStream(Streamable<Event> stream) {
         this.downStream = stream;
     }
 
     public void onEvent(Event event) {
         String text = event.get("statusText", String.class);
+        logger.trace("event text [{}]", text);
         if (text.contains("#")) {
             Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
                     .split(text.substring(text.indexOf("#") + 1, text.length()));
             for (String topic : split) {
                 String topicOnly = topic.split(" ")[0];
-                downStream.put(new TopicSeenEvent(topicOnly, 1));
+                Event event2 = new Event();
+                event2.put("topic", String.class, topicOnly);
+                event2.put("count", Integer.class, 1);
+                downStream.put(event2);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
deleted file mode 100644
index b28d61e..0000000
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.s4.example.twitter;
-
-import org.apache.s4.base.Event;
-
-public class TopicSeenEvent extends Event {
-
-    public String topic;
-    public int count;
-    public String reportKey = "x";
-
-    public TopicSeenEvent(String topic, int count) {
-        super();
-        this.topic = topic;
-        this.count = count;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index cf0fb40..f6edd8c 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -29,13 +29,13 @@ public class TwitterCounterApp extends App {
             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
             @SuppressWarnings("unchecked")
-            Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
+            Stream<Event> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
 
                 @Override
                 public List<String> get(Event arg0) {
                     return new ArrayList<String>() {
                         {
-                            add("x");
+                            add("aggregationKey");
                         }
                     };
                 }
@@ -44,13 +44,13 @@ public class TwitterCounterApp extends App {
             TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
-            Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
+            Stream<Event> topicSeenStream = createStream("TopicSeen", new KeyFinder<Event>() {
 
                 @Override
-                public List<String> get(final TopicSeenEvent arg0) {
+                public List<String> get(final Event arg0) {
                     return new ArrayList<String>() {
                         {
-                            add(arg0.topic);
+                            add(arg0.get("topic"));
                         }
                     };
                 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/91a7fff8/test-apps/twitter-counter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/resources/default.s4.properties b/test-apps/twitter-counter/src/main/resources/default.s4.properties
new file mode 100644
index 0000000..d5da3f3
--- /dev/null
+++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
@@ -0,0 +1,15 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-adapter-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.core.CustomModule
+s4.logger_level = DEBUG
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10