You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/05/24 18:19:26 UTC

[3/5] git commit: inter-app communications + refactorings

inter-app communications + refactorings

- applications publish streams they produce and streams they consume
- RemoteStream instances send messages to remote consumers, identified by (cluster, appId),
through multiple senders and emitters (1 per consumer cluster)
- added related tests
- removed file-based cluster configuration: cluster configuration is now
through ZooKeeper only
- renamed Topology to Cluster, and Cluster to PhysicalCluster
- improved tests, minimized the number of configuration files
- commented out code for supporting network glitches: it blocked the sending
of messages upon cluster disconnections
- migrated to gradle 1.0-rc-3, so that we can embed build operations with parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/99b6f048
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/99b6f048
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/99b6f048

Branch: refs/heads/S4-22
Commit: 99b6f04824681cb5a687bc5c3f49d53ccada500a
Parents: a63cda9
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat May 19 20:15:47 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat May 19 20:15:47 2012 +0200

----------------------------------------------------------------------
 build.gradle                                       |   24 ++-
 gradle/wrapper/gradle-wrapper.properties           |    2 +-
 gradlew                                            |  108 ++++----
 gradlew.bat                                        |   52 ++--
 subprojects/s4-comm/s4-comm.gradle                 |    4 +-
 .../src/main/java/org/apache/s4/comm/Module.java   |  111 --------
 .../org/apache/s4/comm/tcp/RemoteEmitters.java     |   34 +++
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |  107 ++++----
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |   44 ++--
 .../org/apache/s4/comm/tcp/TCPRemoteEmitter.java   |   13 +-
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |   53 ++---
 .../org/apache/s4/comm/topology/Assignment.java    |    8 +-
 .../s4/comm/topology/AssignmentFromFile.java       |  127 ---------
 .../apache/s4/comm/topology/AssignmentFromZK.java  |   45 ++--
 .../java/org/apache/s4/comm/topology/Cluster.java  |  133 +---------
 .../s4/comm/topology/ClusterChangeListener.java    |   10 +
 .../org/apache/s4/comm/topology/ClusterFromZK.java |  198 ++++++++++++++
 .../java/org/apache/s4/comm/topology/Clusters.java |   12 +
 .../apache/s4/comm/topology/ClustersFromZK.java    |  103 +++++++
 .../apache/s4/comm/topology/PhysicalCluster.java   |  125 +++++++++
 .../org/apache/s4/comm/topology/RemoteCluster.java |   10 +
 .../org/apache/s4/comm/topology/RemoteStreams.java |  211 +++++++++++++++
 .../apache/s4/comm/topology/RemoteTopology.java    |    5 -
 .../s4/comm/topology/RemoteTopologyFromZK.java     |   16 --
 .../apache/s4/comm/topology/StreamConsumer.java    |   27 ++
 .../java/org/apache/s4/comm/topology/Topology.java |    9 -
 .../s4/comm/topology/TopologyChangeListener.java   |    5 -
 .../apache/s4/comm/topology/TopologyFromFile.java  |   33 ---
 .../apache/s4/comm/topology/TopologyFromZK.java    |  144 ----------
 .../java/org/apache/s4/comm/topology/ZNRecord.java |   17 ++-
 .../java/org/apache/s4/comm/udp/UDPEmitter.java    |   18 +-
 .../java/org/apache/s4/comm/DeliveryTestUtil.java  |   19 +-
 .../java/org/apache/s4/comm/tcp/TCPCommTest.java   |    5 +-
 .../s4/comm/topology/AssignmentFromZKTest.java     |   43 ---
 .../s4/comm/topology/AssignmentsFromZKTest.java    |   69 +++++
 .../s4/comm/topology/ClustersFromZKTest.java       |   97 +++++++
 .../s4/comm/topology/TopologyFromZKTest.java       |   78 ------
 .../org/apache/s4/comm/topology/ZKBaseTest.java    |   37 +--
 .../java/org/apache/s4/comm/udp/UDPCommTest.java   |    2 +-
 .../java/org/apache/s4/fixtures/CommTestUtils.java |    4 +-
 .../FileBasedClusterManagementTestModule.java      |   77 ------
 .../ZkBasedClusterManagementTestModule.java        |   36 ++-
 .../java/org/apache/s4/fixtures/ZkBasedTest.java   |   47 +---
 .../src/test/resources/default.s4.properties       |    5 +-
 .../src/test/resources/s4-comm-test.properties     |   10 -
 subprojects/s4-core/s4-core.gradle                 |   10 +-
 .../src/main/java/org/apache/s4/core/App.java      |   44 +++-
 .../main/java/org/apache/s4/core/CustomModule.java |   94 -------
 .../java/org/apache/s4/core/DefaultModule.java     |   99 +++++++
 .../src/main/java/org/apache/s4/core/Main.java     |    5 +-
 .../src/main/java/org/apache/s4/core/Receiver.java |    6 +-
 .../main/java/org/apache/s4/core/RemoteSender.java |   28 ++-
 .../java/org/apache/s4/core/RemoteSenders.java     |   61 +++++
 .../main/java/org/apache/s4/core/RemoteStream.java |   77 ++++++
 .../src/main/java/org/apache/s4/core/Sender.java   |    2 -
 .../src/main/java/org/apache/s4/core/Server.java   |   67 +----
 .../src/main/java/org/apache/s4/core/Stream.java   |   19 +--
 .../java/org/apache/s4/core/adapter/Adapter.java   |   26 --
 .../org/apache/s4/core/adapter/AdapterMain.java    |   53 ----
 .../org/apache/s4/core/adapter/AdapterModule.java  |   97 -------
 .../org/apache/s4/core/adapter/RemoteStream.java   |   60 ----
 .../s4/deploy/DistributedDeploymentManager.java    |    2 +-
 .../test/java/org/apache/s4/core/TriggerTest.java  |   18 +-
 .../apache/s4/core/triggers/TriggeredModule.java   |    6 -
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   66 ++---
 .../s4/deploy/prodcon/TestProducerConsumer.java    |   30 ++-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   85 +++----
 .../java/org/apache/s4/fixtures/SocketAdapter.java |   27 +-
 .../org/apache/s4/fixtures/ZkBasedAppModule.java   |    9 +-
 .../org/apache/s4/wordcount/WordCountModule.java   |    4 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    9 +
 .../apache/s4/wordcount/zk/WordCountModuleZk.java  |    7 -
 .../apache/s4/wordcount/zk/WordCountTestZk.java    |   46 ----
 .../src/test/resources/default.s4.properties       |    6 +-
 .../resources/org.apache.s4.deploy.s4.properties   |   11 -
 .../test/resources/s4-counter-example.properties   |    7 -
 .../java/org/apache/s4/example/counter/Module.java |   81 +++----
 .../apache/s4/example/fluent/counter/Module.java   |   82 +++----
 subprojects/s4-tools/s4-tools.gradle               |   17 ++
 .../java/org/apache/s4/tools/DefineCluster.java    |    2 +-
 .../src/main/java/org/apache/s4/tools/Deploy.java  |   89 +++----
 test-apps/s4-counter/build.gradle                  |   18 +-
 test-apps/s4-showtime/build.gradle                 |   22 +-
 test-apps/simple-deployable-app-1/build.gradle     |   18 +-
 test-apps/simple-deployable-app-2/build.gradle     |   18 +-
 test-apps/twitter-adapter/build.gradle             |    8 +-
 .../s4/example/twitter/TwitterInputAdapter.java    |   13 +-
 test-apps/twitter-counter/build.gradle             |    5 +-
 .../s4/example/twitter/TwitterCounterApp.java      |    8 +-
 89 files changed, 1866 insertions(+), 1933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fd75057..58ff560 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,7 +73,11 @@ libraries = [
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
     jcommander:         'com.beust:jcommander:1.23',
-    commons_io:         'commons-io:commons-io:2.1'
+    commons_io:         'commons-io:commons-io:2.1',
+    gradle_base_services: 'gradle-base-services:gradle-base-services:1.0-rc-3',
+    gradle_core: 'gradle-core:gradle-core:1.0-rc-3',
+    gradle_tooling_api: 'gradle-tooling-api:gradle-tooling-api:1.0-rc-3',
+    gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0-rc-3'
 ]
 
 subprojects {
@@ -94,6 +98,7 @@ subprojects {
         /* Google. */
         compile( libraries.guava )
         compile( libraries.guice )
+        compile (libraries.guice_assist)
 
         /* Logging. */
         compile( libraries.slf4j )
@@ -109,6 +114,16 @@ subprojects {
         compile( libraries.jcip )
         compile( libraries.zk )
 
+//        // added here since from 1.0 dependencies don't seem to
+//        // be copied in through the application plugin (s4-tools)
+//        compile libraries.jcommander
+//        compile libraries.zkclient
+//        compile libraries.commons_io
+//        compile libraries.gradle_base_services
+//        compile libraries.gradle_core
+//        compile libraries.gradle_tooling_api
+//        compile libraries.gradle_wrapper
+//
         /* Testing. */
         testCompile( libraries.junit )
     }
@@ -124,7 +139,7 @@ subprojects {
 
 dependsOnChildren()
 
-platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm')]
+platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-tools')]
 
 configurations {
     platformLibs
@@ -153,7 +168,7 @@ binDistImage = copySpec {
         into ("platform/lib") {
             from proj.sourceSets.main.resources
             from proj.configurations.runtime.getFiles()
-            from proj.configurations.archives.allArtifactFiles
+            from proj.configurations.archives.allArtifacts.files
         }
     }
 }
@@ -177,7 +192,8 @@ task listJars << {
 /* Generates the gradlew scripts.
  http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
 task wrapper(type: Wrapper) {
-    gradleVersion = '1.0-milestone-3'
+    gradleVersion = '1.0-rc-3'
+    jarFile='lib/gradle-wrapper-1.0-rc-3.jar'
 }
 
 class Version {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 7515ec6..1d9b9c0 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=http\://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.0-rc-3-bin.zip

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
index d8809f1..28e6f10 100755
--- a/gradlew
+++ b/gradlew
@@ -1,16 +1,16 @@
 #!/bin/bash
 
 ##############################################################################
-##                                                                          ##
-##  Gradle wrapper script for UN*X                                         ##
-##                                                                          ##
+##
+##  Gradle start up script for UN*X
+##
 ##############################################################################
 
-# Uncomment those lines to set JVM options. GRADLE_OPTS and JAVA_OPTS can be used together.
-# GRADLE_OPTS="$GRADLE_OPTS -Xmx512m"
-# JAVA_OPTS="$JAVA_OPTS -Xmx512m"
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
 
-GRADLE_APP_NAME=Gradle
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD="maximum"
@@ -42,54 +42,51 @@ case "`uname`" in
     ;;
 esac
 
-# Attempt to set JAVA_HOME if it's not already set.
-if [ -z "$JAVA_HOME" ] ; then
-    if $darwin ; then
-        [ -z "$JAVA_HOME" -a -d "/Library/Java/Home" ] && export JAVA_HOME="/Library/Java/Home"
-        [ -z "$JAVA_HOME" -a -d "/System/Library/Frameworks/JavaVM.framework/Home" ] && export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Home"
-    else
-        javaExecutable="`which javac`"
-        [ -z "$javaExecutable" -o "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ] && die "JAVA_HOME not set and cannot find javac to deduce location, please set JAVA_HOME."
-        # readlink(1) is not available as standard on Solaris 10.
-        readLink=`which readlink`
-        [ `expr "$readLink" : '\([^ ]*\)'` = "no" ] && die "JAVA_HOME not set and readlink not available, please set JAVA_HOME."
-        javaExecutable="`readlink -f \"$javaExecutable\"`"
-        javaHome="`dirname \"$javaExecutable\"`"
-        javaHome=`expr "$javaHome" : '\(.*\)/bin'`
-        export JAVA_HOME="$javaHome"
-    fi
-fi
-
 # For Cygwin, ensure paths are in UNIX format before anything is touched.
 if $cygwin ; then
-    [ -n "$JAVACMD" ] && JAVACMD=`cygpath --unix "$JAVACMD"`
     [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
 fi
 
-STARTER_MAIN_CLASS=org.gradle.wrapper.GradleWrapperMain
-CLASSPATH=`dirname "$0"`/gradle/wrapper/gradle-wrapper.jar
-WRAPPER_PROPERTIES=`dirname "$0"`/gradle/wrapper/gradle-wrapper.properties
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/"
+APP_HOME="`pwd -P`"
+cd "$SAVED"
+
+CLASSPATH=$APP_HOME/lib/gradle-wrapper-1.0-rc-3.jar
+
 # Determine the Java command to use to start the JVM.
-if [ -z "$JAVACMD" ] ; then
-    if [ -n "$JAVA_HOME" ] ; then
-        if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
-            # IBM's JDK on AIX uses strange locations for the executables
-            JAVACMD="$JAVA_HOME/jre/sh/java"
-        else
-            JAVACMD="$JAVA_HOME/bin/java"
-        fi
+if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        JAVACMD="$JAVA_HOME/jre/sh/java"
     else
-        JAVACMD="java"
+        JAVACMD="$JAVA_HOME/bin/java"
     fi
-fi
-if [ ! -x "$JAVACMD" ] ; then
-    die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+    if [ ! -x "$JAVACMD" ] ; then
+        die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+    fi
+else
+    JAVACMD="java"
+    which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
 
 Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
-fi
-if [ -z "$JAVA_HOME" ] ; then
-    warn "JAVA_HOME environment variable is not set"
 fi
 
 # Increase the maximum file descriptors if we can.
@@ -108,15 +105,14 @@ if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
     fi
 fi
 
-# For Darwin, add GRADLE_APP_NAME to the JAVA_OPTS as -Xdock:name
+# For Darwin, add options to specify how the application appears in the dock
 if $darwin; then
-    JAVA_OPTS="$JAVA_OPTS -Xdock:name=$GRADLE_APP_NAME"
-# we may also want to set -Xdock:image
+    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
 fi
 
 # For Cygwin, switch paths to Windows format before running java
 if $cygwin ; then
-    JAVA_HOME=`cygpath --path --mixed "$JAVA_HOME"`
+    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
     CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
 
     # We build the pattern for arguments to be converted via cygpath
@@ -143,7 +139,7 @@ if $cygwin ; then
             eval `echo args$i`="\"$arg\""
         fi
         i=$((i+1))
-    done 
+    done
     case $i in
         (0) set -- ;;
         (1) set -- "$args0" ;;
@@ -158,11 +154,11 @@ if $cygwin ; then
     esac
 fi
 
-GRADLE_APP_BASE_NAME=`basename "$0"`
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+    JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
 
-exec "$JAVACMD" $JAVA_OPTS $GRADLE_OPTS \
-        -classpath "$CLASSPATH" \
-        -Dorg.gradle.appname="$GRADLE_APP_BASE_NAME" \
-        -Dorg.gradle.wrapper.properties="$WRAPPER_PROPERTIES" \
-        $STARTER_MAIN_CLASS \
-        "$@"
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradlew.bat
----------------------------------------------------------------------
diff --git a/gradlew.bat b/gradlew.bat
index 4855abb..d410d61 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -1,24 +1,37 @@
 @if "%DEBUG%" == "" @echo off
 @rem ##########################################################################
-@rem                                                                         ##
-@rem  Gradle startup script for Windows                                      ##
-@rem                                                                         ##
+@rem
+@rem  Gradle startup script for Windows
+@rem
 @rem ##########################################################################
 
 @rem Set local scope for the variables with windows NT shell
 if "%OS%"=="Windows_NT" setlocal
 
-@rem Uncomment those lines to set JVM options. GRADLE_OPTS and JAVA_OPTS can be used together.
-@rem set GRADLE_OPTS=%GRADLE_OPTS% -Xmx512m
-@rem set JAVA_OPTS=%JAVA_OPTS% -Xmx512m
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
 
 set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.\
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
 
 @rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
 set JAVA_EXE=java.exe
-if not defined JAVA_HOME goto init
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto init
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
 
+goto fail
+
+:findJavaFromJavaHome
 set JAVA_HOME=%JAVA_HOME:"=%
 set JAVA_EXE=%JAVA_HOME%/bin/java.exe
 
@@ -29,14 +42,14 @@ echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
 echo.
 echo Please set the JAVA_HOME variable in your environment to match the
 echo location of your Java installation.
-echo.
-goto end
+
+goto fail
 
 :init
 @rem Get command-line arguments, handling Windowz variants
 
 if not "%OS%" == "Windows_NT" goto win9xME_args
-if "%eval[2+2]" == "4" goto 4NT_args
+if "%@eval[2+2]" == "4" goto 4NT_args
 
 :win9xME_args
 @rem Slurp the command line arguments.
@@ -56,27 +69,22 @@ set CMD_LINE_ARGS=%$
 :execute
 @rem Setup the command line
 
-set STARTER_MAIN_CLASS=org.gradle.wrapper.GradleWrapperMain
-set CLASSPATH=%DIRNAME%\gradle\wrapper\gradle-wrapper.jar
-set WRAPPER_PROPERTIES=%DIRNAME%\gradle\wrapper\gradle-wrapper.properties
-
-set GRADLE_OPTS=%JAVA_OPTS% %GRADLE_OPTS% -Dorg.gradle.wrapper.properties="%WRAPPER_PROPERTIES%"
+set CLASSPATH=%APP_HOME%\lib\gradle-wrapper-1.0-rc-3.jar
 
 @rem Execute Gradle
-"%JAVA_EXE%" %GRADLE_OPTS% -classpath "%CLASSPATH%" %STARTER_MAIN_CLASS% %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
 
 :end
 @rem End local scope for the variables with windows NT shell
 if "%ERRORLEVEL%"=="0" goto mainEnd
 
-if not "%OS%"=="Windows_NT" echo 1 > nul | choice /n /c:1
-
+:fail
 rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
 rem the _cmd.exe /c_ return code!
-if  not "" == "%GRADLE_EXIT_CONSOLE%" exit "%ERRORLEVEL%"
-exit /b "%ERRORLEVEL%"
+if  not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
 
 :mainEnd
 if "%OS%"=="Windows_NT" endlocal
 
-:omega
\ No newline at end of file
+:omega

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/s4-comm.gradle b/subprojects/s4-comm/s4-comm.gradle
index b88239c..88abefd 100644
--- a/subprojects/s4-comm/s4-comm.gradle
+++ b/subprojects/s4-comm/s4-comm.gradle
@@ -15,7 +15,7 @@
  */
 
 description = 'Implementation-specific components of the communication layer.'
- 
+
 dependencies {
     compile project(":s4-base")
     compile libraries.json
@@ -27,7 +27,7 @@ dependencies {
 
 task testJar(type: Jar) {
     baseName = "test-${project.archivesBaseName}"
-    from sourceSets.test.classes
+    from sourceSets.test.output
 }
 
 configurations {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
deleted file mode 100644
index 36b33fc..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package org.apache.s4.comm;
-
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Default comm layer module. To change the module, modify property
- * <tt>comm.module</tt> in <tt>/s4-comm.properties</tt>
- * 
- * @author Leo Neumeyer
- */
-public class Module extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream(
-                    "/s4-comm.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder,
-                    ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null)
-            loadProperties(binder());
-
-        int numHosts = config.getList("cluster.hosts").size();
-        boolean isCluster = numHosts > 1 ? true : false;
-        bind(Boolean.class).annotatedWith(Names.named("isCluster"))
-                .toInstance(Boolean.valueOf(isCluster));
-
-        bind(Cluster.class);
-
-        /* Configure static assignment using a configuration file. */
-        bind(Assignment.class).to(AssignmentFromFile.class);
-
-        /* Configure a static cluster topology using a configuration file. */
-        bind(Topology.class).to(TopologyFromFile.class);
-
-        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
-        //
-        // bind(Emitter.class).to(QueueingEmitter.class);
-        // bind(Listener.class).to(QueueingListener.class);
-
-        /* Use the Netty comm layer implementation. */
-        // bind(Emitter.class).to(NettyEmitter.class);
-        // bind(Listener.class).to(NettyListener.class);
-
-        /* Use a simple UDP comm layer implementation. */
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
new file mode 100644
index 0000000..7623d52
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -0,0 +1,34 @@
+package org.apache.s4.comm.tcp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ * 
+ */
+@Singleton
+public class RemoteEmitters {
+
+    Map<Cluster, RemoteEmitter> emitters = new HashMap<Cluster, RemoteEmitter>();
+
+    @Inject
+    RemoteEmitterFactory emitterFactory;
+
+    public RemoteEmitter getEmitter(Cluster topology) {
+        RemoteEmitter emitter = emitters.get(topology);
+        if (emitter == null) {
+            emitter = emitterFactory.createRemoteEmitter(topology);
+            emitters.put(topology, emitter);
+        }
+        return emitter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 90b4686..623172a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -11,8 +11,8 @@ import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.HashBiMap;
 import com.google.inject.Inject;
 
-public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
+/**
+ * 
+ * Sends messages through TCP, to the associated subcluster.
+ * 
+ */
+public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChangeListener {
     private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
     private static final int BUFFER_SIZE = 10;
     private static final int NUM_RETRIES = 10;
 
-    private Topology topology;
+    private Cluster topology;
     private final ClientBootstrap bootstrap;
 
     @Inject
@@ -100,10 +105,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
     private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
 
     @Inject
-    public TCPEmitter(Topology topology) throws InterruptedException {
+    public TCPEmitter(Cluster topology) throws InterruptedException {
         this.topology = topology;
         topology.addListener(this);
-        int clusterSize = this.topology.getTopology().getNodes().size();
+        int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
 
         partitionChannelMap = HashBiMap.create(clusterSize);
         partitionNodeMap = HashBiMap.create(clusterSize);
@@ -117,8 +122,8 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
             @Override
             public ChannelPipeline getPipeline() {
                 ChannelPipeline p = Channels.pipeline();
-                p.addLast("1", new LengthFieldPrepender(4));
-                p.addLast("2", new TestHandler());
+                p.addLast("Framing", new LengthFieldPrepender(4));
+                p.addLast("ChannelStateMonitor", new ChannelStateMonitoringHandler());
                 return p;
             }
         });
@@ -131,11 +136,11 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         ClusterNode clusterNode = partitionNodeMap.get(partitionId);
 
         if (clusterNode == null) {
-            if (topology.getTopology().getNodes().size() == 0) {
+            if (topology.getPhysicalCluster().getNodes().size() == 0) {
                 logger.error("No node in cluster ");
                 return false;
             }
-            clusterNode = topology.getTopology().getNodes().get(partitionId);
+            clusterNode = topology.getPhysicalCluster().getNodes().get(partitionId);
             partitionNodeMap.forcePut(partitionId, clusterNode);
         }
 
@@ -184,37 +189,44 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
             }
         }
 
-        /*
-         * Try limiting the size of the send queue inside Netty
-         */
-        if (!channel.isWritable()) {
-            synchronized (sendLock) {
-                // check again now that we have the lock
-                while (!channel.isWritable()) {
-                    try {
-                        sendLock.wait();
-                    } catch (InterruptedException ie) {
-                        return false;
-                    }
-                }
-            }
-        }
-
-        /*
-         * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
-         * buffered messages, and (3) the Current Message
-         * 
-         * Once the channel returns success delete from the messagesOnTheWire
-         */
-        byte[] messageBeingSent = null;
-        // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
-        // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+        // /*
+        // * Try limiting the size of the send queue inside Netty
+
+        // FIXME this does not work: in case of a disconnection, the channel remains non-writable, and the lock is
+        // never released, hence blocking.
+        // should be fixed using: 1/ configurable timeouts (should drop pending messages after the timeout) 2/ make sure
+        // the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
+        // no?
+
+        // */
+        // if (!channel.isWritable()) {
+        // synchronized (sendLock) {
+        // // check again now that we have the lock
+        // while (!channel.isWritable()) {
+        // try {
+        // sendLock.wait();
+        // } catch (InterruptedException ie) {
+        // return false;
+        // }
+        // }
+        // }
         // }
 
-        while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
-            writeMessageToChannel(channel, partitionId, messageBeingSent);
-            queuedMessages.remove(partitionId);
-        }
+        // /*
+        // * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+        // * buffered messages, and (3) the Current Message
+        // *
+        // * Once the channel returns success delete from the messagesOnTheWire
+        // */
+        // byte[] messageBeingSent = null;
+        // // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+        // // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+        // // }
+        //
+        // while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+        // writeMessageToChannel(channel, partitionId, messageBeingSent);
+        // queuedMessages.remove(partitionId);
+        // }
 
         writeMessageToChannel(channel, partitionId, serDeser.serialize(message));
         return true;
@@ -242,7 +254,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         /*
          * Close the channels that correspond to changed partitions and update partitionNodeMap
          */
-        for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+        for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
             Integer partition = clusterNode.getPartition();
             ClusterNode oldNode = partitionNodeMap.get(partition);
 
@@ -259,19 +271,20 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
     @Override
     public int getPartitionCount() {
         // Number of nodes is not same as number of partitions
-        return topology.getTopology().getPartitionCount();
+        return topology.getPhysicalCluster().getPartitionCount();
     }
 
-    class TestHandler extends SimpleChannelHandler {
+    class ChannelStateMonitoringHandler extends SimpleChannelHandler {
         @Override
         public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
             // logger.info(String.format("%08x %08x %08x", e.getValue(),
             // e.getChannel().getInterestOps(), Channel.OP_WRITE));
-            synchronized (sendLock) {
-                if (e.getChannel().isWritable()) {
-                    sendLock.notify();
-                }
-            }
+            // FIXME see above comments about fixing the buffering of messages
+            // synchronized (sendLock) {
+            // if (e.getChannel().isWritable()) {
+            // sendLock.notify();
+            // }
+            // }
             ctx.sendUpstream(e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index b3776e1..58348af 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -14,6 +14,7 @@ import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -25,21 +26,22 @@ import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
-
+/**
+ * Receives messages through TCP for the assigned subcluster.
+ * 
+ */
 public class TCPListener implements Listener {
     private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
     private ClusterNode node;
     private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-    
+
     @Inject
     public TCPListener(Assignment assignment) {
         // wait for an assignment
         node = assignment.assignClusterNode();
-        
-        ChannelFactory factory =
-            new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
+
+        ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                Executors.newCachedThreadPool());
 
         ServerBootstrap bootstrap = new ServerBootstrap(factory);
 
@@ -48,38 +50,37 @@ public class TCPListener implements Listener {
                 ChannelPipeline p = Channels.pipeline();
                 p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
                 p.addLast("2", new ChannelHandler(handoffQueue));
-                
+
                 return p;
             }
         });
 
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.keepAlive", true);
-        
+
         bootstrap.bind(new InetSocketAddress(node.getPort()));
     }
-    
+
     public byte[] recv() {
         try {
             return handoffQueue.take();
         } catch (InterruptedException e) {
-        	return null;
+            return null;
         }
     }
-    
+
     public int getPartitionId() {
         return node.getPartition();
     }
-    
+
     public class ChannelHandler extends SimpleChannelHandler {
         private BlockingQueue<byte[]> handoffQueue;
-        
+
         public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
             this.handoffQueue = handOffQueue;
         }
-        
-        public void messageReceived(ChannelHandlerContext ctx,
-                MessageEvent e) {
+
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
             ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
             try {
                 handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
@@ -88,7 +89,7 @@ public class TCPListener implements Listener {
                 Thread.currentThread().interrupt();
             }
         }
-        
+
         public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
             logger.error("Error", event.getCause());
             if (context.getChannel().isOpen()) {
@@ -96,5 +97,12 @@ public class TCPListener implements Listener {
                 context.getChannel().close();
             }
         }
+
+        @Override
+        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+            // TODO Auto-generated method stub
+            super.channelClosed(ctx, e);
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 0fa663f..7dfa2c8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -1,14 +1,23 @@
 package org.apache.s4.comm.tcp;
 
 import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.comm.topology.RemoteTopology;
+import org.apache.s4.comm.topology.Cluster;
 
 import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
 
+/**
+ * Emitter to remote subclusters.
+ * 
+ */
 public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
 
+    /**
+     * Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
+     * discovered (as remote stream outputs)
+     */
     @Inject
-    public TCPRemoteEmitter(RemoteTopology topology) throws InterruptedException {
+    public TCPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
         super(topology);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 0466993..0458c70 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -1,9 +1,8 @@
 package org.apache.s4.comm.tools;
 
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.TopologyFromZK;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.comm.topology.ZkClient;
@@ -15,46 +14,38 @@ public class TaskSetup {
     public TaskSetup(String zookeeperAddress) {
         zkclient = new ZkClient(zookeeperAddress);
         zkclient.setZkSerializer(new ZNRecordSerializer());
-        zkclient.waitUntilConnected();
+        if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS)) {
+            throw new RuntimeException("Could not connect to ZooKeeper after 10 seconds.");
+        }
+    }
+
+    public void clean(String clusterName, String topologyName) {
+        zkclient.deleteRecursive("/s4/clusters/" + topologyName);
     }
 
     public void clean(String clusterName) {
         zkclient.deleteRecursive("/" + clusterName);
     }
 
-    public void setup(String clusterName, int tasks, int initialPort) {
-        zkclient.createPersistent("/" + clusterName + "/tasks", true);
-        zkclient.createPersistent("/" + clusterName + "/process", true);
-        zkclient.createPersistent("/" + clusterName + "/apps", true);
+    public void setup(String cluster, int tasks, int initialPort) {
+        try {
+            zkclient.createPersistent("/s4/streams", true);
+        } catch (ZkException ignored) {
+            // ignore if exists
+        }
+
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/apps", true);
         for (int i = 0; i < tasks; i++) {
             String taskId = "Task-" + i;
             ZNRecord record = new ZNRecord(taskId);
             record.putSimpleField("taskId", taskId);
             record.putSimpleField("port", String.valueOf(initialPort + i));
             record.putSimpleField("partition", String.valueOf(i));
-            record.putSimpleField("cluster", clusterName);
-            zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
-                    record);
+            record.putSimpleField("cluster", cluster);
+            zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId, record);
         }
     }
 
-    public static void main(String[] args) throws Exception {
-        TaskSetup taskSetup = new TaskSetup("localhost:2181");
-        String clusterName = "test-s4-cluster";
-        taskSetup.clean(clusterName);
-        taskSetup.setup(clusterName, 10, 1300);
-        String zookeeperAddress = "localhost:2181";
-        for (int i = 0; i < 10; i++) {
-            AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
-                    clusterName, zookeeperAddress, 30000, 30000);
-            ClusterNode assignClusterNode = assignmentFromZK
-                    .assignClusterNode();
-            System.out.println(i+"-->"+assignClusterNode);
-        }
-        TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
-        Thread.sleep(3000);
-        Cluster topology = topologyFromZK.getTopology();
-        System.out.println(topology.getNodes().size());
-        Thread.currentThread().join();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
index fc12549..0cac238 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
@@ -2,12 +2,10 @@ package org.apache.s4.comm.topology;
 
 /**
  * 
- * Upon startup an S4 process in a cluster must be assigned one and only one of
- * the available cluster nodes. Cluster nodes ({@link ClusterNode}) are defined
- * using a configuration mechanism at startup.
+ * Upon startup an S4 process in a cluster must be assigned one and only one of the available cluster nodes. Cluster
+ * nodes ({@link ClusterNode}) are defined using a configuration mechanism at startup.
  * 
- * The Assignment implementation is responsible for coordinating how cluster
- * nodes are uniquely assigned to processes.
+ * The Assignment implementation is responsible for coordinating how cluster nodes are uniquely assigned to processes.
  * 
  */
 public interface Assignment {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
deleted file mode 100644
index d82a818..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.nio.channels.FileLock;
-
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * Implements the assignment interface {@link Assignment} using a file lock.
- * 
- */
-public class AssignmentFromFile implements Assignment {
-    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromFile.class);
-    final private Cluster cluster;
-    final private String lockDir;
-
-    @Inject
-    public AssignmentFromFile(Cluster cluster, @Named("cluster.lock_dir") String lockDir) {
-        this.cluster = cluster;
-        this.lockDir = lockDir;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.s4.comm.topology.Assignment#assignClusterNode()
-     */
-    @Override
-    public ClusterNode assignClusterNode() {
-        while (true) {
-            try {
-                for (ClusterNode node : cluster.getNodes()) {
-                    boolean partitionAvailable = canTakeupProcess(node);
-                    logger.info("Partition available: " + partitionAvailable);
-                    if (partitionAvailable) {
-                        boolean success = takeProcess(node);
-                        logger.info("Acquire partition:" + ((success) ? "success." : "failure."));
-                        if (success) {
-                            return node;
-                        }
-                    }
-                }
-                Thread.sleep(5000);
-
-            } catch (Exception e) {
-                logger.error("Exception in assignPartition Method:", e);
-            }
-        }
-    }
-
-    private boolean takeProcess(ClusterNode node) {
-        File lockFile = null;
-        try {
-            // TODO:contruct from processConfig
-            String lockFileName = createLockFileName(node);
-            lockFile = new File(lockFileName);
-            if (!lockFile.exists()) {
-                FileOutputStream fos = new FileOutputStream(lockFile);
-                FileLock fl = fos.getChannel().tryLock();
-                if (fl != null) {
-                    String message = "Partition acquired by PID:" + getPID() + " HOST:"
-                            + InetAddress.getLocalHost().getHostName();
-                    fos.write(message.getBytes());
-                    fos.close();
-                    logger.info(message + "  Lock File location: " + lockFile.getAbsolutePath());
-                    return true;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Exception trying to take up partition:" + node.getPartition(), e);
-        } finally {
-            if (lockFile != null) {
-                lockFile.deleteOnExit();
-            }
-        }
-        return false;
-    }
-
-    private String createLockFileName(ClusterNode node) {
-        String lockFileName = "s4-" + node.getPartition() + ".lock";
-        if (lockDir != null && lockDir.trim().length() > 0) {
-            File file = new File(lockDir);
-            if (!file.exists()) {
-                file.mkdirs();
-            }
-            return lockDir + "/" + lockFileName;
-        } else {
-            return lockFileName;
-        }
-    }
-
-    private boolean canTakeupProcess(ClusterNode node) {
-        try {
-            InetAddress inetAddress = InetAddress.getByName(node.getMachineName());
-            logger.info("Host Name: " + InetAddress.getLocalHost().getCanonicalHostName());
-            if (!node.getMachineName().equals("localhost")) {
-                if (!InetAddress.getLocalHost().equals(inetAddress)) {
-                    return false;
-                }
-            }
-        } catch (Exception e) {
-            logger.error("Invalid host:" + node.getMachineName());
-            return false;
-        }
-        String lockFileName = createLockFileName(node);
-        File lockFile = new File(lockFileName);
-        if (!lockFile.exists()) {
-            return true;
-        } else {
-            logger.info("Partition taken up by another process lockFile:" + lockFileName);
-        }
-        return false;
-    }
-
-    private long getPID() {
-        String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
-        return Long.parseLong(processName.split("@")[0]);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 2cd3dbd..d172d73 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -13,6 +13,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.data.Stat;
@@ -23,11 +24,7 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
-    private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
-    /*
-     * Name of the cluster
-     */
-    private final String clusterName;
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromZK.class);
     /**
      * Current state of connection with ZK
      */
@@ -64,15 +61,19 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
      * Holds the reference to ClusterNode which points to the current partition owned
      */
     AtomicReference<ClusterNode> clusterNodeRef;
+    private int connectionTimeout;
+    private String clusterName;
 
+    // TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
     @Inject
     public AssignmentFromZK(@Named("cluster.name") String clusterName,
             @Named("cluster.zk_address") String zookeeperAddress,
             @Named("cluster.zk_session_timeout") int sessionTimeout,
             @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
         this.clusterName = clusterName;
-        taskPath = "/" + clusterName + "/" + "tasks";
-        processPath = "/" + clusterName + "/" + "process";
+        this.connectionTimeout = connectionTimeout;
+        taskPath = "/s4/clusters/" + clusterName + "/tasks";
+        processPath = "/s4/clusters/" + clusterName + "/process";
         lock = new ReentrantLock();
         clusterNodeRef = new AtomicReference<ClusterNode>();
         taskAcquired = lock.newCondition();
@@ -89,7 +90,9 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
         ZkSerializer serializer = new ZNRecordSerializer();
         zkClient.setZkSerializer(serializer);
         zkClient.subscribeStateChanges(this);
-        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
+            throw new Exception("cannot connect to zookeeper");
+        }
         // bug in zkClient, it does not invoke handleNewSession the first time
         // it connects
         this.handleStateChanged(KeeperState.SyncConnected);
@@ -109,11 +112,17 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
+        if (!state.equals(KeeperState.SyncConnected)) {
+            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
+            zkClient.close();
+            zkClient.connect(connectionTimeout, null);
+            handleNewSession();
+        }
     }
 
     @Override
     public void handleNewSession() {
-        logger.info("New session:" + zkClient.getSessionId());
+        logger.info("New session:" + zkClient.getSessionId() + "; state is : " + state.name());
         currentlyOwningTask.set(false);
         zkClient.subscribeChildChanges(taskPath, this);
         zkClient.subscribeChildChanges(processPath, this);
@@ -174,13 +183,17 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
                         zkClient.createEphemeral(processPath + "/" + taskName, process);
 
                     } catch (Throwable e) {
-                        logger.warn("Exception trying to acquire task:" + taskName
-                                + " This is warning and can be ignored. " + e);
-                        // Any exception does not means we failed to acquire
-                        // task because we might have acquired task but there
-                        // was ZK connection loss
-                        // We will check again in the next section if we created
-                        // the process node successfully
+                        if (e instanceof ZkNodeExistsException) {
+                            logger.trace("Task already created");
+                        } else {
+                            logger.debug("Exception trying to acquire task:" + taskName
+                                    + " This is warning and can be ignored. " + e);
+                            // Any exception does not mean we failed to acquire
+                            // task because we might have acquired task but there
+                            // was ZK connection loss
+                            // We will check again in the next section if we created
+                            // the process node successfully
+                        }
                     }
                     // check if the process node is created and we own it
                     Stat stat = zkClient.getStat(processPath + "/" + taskName);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index e4ebb4f..6813f3f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -1,134 +1,13 @@
 package org.apache.s4.comm.topology;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-
 /**
- * 
- * The S4 physical cluster implementation.
+ * Represents a logical cluster
  * 
  */
-@Singleton
-public class Cluster {
-
-    // TODO: do we need a Cluster interface to represent different types of
-    // implementations?
-
-    private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
-
-    List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-    String mode = "unicast";
-    String name = "unknown";
-
-    final private String[] hosts;
-    final private String[] ports;
-    final private int numNodes;
-    private int numPartitions;
-    public Cluster(int numPartitions){
-        this.hosts = new String[] {};
-        this.ports = new String[] {};
-        this.numNodes = 0;
-        this.numPartitions = numPartitions;
-    }
-    /**
-     * Define the hosts and corresponding ports in the cluster.
-     * 
-     * @param hosts
-     *            a comma separates list of host names.
-     * @param ports
-     *            a comma separated list of ports.
-     * @throws IOException
-     *             if number of hosts and ports don't match.
-     */
-    @Inject
-    Cluster(@Named("cluster.hosts") String hosts,
-            @Named("cluster.ports") String ports) throws IOException {
-        if (hosts != null && hosts.length() > 0 && ports != null
-                && ports.length() > 0) {
-            this.ports = ports.split(",");
-            this.hosts = hosts.split(",");
-
-            if (this.ports.length != this.hosts.length) {
-                logger.error("Number of hosts should match number of ports in properties file. hosts: "
-                        + hosts + " ports: " + ports);
-                throw new IOException();
-            }
-
-            numNodes = this.hosts.length;
-            for (int i = 0; i < numNodes; i++) {
-                ClusterNode node = new ClusterNode(i,
-                        Integer.parseInt(this.ports[i]), this.hosts[i], "");
-                nodes.add(node);
-                logger.info("Added cluster node: " + this.hosts[i] + ":"
-                        + this.ports[i]);
-            }
-            numPartitions = numNodes;
-        } else {
-            this.hosts = new String[] {};
-            this.ports = new String[] {};
-            this.numNodes = 0;
-
-        }
-    }
-
-    /**
-     * Number of partitions in the cluster.
-     * @return
-     */
-    public int getPartitionCount() {
-        return numPartitions;
-    }
-
-    /**
-     * @param node
-     */
-    public void addNode(ClusterNode node) {
-        nodes.add(node);
-    }
-
-    /**
-     * @return a list of {@link ClusterNode} objects available in the cluster.
-     */
-    public List<ClusterNode> getNodes() {
-        return Collections.unmodifiableList(nodes);
-    }
-
-    // TODO: do we need mode and name? Making provate for now.
-
-    @SuppressWarnings("unused")
-    private String getMode() {
-        return mode;
-    }
-
-    @SuppressWarnings("unused")
-    private void setMode(String mode) {
-        this.mode = mode;
-    }
-
-    @SuppressWarnings("unused")
-    private String getName() {
-        return name;
-    }
-
-    @SuppressWarnings("unused")
-    private void setName(String name) {
-        this.name = name;
-    }
+public interface Cluster {
+    public PhysicalCluster getPhysicalCluster();
 
-    public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append("{name=").append(name).append(",mode=").append(mode)
-                .append(",type=").append(",nodes=").append(nodes).append("}");
-        return sb.toString();
-    }
+    public void addListener(ClusterChangeListener listener);
 
-}
\ No newline at end of file
+    public void removeListener(ClusterChangeListener listener);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
new file mode 100644
index 0000000..e4452d1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * Entities interested in changes occurring in topologies implement this listener and should register through the
+ * {@link Cluster} interface
+ * 
+ */
+public interface ClusterChangeListener {
+    public void onChange();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
new file mode 100644
index 0000000..82e7a5e
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -0,0 +1,198 @@
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
+
+    private static Logger logger = LoggerFactory.getLogger(ClusterFromZK.class);
+
+    private final AtomicReference<PhysicalCluster> clusterRef;
+    private final List<ClusterChangeListener> listeners;
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final String taskPath;
+    private final String processPath;
+    private final Lock lock;
+    private AtomicBoolean currentlyOwningTask;
+    private String machineId;
+    private String clusterName;
+
+    private int connectionTimeout;
+
+    /**
+     * only the local topology
+     */
+    @Inject
+    public ClusterFromZK(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.connectionTimeout = connectionTimeout;
+        this.clusterName = clusterName;
+        this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+        this.processPath = "/s4/clusters/" + clusterName + "/process";
+        lock = new ReentrantLock();
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+        zkClient.subscribeStateChanges(this);
+        if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
+            throw new Exception("cannot connect to zookeeper");
+        }
+        try {
+            machineId = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            machineId = "UNKNOWN";
+        }
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        this.handleStateChanged(KeeperState.SyncConnected);
+        zkClient.subscribeChildChanges(taskPath, this);
+        zkClient.subscribeChildChanges(processPath, this);
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleNewSession();
+
+    }
+
+    /**
+     * any topology
+     */
+    public ClusterFromZK(String clusterName, ZkClient zkClient, String machineId) {
+
+        this.zkClient = zkClient;
+        this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+        this.processPath = "/s4/clusters/" + clusterName + "/process";
+        this.clusterName = clusterName;
+        this.lock = new ReentrantLock();
+        this.machineId = machineId;
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        zkClient.subscribeChildChanges(taskPath, this);
+        zkClient.subscribeChildChanges(processPath, this);
+    }
+
+    @Override
+    public PhysicalCluster getPhysicalCluster() {
+        return clusterRef.get();
+    }
+
+    @Override
+    public void addListener(ClusterChangeListener listener) {
+        logger.info("Adding topology change listener:" + listener);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(ClusterChangeListener listener) {
+        logger.info("Removing topology change listener:" + listener);
+        listeners.remove(listener);
+    }
+
+    @Override
+    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+        doProcess();
+    }
+
+    void doProcess() {
+        lock.lock();
+        try {
+            refreshTopology();
+        } catch (Exception e) {
+            logger.error("", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void refreshTopology() throws Exception {
+        List<String> processes = zkClient.getChildren(processPath);
+        List<String> tasks = zkClient.getChildren(taskPath);
+        PhysicalCluster cluster = new PhysicalCluster(tasks.size());
+        for (int i = 0; i < processes.size(); i++) {
+            cluster.setName(clusterName);
+            ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+            if (process != null) {
+                int partition = Integer.parseInt(process.getSimpleField("partition"));
+                String host = process.getSimpleField("host");
+                int port = Integer.parseInt(process.getSimpleField("port"));
+                String taskId = process.getSimpleField("taskId");
+                ClusterNode node = new ClusterNode(partition, port, host, taskId);
+                cluster.addNode(node);
+            }
+        }
+        logger.info("Changing cluster topology to " + cluster + " from " + clusterRef.get());
+        clusterRef.set(cluster);
+        // Notify all changeListeners about the topology change
+        for (ClusterChangeListener listener : listeners) {
+            listener.onChange();
+        }
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+        doProcess();
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+        doProcess();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClusterFromZK other = (ClusterFromZK) obj;
+        if (clusterName == null) {
+            if (other.clusterName != null)
+                return false;
+        } else if (!clusterName.equals(other.clusterName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+        doProcess();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
new file mode 100644
index 0000000..aa8b78f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
@@ -0,0 +1,12 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * Represents clusters related to the current node (clusters to which this node belongs, and connected clusters that may
+ * receive messages from this node)
+ * 
+ */
+public interface Clusters {
+
+    Cluster getCluster(String clusterName);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
new file mode 100644
index 0000000..c5445d3
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -0,0 +1,103 @@
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Monitors all clusters
+ * 
+ */
+public class ClustersFromZK implements Clusters, IZkStateListener {
+    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final Lock lock;
+    private String machineId;
+    private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+    private int connectionTimeout;
+    private String clusterName;
+
+    @Inject
+    public ClustersFromZK(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        this.connectionTimeout = connectionTimeout;
+        lock = new ReentrantLock();
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        try {
+            machineId = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            machineId = "UNKNOWN";
+        }
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+
+    }
+
+    /**
+     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
+     */
+    private void doProcess() {
+        lock.lock();
+        try {
+            for (Map.Entry<String, ClusterFromZK> cluster : clusters.entrySet()) {
+                cluster.getValue().doProcess();
+            }
+        } catch (Exception e) {
+            logger.warn("Exception in tryToAcquireTask", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+        if (!state.equals(KeeperState.SyncConnected)) {
+            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
+            zkClient.connect(connectionTimeout, null);
+            handleNewSession();
+        }
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:" + zkClient.getSessionId());
+        List<String> clusterNames = zkClient.getChildren("/s4/clusters");
+        for (String clusterName : clusterNames) {
+            ClusterFromZK cluster = new ClusterFromZK(clusterName, zkClient, machineId);
+            clusters.put(clusterName, cluster);
+        }
+        doProcess();
+    }
+
+    public Cluster getCluster(String clusterName) {
+        return clusters.get(clusterName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
new file mode 100644
index 0000000..32348df
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -0,0 +1,125 @@
+package org.apache.s4.comm.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * The S4 physical cluster implementation.
+ * 
+ */
+public class PhysicalCluster {
+
+    // TODO: do we need a Cluster interface to represent different types of
+    // implementations?
+
+    private static final Logger logger = LoggerFactory.getLogger(PhysicalCluster.class);
+
+    List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+    String mode = "unicast";
+    String name = "unknown";
+
+    final private String[] hosts;
+    final private String[] ports;
+    final private int numNodes;
+    private int numPartitions;
+
+    public PhysicalCluster(int numPartitions) {
+        this.hosts = new String[] {};
+        this.ports = new String[] {};
+        this.numNodes = 0;
+        this.numPartitions = numPartitions;
+    }
+
+    /**
+     * Define the hosts and corresponding ports in the cluster.
+     * 
+     * @param hosts
+     *            a comma-separated list of host names.
+     * @param ports
+     *            a comma-separated list of ports.
+     * @throws IOException
+     *             if number of hosts and ports don't match.
+     */
+    PhysicalCluster(String hosts, String ports) throws IOException {
+        if (hosts != null && hosts.length() > 0 && ports != null && ports.length() > 0) {
+            this.ports = ports.split(",");
+            this.hosts = hosts.split(",");
+
+            if (this.ports.length != this.hosts.length) {
+                logger.error("Number of hosts should match number of ports in properties file. hosts: " + hosts
+                        + " ports: " + ports);
+                throw new IOException();
+            }
+
+            numNodes = this.hosts.length;
+            for (int i = 0; i < numNodes; i++) {
+                ClusterNode node = new ClusterNode(i, Integer.parseInt(this.ports[i]), this.hosts[i], "");
+                nodes.add(node);
+                logger.info("Added cluster node: " + this.hosts[i] + ":" + this.ports[i]);
+            }
+            numPartitions = numNodes;
+        } else {
+            this.hosts = new String[] {};
+            this.ports = new String[] {};
+            this.numNodes = 0;
+
+        }
+    }
+
+    /**
+     * Number of partitions in the cluster.
+     * 
+     * @return
+     */
+    public int getPartitionCount() {
+        return numPartitions;
+    }
+
+    /**
+     * @param node
+     */
+    public void addNode(ClusterNode node) {
+        nodes.add(node);
+    }
+
+    /**
+     * @return a list of {@link ClusterNode} objects available in the cluster.
+     */
+    public List<ClusterNode> getNodes() {
+        return Collections.unmodifiableList(nodes);
+    }
+
+    // TODO: do we need mode and name? Making private for now.
+
+    @SuppressWarnings("unused")
+    private String getMode() {
+        return mode;
+    }
+
+    @SuppressWarnings("unused")
+    private void setMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("{ nbNodes=").append(nodes.size()).append(",name=").append(name).append(",mode=").append(mode)
+                .append(",type=").append(",nodes=").append(nodes).append("}");
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
new file mode 100644
index 0000000..2b9d866
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * 
+ * Represents a logical cluster external to the current cluster
+ * 
+ */
+public interface RemoteCluster extends Cluster {
+
+}