You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/05/24 18:19:26 UTC
[3/5] git commit: inter-app communications + refactorings
inter-app communications + refactorings
- applications publish streams they produce and streams they consume
- RemoteStream instances send messages to remote consumers, identified by (cluster, appId),
through multiple senders and emitters (1 per consumer cluster)
- added related tests
- removed file-based cluster configuration: cluster configuration is now
done through ZooKeeper only
- renamed Topology to Cluster, and Cluster to PhysicalCluster
- improved tests, minimized the number of configuration files
- commented out the code for handling network glitches: it blocked the sending
of messages upon cluster disconnections
- migrated to Gradle 1.0-rc-3, so that we can embed build operations with parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/99b6f048
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/99b6f048
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/99b6f048
Branch: refs/heads/S4-22
Commit: 99b6f04824681cb5a687bc5c3f49d53ccada500a
Parents: a63cda9
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat May 19 20:15:47 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Sat May 19 20:15:47 2012 +0200
----------------------------------------------------------------------
build.gradle | 24 ++-
gradle/wrapper/gradle-wrapper.properties | 2 +-
gradlew | 108 ++++----
gradlew.bat | 52 ++--
subprojects/s4-comm/s4-comm.gradle | 4 +-
.../src/main/java/org/apache/s4/comm/Module.java | 111 --------
.../org/apache/s4/comm/tcp/RemoteEmitters.java | 34 +++
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 107 ++++----
.../java/org/apache/s4/comm/tcp/TCPListener.java | 44 ++--
.../org/apache/s4/comm/tcp/TCPRemoteEmitter.java | 13 +-
.../java/org/apache/s4/comm/tools/TaskSetup.java | 53 ++---
.../org/apache/s4/comm/topology/Assignment.java | 8 +-
.../s4/comm/topology/AssignmentFromFile.java | 127 ---------
.../apache/s4/comm/topology/AssignmentFromZK.java | 45 ++--
.../java/org/apache/s4/comm/topology/Cluster.java | 133 +---------
.../s4/comm/topology/ClusterChangeListener.java | 10 +
.../org/apache/s4/comm/topology/ClusterFromZK.java | 198 ++++++++++++++
.../java/org/apache/s4/comm/topology/Clusters.java | 12 +
.../apache/s4/comm/topology/ClustersFromZK.java | 103 +++++++
.../apache/s4/comm/topology/PhysicalCluster.java | 125 +++++++++
.../org/apache/s4/comm/topology/RemoteCluster.java | 10 +
.../org/apache/s4/comm/topology/RemoteStreams.java | 211 +++++++++++++++
.../apache/s4/comm/topology/RemoteTopology.java | 5 -
.../s4/comm/topology/RemoteTopologyFromZK.java | 16 --
.../apache/s4/comm/topology/StreamConsumer.java | 27 ++
.../java/org/apache/s4/comm/topology/Topology.java | 9 -
.../s4/comm/topology/TopologyChangeListener.java | 5 -
.../apache/s4/comm/topology/TopologyFromFile.java | 33 ---
.../apache/s4/comm/topology/TopologyFromZK.java | 144 ----------
.../java/org/apache/s4/comm/topology/ZNRecord.java | 17 ++-
.../java/org/apache/s4/comm/udp/UDPEmitter.java | 18 +-
.../java/org/apache/s4/comm/DeliveryTestUtil.java | 19 +-
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 5 +-
.../s4/comm/topology/AssignmentFromZKTest.java | 43 ---
.../s4/comm/topology/AssignmentsFromZKTest.java | 69 +++++
.../s4/comm/topology/ClustersFromZKTest.java | 97 +++++++
.../s4/comm/topology/TopologyFromZKTest.java | 78 ------
.../org/apache/s4/comm/topology/ZKBaseTest.java | 37 +--
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 2 +-
.../java/org/apache/s4/fixtures/CommTestUtils.java | 4 +-
.../FileBasedClusterManagementTestModule.java | 77 ------
.../ZkBasedClusterManagementTestModule.java | 36 ++-
.../java/org/apache/s4/fixtures/ZkBasedTest.java | 47 +---
.../src/test/resources/default.s4.properties | 5 +-
.../src/test/resources/s4-comm-test.properties | 10 -
subprojects/s4-core/s4-core.gradle | 10 +-
.../src/main/java/org/apache/s4/core/App.java | 44 +++-
.../main/java/org/apache/s4/core/CustomModule.java | 94 -------
.../java/org/apache/s4/core/DefaultModule.java | 99 +++++++
.../src/main/java/org/apache/s4/core/Main.java | 5 +-
.../src/main/java/org/apache/s4/core/Receiver.java | 6 +-
.../main/java/org/apache/s4/core/RemoteSender.java | 28 ++-
.../java/org/apache/s4/core/RemoteSenders.java | 61 +++++
.../main/java/org/apache/s4/core/RemoteStream.java | 77 ++++++
.../src/main/java/org/apache/s4/core/Sender.java | 2 -
.../src/main/java/org/apache/s4/core/Server.java | 67 +----
.../src/main/java/org/apache/s4/core/Stream.java | 19 +--
.../java/org/apache/s4/core/adapter/Adapter.java | 26 --
.../org/apache/s4/core/adapter/AdapterMain.java | 53 ----
.../org/apache/s4/core/adapter/AdapterModule.java | 97 -------
.../org/apache/s4/core/adapter/RemoteStream.java | 60 ----
.../s4/deploy/DistributedDeploymentManager.java | 2 +-
.../test/java/org/apache/s4/core/TriggerTest.java | 18 +-
.../apache/s4/core/triggers/TriggeredModule.java | 6 -
.../apache/s4/deploy/TestAutomaticDeployment.java | 66 ++---
.../s4/deploy/prodcon/TestProducerConsumer.java | 30 ++-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 85 +++----
.../java/org/apache/s4/fixtures/SocketAdapter.java | 27 +-
.../org/apache/s4/fixtures/ZkBasedAppModule.java | 9 +-
.../org/apache/s4/wordcount/WordCountModule.java | 4 +-
.../org/apache/s4/wordcount/WordCountTest.java | 9 +
.../apache/s4/wordcount/zk/WordCountModuleZk.java | 7 -
.../apache/s4/wordcount/zk/WordCountTestZk.java | 46 ----
.../src/test/resources/default.s4.properties | 6 +-
.../resources/org.apache.s4.deploy.s4.properties | 11 -
.../test/resources/s4-counter-example.properties | 7 -
.../java/org/apache/s4/example/counter/Module.java | 81 +++----
.../apache/s4/example/fluent/counter/Module.java | 82 +++----
subprojects/s4-tools/s4-tools.gradle | 17 ++
.../java/org/apache/s4/tools/DefineCluster.java | 2 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 89 +++----
test-apps/s4-counter/build.gradle | 18 +-
test-apps/s4-showtime/build.gradle | 22 +-
test-apps/simple-deployable-app-1/build.gradle | 18 +-
test-apps/simple-deployable-app-2/build.gradle | 18 +-
test-apps/twitter-adapter/build.gradle | 8 +-
.../s4/example/twitter/TwitterInputAdapter.java | 13 +-
test-apps/twitter-counter/build.gradle | 5 +-
.../s4/example/twitter/TwitterCounterApp.java | 8 +-
89 files changed, 1866 insertions(+), 1933 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fd75057..58ff560 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,7 +73,11 @@ libraries = [
zkclient: 'com.github.sgroschupf:zkclient:0.1',
diezel: 'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
jcommander: 'com.beust:jcommander:1.23',
- commons_io: 'commons-io:commons-io:2.1'
+ commons_io: 'commons-io:commons-io:2.1',
+ gradle_base_services: 'gradle-base-services:gradle-base-services:1.0-rc-3',
+ gradle_core: 'gradle-core:gradle-core:1.0-rc-3',
+ gradle_tooling_api: 'gradle-tooling-api:gradle-tooling-api:1.0-rc-3',
+ gradle_wrapper: 'gradle-wrapper:gradle-wrapper:1.0-rc-3'
]
subprojects {
@@ -94,6 +98,7 @@ subprojects {
/* Google. */
compile( libraries.guava )
compile( libraries.guice )
+ compile (libraries.guice_assist)
/* Logging. */
compile( libraries.slf4j )
@@ -109,6 +114,16 @@ subprojects {
compile( libraries.jcip )
compile( libraries.zk )
+// // added here since from 1.0 dependencies don't seem to
+// // be copied in through the application plugin (s4-tools)
+// compile libraries.jcommander
+// compile libraries.zkclient
+// compile libraries.commons_io
+// compile libraries.gradle_base_services
+// compile libraries.gradle_core
+// compile libraries.gradle_tooling_api
+// compile libraries.gradle_wrapper
+//
/* Testing. */
testCompile( libraries.junit )
}
@@ -124,7 +139,7 @@ subprojects {
dependsOnChildren()
-platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm')]
+platformProjects = [project(':s4-base'), project(':s4-core'), project(':s4-comm'), project(':s4-tools')]
configurations {
platformLibs
@@ -153,7 +168,7 @@ binDistImage = copySpec {
into ("platform/lib") {
from proj.sourceSets.main.resources
from proj.configurations.runtime.getFiles()
- from proj.configurations.archives.allArtifactFiles
+ from proj.configurations.archives.allArtifacts.files
}
}
}
@@ -177,7 +192,8 @@ task listJars << {
/* Generates the gradlew scripts.
http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
task wrapper(type: Wrapper) {
- gradleVersion = '1.0-milestone-3'
+ gradleVersion = '1.0-rc-3'
+ jarFile='lib/gradle-wrapper-1.0-rc-3.jar'
}
class Version {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 7515ec6..1d9b9c0 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=http\://repo.gradle.org/gradle/distributions/gradle-1.0-milestone-3-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.0-rc-3-bin.zip
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradlew
----------------------------------------------------------------------
diff --git a/gradlew b/gradlew
index d8809f1..28e6f10 100755
--- a/gradlew
+++ b/gradlew
@@ -1,16 +1,16 @@
#!/bin/bash
##############################################################################
-## ##
-## Gradle wrapper script for UN*X ##
-## ##
+##
+## Gradle start up script for UN*X
+##
##############################################################################
-# Uncomment those lines to set JVM options. GRADLE_OPTS and JAVA_OPTS can be used together.
-# GRADLE_OPTS="$GRADLE_OPTS -Xmx512m"
-# JAVA_OPTS="$JAVA_OPTS -Xmx512m"
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
-GRADLE_APP_NAME=Gradle
+APP_NAME="Gradle"
+APP_BASE_NAME=`basename "$0"`
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
@@ -42,54 +42,51 @@ case "`uname`" in
;;
esac
-# Attempt to set JAVA_HOME if it's not already set.
-if [ -z "$JAVA_HOME" ] ; then
- if $darwin ; then
- [ -z "$JAVA_HOME" -a -d "/Library/Java/Home" ] && export JAVA_HOME="/Library/Java/Home"
- [ -z "$JAVA_HOME" -a -d "/System/Library/Frameworks/JavaVM.framework/Home" ] && export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Home"
- else
- javaExecutable="`which javac`"
- [ -z "$javaExecutable" -o "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ] && die "JAVA_HOME not set and cannot find javac to deduce location, please set JAVA_HOME."
- # readlink(1) is not available as standard on Solaris 10.
- readLink=`which readlink`
- [ `expr "$readLink" : '\([^ ]*\)'` = "no" ] && die "JAVA_HOME not set and readlink not available, please set JAVA_HOME."
- javaExecutable="`readlink -f \"$javaExecutable\"`"
- javaHome="`dirname \"$javaExecutable\"`"
- javaHome=`expr "$javaHome" : '\(.*\)/bin'`
- export JAVA_HOME="$javaHome"
- fi
-fi
-
# For Cygwin, ensure paths are in UNIX format before anything is touched.
if $cygwin ; then
- [ -n "$JAVACMD" ] && JAVACMD=`cygpath --unix "$JAVACMD"`
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
fi
-STARTER_MAIN_CLASS=org.gradle.wrapper.GradleWrapperMain
-CLASSPATH=`dirname "$0"`/gradle/wrapper/gradle-wrapper.jar
-WRAPPER_PROPERTIES=`dirname "$0"`/gradle/wrapper/gradle-wrapper.properties
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/"
+APP_HOME="`pwd -P`"
+cd "$SAVED"
+
+CLASSPATH=$APP_HOME/lib/gradle-wrapper-1.0-rc-3.jar
+
# Determine the Java command to use to start the JVM.
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
else
- JAVACMD="java"
+ JAVACMD="$JAVA_HOME/bin/java"
fi
-fi
-if [ ! -x "$JAVACMD" ] ; then
- die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD="java"
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
-fi
-if [ -z "$JAVA_HOME" ] ; then
- warn "JAVA_HOME environment variable is not set"
fi
# Increase the maximum file descriptors if we can.
@@ -108,15 +105,14 @@ if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
fi
fi
-# For Darwin, add GRADLE_APP_NAME to the JAVA_OPTS as -Xdock:name
+# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
- JAVA_OPTS="$JAVA_OPTS -Xdock:name=$GRADLE_APP_NAME"
-# we may also want to set -Xdock:image
+ GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
- JAVA_HOME=`cygpath --path --mixed "$JAVA_HOME"`
+ APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
# We build the pattern for arguments to be converted via cygpath
@@ -143,7 +139,7 @@ if $cygwin ; then
eval `echo args$i`="\"$arg\""
fi
i=$((i+1))
- done
+ done
case $i in
(0) set -- ;;
(1) set -- "$args0" ;;
@@ -158,11 +154,11 @@ if $cygwin ; then
esac
fi
-GRADLE_APP_BASE_NAME=`basename "$0"`
+# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
+function splitJvmOpts() {
+ JVM_OPTS=("$@")
+}
+eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
+JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
-exec "$JAVACMD" $JAVA_OPTS $GRADLE_OPTS \
- -classpath "$CLASSPATH" \
- -Dorg.gradle.appname="$GRADLE_APP_BASE_NAME" \
- -Dorg.gradle.wrapper.properties="$WRAPPER_PROPERTIES" \
- $STARTER_MAIN_CLASS \
- "$@"
+exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/gradlew.bat
----------------------------------------------------------------------
diff --git a/gradlew.bat b/gradlew.bat
index 4855abb..d410d61 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -1,24 +1,37 @@
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
-@rem ##
-@rem Gradle startup script for Windows ##
-@rem ##
+@rem
+@rem Gradle startup script for Windows
+@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
-@rem Uncomment those lines to set JVM options. GRADLE_OPTS and JAVA_OPTS can be used together.
-@rem set GRADLE_OPTS=%GRADLE_OPTS% -Xmx512m
-@rem set JAVA_OPTS=%JAVA_OPTS% -Xmx512m
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.\
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
set JAVA_EXE=java.exe
-if not defined JAVA_HOME goto init
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto init
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+goto fail
+
+:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
@@ -29,14 +42,14 @@ echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
-echo.
-goto end
+
+goto fail
:init
@rem Get command-line arguments, handling Windowz variants
if not "%OS%" == "Windows_NT" goto win9xME_args
-if "%eval[2+2]" == "4" goto 4NT_args
+if "%@eval[2+2]" == "4" goto 4NT_args
:win9xME_args
@rem Slurp the command line arguments.
@@ -56,27 +69,22 @@ set CMD_LINE_ARGS=%$
:execute
@rem Setup the command line
-set STARTER_MAIN_CLASS=org.gradle.wrapper.GradleWrapperMain
-set CLASSPATH=%DIRNAME%\gradle\wrapper\gradle-wrapper.jar
-set WRAPPER_PROPERTIES=%DIRNAME%\gradle\wrapper\gradle-wrapper.properties
-
-set GRADLE_OPTS=%JAVA_OPTS% %GRADLE_OPTS% -Dorg.gradle.wrapper.properties="%WRAPPER_PROPERTIES%"
+set CLASSPATH=%APP_HOME%\lib\gradle-wrapper-1.0-rc-3.jar
@rem Execute Gradle
-"%JAVA_EXE%" %GRADLE_OPTS% -classpath "%CLASSPATH%" %STARTER_MAIN_CLASS% %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
-if not "%OS%"=="Windows_NT" echo 1 > nul | choice /n /c:1
-
+:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit "%ERRORLEVEL%"
-exit /b "%ERRORLEVEL%"
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
-:omega
\ No newline at end of file
+:omega
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/s4-comm.gradle b/subprojects/s4-comm/s4-comm.gradle
index b88239c..88abefd 100644
--- a/subprojects/s4-comm/s4-comm.gradle
+++ b/subprojects/s4-comm/s4-comm.gradle
@@ -15,7 +15,7 @@
*/
description = 'Implementation-specific components of the communication layer.'
-
+
dependencies {
compile project(":s4-base")
compile libraries.json
@@ -27,7 +27,7 @@ dependencies {
task testJar(type: Jar) {
baseName = "test-${project.archivesBaseName}"
- from sourceSets.test.classes
+ from sourceSets.test.output
}
configurations {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
deleted file mode 100644
index 36b33fc..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/Module.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm;
-
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/**
- * Default comm layer module. To change the module, modify property
- * <tt>comm.module</tt> in <tt>/s4-comm.properties</tt>
- *
- * @author Leo Neumeyer
- */
-public class Module extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-comm.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster"))
- .toInstance(Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- /* Configure static assignment using a configuration file. */
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- /* Configure a static cluster topology using a configuration file. */
- bind(Topology.class).to(TopologyFromFile.class);
-
- // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
- // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
- //
- // bind(Emitter.class).to(QueueingEmitter.class);
- // bind(Listener.class).to(QueueingListener.class);
-
- /* Use the Netty comm layer implementation. */
- // bind(Emitter.class).to(NettyEmitter.class);
- // bind(Listener.class).to(NettyListener.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
new file mode 100644
index 0000000..7623d52
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -0,0 +1,34 @@
+package org.apache.s4.comm.tcp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ *
+ */
+@Singleton
+public class RemoteEmitters {
+
+ Map<Cluster, RemoteEmitter> emitters = new HashMap<Cluster, RemoteEmitter>();
+
+ @Inject
+ RemoteEmitterFactory emitterFactory;
+
+ public RemoteEmitter getEmitter(Cluster topology) {
+ RemoteEmitter emitter = emitters.get(topology);
+ if (emitter == null) {
+ emitter = emitterFactory.createRemoteEmitter(topology);
+ emitters.put(topology, emitter);
+ }
+ return emitter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 90b4686..623172a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -11,8 +11,8 @@ import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyChangeListener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -35,12 +35,17 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBiMap;
import com.google.inject.Inject;
-public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChangeListener {
+/**
+ *
+ * Sends messages through TCP, to the associated subcluster.
+ *
+ */
+public class TCPEmitter implements Emitter, ChannelFutureListener, ClusterChangeListener {
private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
private static final int BUFFER_SIZE = 10;
private static final int NUM_RETRIES = 10;
- private Topology topology;
+ private Cluster topology;
private final ClientBootstrap bootstrap;
@Inject
@@ -100,10 +105,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
private MessageQueuesPerPartition queuedMessages = new MessageQueuesPerPartition(true);
@Inject
- public TCPEmitter(Topology topology) throws InterruptedException {
+ public TCPEmitter(Cluster topology) throws InterruptedException {
this.topology = topology;
topology.addListener(this);
- int clusterSize = this.topology.getTopology().getNodes().size();
+ int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
partitionChannelMap = HashBiMap.create(clusterSize);
partitionNodeMap = HashBiMap.create(clusterSize);
@@ -117,8 +122,8 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
@Override
public ChannelPipeline getPipeline() {
ChannelPipeline p = Channels.pipeline();
- p.addLast("1", new LengthFieldPrepender(4));
- p.addLast("2", new TestHandler());
+ p.addLast("Framing", new LengthFieldPrepender(4));
+ p.addLast("ChannelStateMonitor", new ChannelStateMonitoringHandler());
return p;
}
});
@@ -131,11 +136,11 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
ClusterNode clusterNode = partitionNodeMap.get(partitionId);
if (clusterNode == null) {
- if (topology.getTopology().getNodes().size() == 0) {
+ if (topology.getPhysicalCluster().getNodes().size() == 0) {
logger.error("No node in cluster ");
return false;
}
- clusterNode = topology.getTopology().getNodes().get(partitionId);
+ clusterNode = topology.getPhysicalCluster().getNodes().get(partitionId);
partitionNodeMap.forcePut(partitionId, clusterNode);
}
@@ -184,37 +189,44 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
}
}
- /*
- * Try limiting the size of the send queue inside Netty
- */
- if (!channel.isWritable()) {
- synchronized (sendLock) {
- // check again now that we have the lock
- while (!channel.isWritable()) {
- try {
- sendLock.wait();
- } catch (InterruptedException ie) {
- return false;
- }
- }
- }
- }
-
- /*
- * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
- * buffered messages, and (3) the Current Message
- *
- * Once the channel returns success delete from the messagesOnTheWire
- */
- byte[] messageBeingSent = null;
- // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
- // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+ // /*
+ // * Try limiting the size of the send queue inside Netty
+
+ // FIXME this does not work: in case of a disconnection, the channel remains non writeable, and the lock is
+ // never released, hence blocking.
+ // should be fixed using: 1/ configurable timeouts (should drop pending messages after the timeout) 2/ make sure
+ // the handler is correctly responding to disconnections/reconnections 3/ there should be a lock per channel,
+ // no?
+
+ // */
+ // if (!channel.isWritable()) {
+ // synchronized (sendLock) {
+ // // check again now that we have the lock
+ // while (!channel.isWritable()) {
+ // try {
+ // sendLock.wait();
+ // } catch (InterruptedException ie) {
+ // return false;
+ // }
+ // }
+ // }
// }
- while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
- writeMessageToChannel(channel, partitionId, messageBeingSent);
- queuedMessages.remove(partitionId);
- }
+ // /*
+ // * Channel is available. Write messages in the following order: (1) Messages already on wire, (2) Previously
+ // * buffered messages, and (3) the Current Message
+ // *
+ // * Once the channel returns success delete from the messagesOnTheWire
+ // */
+ // byte[] messageBeingSent = null;
+ // // while ((messageBeingSent = messagesOnTheWire.peek(partitionId)) != null) {
+ // // writeMessageToChannel(channel, partitionId, messageBeingSent, false);
+ // // }
+ //
+ // while ((messageBeingSent = queuedMessages.peek(partitionId)) != null) {
+ // writeMessageToChannel(channel, partitionId, messageBeingSent);
+ // queuedMessages.remove(partitionId);
+ // }
writeMessageToChannel(channel, partitionId, serDeser.serialize(message));
return true;
@@ -242,7 +254,7 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
/*
* Close the channels that correspond to changed partitions and update partitionNodeMap
*/
- for (ClusterNode clusterNode : topology.getTopology().getNodes()) {
+ for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
Integer partition = clusterNode.getPartition();
ClusterNode oldNode = partitionNodeMap.get(partition);
@@ -259,19 +271,20 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
@Override
public int getPartitionCount() {
// Number of nodes is not same as number of partitions
- return topology.getTopology().getPartitionCount();
+ return topology.getPhysicalCluster().getPartitionCount();
}
- class TestHandler extends SimpleChannelHandler {
+ class ChannelStateMonitoringHandler extends SimpleChannelHandler {
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
// logger.info(String.format("%08x %08x %08x", e.getValue(),
// e.getChannel().getInterestOps(), Channel.OP_WRITE));
- synchronized (sendLock) {
- if (e.getChannel().isWritable()) {
- sendLock.notify();
- }
- }
+ // FIXME see above comments about fixing the buffering of messages
+ // synchronized (sendLock) {
+ // if (e.getChannel().isWritable()) {
+ // sendLock.notify();
+ // }
+ // }
ctx.sendUpstream(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index b3776e1..58348af 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -14,6 +14,7 @@ import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -25,21 +26,22 @@ import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
-
+/**
+ * Receives messages through TCP for the assigned subcluster.
+ *
+ */
public class TCPListener implements Listener {
private BlockingQueue<byte[]> handoffQueue = new SynchronousQueue<byte[]>();
private ClusterNode node;
private static final Logger logger = LoggerFactory.getLogger(TCPListener.class);
-
+
@Inject
public TCPListener(Assignment assignment) {
// wait for an assignment
node = assignment.assignClusterNode();
-
- ChannelFactory factory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
+
+ ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
@@ -48,38 +50,37 @@ public class TCPListener implements Listener {
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new LengthFieldBasedFrameDecoder(999999, 0, 4, 0, 4));
p.addLast("2", new ChannelHandler(handoffQueue));
-
+
return p;
}
});
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
-
+
bootstrap.bind(new InetSocketAddress(node.getPort()));
}
-
+
public byte[] recv() {
try {
return handoffQueue.take();
} catch (InterruptedException e) {
- return null;
+ return null;
}
}
-
+
public int getPartitionId() {
return node.getPartition();
}
-
+
public class ChannelHandler extends SimpleChannelHandler {
private BlockingQueue<byte[]> handoffQueue;
-
+
public ChannelHandler(BlockingQueue<byte[]> handOffQueue) {
this.handoffQueue = handOffQueue;
}
-
- public void messageReceived(ChannelHandlerContext ctx,
- MessageEvent e) {
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
try {
handoffQueue.put(buffer.array()); // this holds up the Netty upstream I/O thread if
@@ -88,7 +89,7 @@ public class TCPListener implements Listener {
Thread.currentThread().interrupt();
}
}
-
+
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
logger.error("Error", event.getCause());
if (context.getChannel().isOpen()) {
@@ -96,5 +97,12 @@ public class TCPListener implements Listener {
context.getChannel().close();
}
}
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // Nothing listener-specific to do on close yet: simply delegate to the superclass
+ super.channelClosed(ctx, e);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
index 0fa663f..7dfa2c8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
@@ -1,14 +1,23 @@
package org.apache.s4.comm.tcp;
import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.comm.topology.RemoteTopology;
+import org.apache.s4.comm.topology.Cluster;
import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+/**
+ * Emitter to remote subclusters.
+ *
+ */
public class TCPRemoteEmitter extends TCPEmitter implements RemoteEmitter {
+ /**
+ * Sends to remote subclusters. Instances are created dynamically, through an injected factory, when new subclusters
+ * are discovered (as outputs of remote streams)
+ */
@Inject
- public TCPRemoteEmitter(RemoteTopology topology) throws InterruptedException {
+ public TCPRemoteEmitter(@Assisted Cluster topology) throws InterruptedException {
super(topology);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 0466993..0458c70 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -1,9 +1,8 @@
package org.apache.s4.comm.tools;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.topology.TopologyFromZK;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.exception.ZkException;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.topology.ZkClient;
@@ -15,46 +14,38 @@ public class TaskSetup {
public TaskSetup(String zookeeperAddress) {
zkclient = new ZkClient(zookeeperAddress);
zkclient.setZkSerializer(new ZNRecordSerializer());
- zkclient.waitUntilConnected();
+ if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Could not connect to ZooKeeper after 10 seconds.");
+ }
+ }
+
+ public void clean(String clusterName, String topologyName) {
+ zkclient.deleteRecursive("/s4/clusters/" + topologyName);
}
public void clean(String clusterName) {
zkclient.deleteRecursive("/" + clusterName);
}
- public void setup(String clusterName, int tasks, int initialPort) {
- zkclient.createPersistent("/" + clusterName + "/tasks", true);
- zkclient.createPersistent("/" + clusterName + "/process", true);
- zkclient.createPersistent("/" + clusterName + "/apps", true);
+ public void setup(String cluster, int tasks, int initialPort) {
+ try {
+ zkclient.createPersistent("/s4/streams", true);
+ } catch (ZkException ignored) {
+ // ignore if exists
+ }
+
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/apps", true);
for (int i = 0; i < tasks; i++) {
String taskId = "Task-" + i;
ZNRecord record = new ZNRecord(taskId);
record.putSimpleField("taskId", taskId);
record.putSimpleField("port", String.valueOf(initialPort + i));
record.putSimpleField("partition", String.valueOf(i));
- record.putSimpleField("cluster", clusterName);
- zkclient.createPersistent("/" + clusterName + "/tasks/" + taskId,
- record);
+ record.putSimpleField("cluster", cluster);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId, record);
}
}
- public static void main(String[] args) throws Exception {
- TaskSetup taskSetup = new TaskSetup("localhost:2181");
- String clusterName = "test-s4-cluster";
- taskSetup.clean(clusterName);
- taskSetup.setup(clusterName, 10, 1300);
- String zookeeperAddress = "localhost:2181";
- for (int i = 0; i < 10; i++) {
- AssignmentFromZK assignmentFromZK = new AssignmentFromZK(
- clusterName, zookeeperAddress, 30000, 30000);
- ClusterNode assignClusterNode = assignmentFromZK
- .assignClusterNode();
- System.out.println(i+"-->"+assignClusterNode);
- }
- TopologyFromZK topologyFromZK=new TopologyFromZK(clusterName, zookeeperAddress, 30000, 30000);
- Thread.sleep(3000);
- Cluster topology = topologyFromZK.getTopology();
- System.out.println(topology.getNodes().size());
- Thread.currentThread().join();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
index fc12549..0cac238 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Assignment.java
@@ -2,12 +2,10 @@ package org.apache.s4.comm.topology;
/**
*
- * Upon startup an S4 process in a cluster must be assigned one and only one of
- * the available cluster nodes. Cluster nodes ({@link ClusterNode}) are defined
- * using a configuration mechanism at startup.
+ * Upon startup an S4 process in a cluster must be assigned one and only one of the available cluster nodes. Cluster
+ * nodes ({@link ClusterNode}) are defined using a configuration mechanism at startup.
*
- * The Assignment implementation is responsible for coordinating how cluster
- * nodes are uniquely assigned to processes.
+ * The Assignment implementation is responsible for coordinating how cluster nodes are uniquely assigned to processes.
*
*/
public interface Assignment {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
deleted file mode 100644
index d82a818..0000000
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromFile.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.s4.comm.topology;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.net.InetAddress;
-import java.nio.channels.FileLock;
-
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.ClusterNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/**
- * Implements the assignment interface {@link Assignment} using a file lock.
- *
- */
-public class AssignmentFromFile implements Assignment {
- private static final Logger logger = LoggerFactory.getLogger(AssignmentFromFile.class);
- final private Cluster cluster;
- final private String lockDir;
-
- @Inject
- public AssignmentFromFile(Cluster cluster, @Named("cluster.lock_dir") String lockDir) {
- this.cluster = cluster;
- this.lockDir = lockDir;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.s4.comm.topology.Assignment#assignClusterNode()
- */
- @Override
- public ClusterNode assignClusterNode() {
- while (true) {
- try {
- for (ClusterNode node : cluster.getNodes()) {
- boolean partitionAvailable = canTakeupProcess(node);
- logger.info("Partition available: " + partitionAvailable);
- if (partitionAvailable) {
- boolean success = takeProcess(node);
- logger.info("Acquire partition:" + ((success) ? "success." : "failure."));
- if (success) {
- return node;
- }
- }
- }
- Thread.sleep(5000);
-
- } catch (Exception e) {
- logger.error("Exception in assignPartition Method:", e);
- }
- }
- }
-
- private boolean takeProcess(ClusterNode node) {
- File lockFile = null;
- try {
- // TODO:contruct from processConfig
- String lockFileName = createLockFileName(node);
- lockFile = new File(lockFileName);
- if (!lockFile.exists()) {
- FileOutputStream fos = new FileOutputStream(lockFile);
- FileLock fl = fos.getChannel().tryLock();
- if (fl != null) {
- String message = "Partition acquired by PID:" + getPID() + " HOST:"
- + InetAddress.getLocalHost().getHostName();
- fos.write(message.getBytes());
- fos.close();
- logger.info(message + " Lock File location: " + lockFile.getAbsolutePath());
- return true;
- }
- }
- } catch (Exception e) {
- logger.error("Exception trying to take up partition:" + node.getPartition(), e);
- } finally {
- if (lockFile != null) {
- lockFile.deleteOnExit();
- }
- }
- return false;
- }
-
- private String createLockFileName(ClusterNode node) {
- String lockFileName = "s4-" + node.getPartition() + ".lock";
- if (lockDir != null && lockDir.trim().length() > 0) {
- File file = new File(lockDir);
- if (!file.exists()) {
- file.mkdirs();
- }
- return lockDir + "/" + lockFileName;
- } else {
- return lockFileName;
- }
- }
-
- private boolean canTakeupProcess(ClusterNode node) {
- try {
- InetAddress inetAddress = InetAddress.getByName(node.getMachineName());
- logger.info("Host Name: " + InetAddress.getLocalHost().getCanonicalHostName());
- if (!node.getMachineName().equals("localhost")) {
- if (!InetAddress.getLocalHost().equals(inetAddress)) {
- return false;
- }
- }
- } catch (Exception e) {
- logger.error("Invalid host:" + node.getMachineName());
- return false;
- }
- String lockFileName = createLockFileName(node);
- File lockFile = new File(lockFileName);
- if (!lockFile.exists()) {
- return true;
- } else {
- logger.info("Partition taken up by another process lockFile:" + lockFileName);
- }
- return false;
- }
-
- private long getPID() {
- String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
- return Long.parseLong(processName.split("@")[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 2cd3dbd..d172d73 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -13,6 +13,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
@@ -23,11 +24,7 @@ import com.google.inject.Inject;
import com.google.inject.name.Named;
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
- private static final Logger logger = LoggerFactory.getLogger(TopologyFromZK.class);
- /*
- * Name of the cluster
- */
- private final String clusterName;
+ private static final Logger logger = LoggerFactory.getLogger(AssignmentFromZK.class);
/**
* Current state of connection with ZK
*/
@@ -64,15 +61,19 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
* Holds the reference to ClusterNode which points to the current partition owned
*/
AtomicReference<ClusterNode> clusterNodeRef;
+ private int connectionTimeout;
+ private String clusterName;
+ // TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
@Inject
public AssignmentFromZK(@Named("cluster.name") String clusterName,
@Named("cluster.zk_address") String zookeeperAddress,
@Named("cluster.zk_session_timeout") int sessionTimeout,
@Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
this.clusterName = clusterName;
- taskPath = "/" + clusterName + "/" + "tasks";
- processPath = "/" + clusterName + "/" + "process";
+ this.connectionTimeout = connectionTimeout;
+ taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ processPath = "/s4/clusters/" + clusterName + "/process";
lock = new ReentrantLock();
clusterNodeRef = new AtomicReference<ClusterNode>();
taskAcquired = lock.newCondition();
@@ -89,7 +90,9 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
ZkSerializer serializer = new ZNRecordSerializer();
zkClient.setZkSerializer(serializer);
zkClient.subscribeStateChanges(this);
- zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+ if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
+ throw new Exception("cannot connect to zookeeper");
+ }
// bug in zkClient, it does not invoke handleNewSession the first time
// it connects
this.handleStateChanged(KeeperState.SyncConnected);
@@ -109,11 +112,17 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
+ if (!state.equals(KeeperState.SyncConnected)) {
+ logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
+ zkClient.close();
+ zkClient.connect(connectionTimeout, null);
+ handleNewSession();
+ }
}
@Override
public void handleNewSession() {
- logger.info("New session:" + zkClient.getSessionId());
+ logger.info("New session:" + zkClient.getSessionId() + "; state is : " + state.name());
currentlyOwningTask.set(false);
zkClient.subscribeChildChanges(taskPath, this);
zkClient.subscribeChildChanges(processPath, this);
@@ -174,13 +183,17 @@ public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateL
zkClient.createEphemeral(processPath + "/" + taskName, process);
} catch (Throwable e) {
- logger.warn("Exception trying to acquire task:" + taskName
- + " This is warning and can be ignored. " + e);
- // Any exception does not means we failed to acquire
- // task because we might have acquired task but there
- // was ZK connection loss
- // We will check again in the next section if we created
- // the process node successfully
+ if (e instanceof ZkNodeExistsException) {
+ logger.trace("Task already created");
+ } else {
+ logger.debug("Exception trying to acquire task:" + taskName
+ + " This is warning and can be ignored. " + e);
+ // An exception here does not necessarily mean that we failed
+ // to acquire the task: we might have acquired it and then
+ // lost the ZK connection.
+ // We will check again in the next section whether we created
+ // the process node successfully
+ }
}
// check if the process node is created and we own it
Stat stat = zkClient.getStat(processPath + "/" + taskName);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
index e4ebb4f..6813f3f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Cluster.java
@@ -1,134 +1,13 @@
package org.apache.s4.comm.topology;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
-
/**
- *
- * The S4 physical cluster implementation.
+ * Represents a logical cluster
*
*/
-@Singleton
-public class Cluster {
-
- // TODO: do we need a Cluster interface to represent different types of
- // implementations?
-
- private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
-
- List<ClusterNode> nodes = new ArrayList<ClusterNode>();
- String mode = "unicast";
- String name = "unknown";
-
- final private String[] hosts;
- final private String[] ports;
- final private int numNodes;
- private int numPartitions;
- public Cluster(int numPartitions){
- this.hosts = new String[] {};
- this.ports = new String[] {};
- this.numNodes = 0;
- this.numPartitions = numPartitions;
- }
- /**
- * Define the hosts and corresponding ports in the cluster.
- *
- * @param hosts
- * a comma separates list of host names.
- * @param ports
- * a comma separated list of ports.
- * @throws IOException
- * if number of hosts and ports don't match.
- */
- @Inject
- Cluster(@Named("cluster.hosts") String hosts,
- @Named("cluster.ports") String ports) throws IOException {
- if (hosts != null && hosts.length() > 0 && ports != null
- && ports.length() > 0) {
- this.ports = ports.split(",");
- this.hosts = hosts.split(",");
-
- if (this.ports.length != this.hosts.length) {
- logger.error("Number of hosts should match number of ports in properties file. hosts: "
- + hosts + " ports: " + ports);
- throw new IOException();
- }
-
- numNodes = this.hosts.length;
- for (int i = 0; i < numNodes; i++) {
- ClusterNode node = new ClusterNode(i,
- Integer.parseInt(this.ports[i]), this.hosts[i], "");
- nodes.add(node);
- logger.info("Added cluster node: " + this.hosts[i] + ":"
- + this.ports[i]);
- }
- numPartitions = numNodes;
- } else {
- this.hosts = new String[] {};
- this.ports = new String[] {};
- this.numNodes = 0;
-
- }
- }
-
- /**
- * Number of partitions in the cluster.
- * @return
- */
- public int getPartitionCount() {
- return numPartitions;
- }
-
- /**
- * @param node
- */
- public void addNode(ClusterNode node) {
- nodes.add(node);
- }
-
- /**
- * @return a list of {@link ClusterNode} objects available in the cluster.
- */
- public List<ClusterNode> getNodes() {
- return Collections.unmodifiableList(nodes);
- }
-
- // TODO: do we need mode and name? Making provate for now.
-
- @SuppressWarnings("unused")
- private String getMode() {
- return mode;
- }
-
- @SuppressWarnings("unused")
- private void setMode(String mode) {
- this.mode = mode;
- }
-
- @SuppressWarnings("unused")
- private String getName() {
- return name;
- }
-
- @SuppressWarnings("unused")
- private void setName(String name) {
- this.name = name;
- }
+public interface Cluster {
+ public PhysicalCluster getPhysicalCluster();
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("{name=").append(name).append(",mode=").append(mode)
- .append(",type=").append(",nodes=").append(nodes).append("}");
- return sb.toString();
- }
+ public void addListener(ClusterChangeListener listener);
-}
\ No newline at end of file
+ public void removeListener(ClusterChangeListener listener);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
new file mode 100644
index 0000000..e4452d1
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterChangeListener.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * Entities interested in changes occurring in topologies implement this listener and should register through the
+ * {@link Cluster} interface
+ *
+ */
+public interface ClusterChangeListener {
+ public void onChange();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
new file mode 100644
index 0000000..82e7a5e
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -0,0 +1,198 @@
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
+
+ private static Logger logger = LoggerFactory.getLogger(ClusterFromZK.class);
+
+ private final AtomicReference<PhysicalCluster> clusterRef;
+ private final List<ClusterChangeListener> listeners;
+ private KeeperState state;
+ private final ZkClient zkClient;
+ private final String taskPath;
+ private final String processPath;
+ private final Lock lock;
+ private AtomicBoolean currentlyOwningTask;
+ private String machineId;
+ private String clusterName;
+
+ private int connectionTimeout;
+
+ /**
+ * Creates a view of the local topology only (the cluster named by cluster.name), opening its own ZooKeeper client
+ */
+ @Inject
+ public ClusterFromZK(@Named("cluster.name") String clusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ this.connectionTimeout = connectionTimeout;
+ this.clusterName = clusterName;
+ this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ this.processPath = "/s4/clusters/" + clusterName + "/process";
+ lock = new ReentrantLock();
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ ZkSerializer serializer = new ZNRecordSerializer();
+ zkClient.setZkSerializer(serializer);
+ zkClient.subscribeStateChanges(this);
+ if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
+ throw new Exception("cannot connect to zookeeper");
+ }
+ try {
+ machineId = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Unable to get hostname", e);
+ machineId = "UNKNOWN";
+ }
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ this.handleStateChanged(KeeperState.SyncConnected);
+ zkClient.subscribeChildChanges(taskPath, this);
+ zkClient.subscribeChildChanges(processPath, this);
+ // bug in zkClient, it does not invoke handleNewSession the first time
+ // it connects
+ this.handleNewSession();
+
+ }
+
+ /**
+ * Creates a view of any topology, identified by its cluster name, using the ZooKeeper client passed by the caller
+ */
+ public ClusterFromZK(String clusterName, ZkClient zkClient, String machineId) {
+
+ this.zkClient = zkClient;
+ this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ this.processPath = "/s4/clusters/" + clusterName + "/process";
+ this.clusterName = clusterName;
+ this.lock = new ReentrantLock();
+ this.machineId = machineId;
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ zkClient.subscribeChildChanges(taskPath, this);
+ zkClient.subscribeChildChanges(processPath, this);
+ }
+
+ @Override
+ public PhysicalCluster getPhysicalCluster() {
+ return clusterRef.get();
+ }
+
+ @Override
+ public void addListener(ClusterChangeListener listener) {
+ logger.info("Adding topology change listener:" + listener);
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(ClusterChangeListener listener) {
+ logger.info("Removing topology change listener:" + listener);
+ listeners.remove(listener);
+ }
+
+ @Override
+ public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+ doProcess();
+ }
+
+ void doProcess() {
+ lock.lock();
+ try {
+ refreshTopology();
+ } catch (Exception e) {
+ logger.error("", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void refreshTopology() throws Exception {
+ List<String> processes = zkClient.getChildren(processPath);
+ List<String> tasks = zkClient.getChildren(taskPath);
+ PhysicalCluster cluster = new PhysicalCluster(tasks.size());
+ for (int i = 0; i < processes.size(); i++) {
+ cluster.setName(clusterName);
+ ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+ if (process != null) {
+ int partition = Integer.parseInt(process.getSimpleField("partition"));
+ String host = process.getSimpleField("host");
+ int port = Integer.parseInt(process.getSimpleField("port"));
+ String taskId = process.getSimpleField("taskId");
+ ClusterNode node = new ClusterNode(partition, port, host, taskId);
+ cluster.addNode(node);
+ }
+ }
+ logger.info("Changing cluster topology to " + cluster + " from " + clusterRef.get());
+ clusterRef.set(cluster);
+ // Notify all changeListeners about the topology change
+ for (ClusterChangeListener listener : listeners) {
+ listener.onChange();
+ }
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ doProcess();
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ doProcess();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClusterFromZK other = (ClusterFromZK) obj;
+ if (clusterName == null) {
+ if (other.clusterName != null)
+ return false;
+ } else if (!clusterName.equals(other.clusterName))
+ return false;
+ return true;
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ doProcess();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
new file mode 100644
index 0000000..aa8b78f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/Clusters.java
@@ -0,0 +1,12 @@
+package org.apache.s4.comm.topology;
+
+/**
+ * Represents clusters related to the current node (clusters to which this node belongs, and connected clusters that may
+ * receive messages from this node)
+ *
+ */
+public interface Clusters {
+
+ Cluster getCluster(String clusterName);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
new file mode 100644
index 0000000..c5445d3
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -0,0 +1,103 @@
+package org.apache.s4.comm.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Monitors all clusters listed in ZooKeeper under {@code /s4/clusters}.
+ * <p>
+ * One {@link ClusterFromZK} is created per child node found there, and every known cluster is refreshed each time
+ * {@link #handleNewSession()} runs (once from the constructor, then on every session callback).
+ * <p>
+ * Threading: listener callbacks are presumably delivered on the ZooKeeper client's own event thread while
+ * {@link #getCluster(String)} is called from application threads (TODO confirm). The cluster map is therefore handled
+ * copy-on-write: it is never mutated once published, writers replace it while holding {@code lock}, and readers only
+ * dereference the volatile field.
+ */
+public class ClustersFromZK implements Clusters, IZkStateListener {
+    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
+
+    /** ZooKeeper path whose children are the cluster names. */
+    private static final String CLUSTERS_PATH = "/s4/clusters";
+
+    // Last state reported through handleStateChanged; written here but not read anywhere in this class.
+    private KeeperState state;
+    private final ZkClient zkClient;
+    // Serializes the processing of ZooKeeper callbacks; reentrant, so handleNewSession may call doProcess.
+    private final Lock lock;
+    private final String machineId;
+    // Published snapshot of the known clusters. Treated as immutable after assignment (see class comment).
+    private volatile Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+    private final int connectionTimeout;
+    private final String clusterName;
+
+    /**
+     * Connects to ZooKeeper, registers this instance as a state listener and loads the initial set of clusters.
+     *
+     * @param clusterName
+     *            name of the local cluster; only used in log messages by this class
+     * @param zookeeperAddress
+     *            connection string handed to {@link ZkClient}
+     * @param sessionTimeout
+     *            ZooKeeper session timeout handed to {@link ZkClient}
+     * @param connectionTimeout
+     *            connection timeout, in milliseconds (it is passed to {@code waitUntilConnected} with
+     *            {@link TimeUnit#MILLISECONDS})
+     * @throws Exception
+     *             if the initial state/session handling fails
+     */
+    @Inject
+    public ClustersFromZK(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        this.connectionTimeout = connectionTimeout;
+        lock = new ReentrantLock();
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        machineId = resolveMachineId();
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+
+    }
+
+    /**
+     * Resolves the canonical host name of the local machine, falling back to {@code "UNKNOWN"} when it cannot be
+     * determined.
+     */
+    private static String resolveMachineId() {
+        try {
+            return InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            return "UNKNOWN";
+        }
+    }
+
+    /**
+     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially.
+     * <p>
+     * Each cluster is refreshed independently: a failure in one cluster is logged and does not prevent the remaining
+     * clusters from being processed.
+     */
+    private void doProcess() {
+        lock.lock();
+        try {
+            for (Map.Entry<String, ClusterFromZK> entry : clusters.entrySet()) {
+                try {
+                    entry.getValue().doProcess();
+                } catch (Exception e) {
+                    logger.warn("Exception while refreshing cluster [" + entry.getKey() + "] from ZooKeeper", e);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Records the reported state and, when it is anything other than {@link KeeperState#SyncConnected}, tries to
+     * reconnect and then reloads the clusters.
+     *
+     * @param state
+     *            the state reported by ZooKeeper
+     * @throws Exception
+     *             if reconnecting or reloading the clusters fails
+     */
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+        if (!state.equals(KeeperState.SyncConnected)) {
+            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
+            // NOTE(review): a null watcher is passed here; confirm this is what ZkClient.connect expects.
+            zkClient.connect(connectionTimeout, null);
+            handleNewSession();
+        }
+    }
+
+    /**
+     * Reads the cluster names under {@code /s4/clusters}, creates a fresh {@link ClusterFromZK} for each of them
+     * (replacing any previous instance registered under the same name) and then refreshes every known cluster.
+     * Clusters that were known before but are no longer listed are kept.
+     *
+     * @throws Exception
+     *             if the cluster names cannot be read or a cluster cannot be created
+     */
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:{}", zkClient.getSessionId());
+        lock.lock();
+        try {
+            List<String> clusterNames = zkClient.getChildren(CLUSTERS_PATH);
+            // Copy-on-write: build the next snapshot aside, then publish it with a single volatile write.
+            Map<String, ClusterFromZK> refreshed = new HashMap<String, ClusterFromZK>(clusters);
+            for (String name : clusterNames) {
+                refreshed.put(name, new ClusterFromZK(name, zkClient, machineId));
+            }
+            clusters = refreshed;
+            doProcess();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Returns the cluster registered under the given name.
+     *
+     * @param clusterName
+     *            name of the cluster to look up
+     * @return the matching cluster, or {@code null} when no cluster with that name is known
+     */
+    @Override
+    public Cluster getCluster(String clusterName) {
+        return clusters.get(clusterName);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
new file mode 100644
index 0000000..32348df
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -0,0 +1,125 @@
+package org.apache.s4.comm.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * The S4 physical cluster implementation.
+ *
+ */
+public class PhysicalCluster {
+
+ // TODO: do we need a Cluster interface to represent different types of
+ // implementations?
+
+ private static final Logger logger = LoggerFactory.getLogger(PhysicalCluster.class);
+
+ List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+ String mode = "unicast";
+ String name = "unknown";
+
+ final private String[] hosts;
+ final private String[] ports;
+ final private int numNodes;
+ private int numPartitions;
+
+ public PhysicalCluster(int numPartitions) {
+ this.hosts = new String[] {};
+ this.ports = new String[] {};
+ this.numNodes = 0;
+ this.numPartitions = numPartitions;
+ }
+
+ /**
+ * Define the hosts and corresponding ports in the cluster.
+ *
+ * @param hosts
+ * a comma separates list of host names.
+ * @param ports
+ * a comma separated list of ports.
+ * @throws IOException
+ * if number of hosts and ports don't match.
+ */
+ PhysicalCluster(String hosts, String ports) throws IOException {
+ if (hosts != null && hosts.length() > 0 && ports != null && ports.length() > 0) {
+ this.ports = ports.split(",");
+ this.hosts = hosts.split(",");
+
+ if (this.ports.length != this.hosts.length) {
+ logger.error("Number of hosts should match number of ports in properties file. hosts: " + hosts
+ + " ports: " + ports);
+ throw new IOException();
+ }
+
+ numNodes = this.hosts.length;
+ for (int i = 0; i < numNodes; i++) {
+ ClusterNode node = new ClusterNode(i, Integer.parseInt(this.ports[i]), this.hosts[i], "");
+ nodes.add(node);
+ logger.info("Added cluster node: " + this.hosts[i] + ":" + this.ports[i]);
+ }
+ numPartitions = numNodes;
+ } else {
+ this.hosts = new String[] {};
+ this.ports = new String[] {};
+ this.numNodes = 0;
+
+ }
+ }
+
+ /**
+ * Number of partitions in the cluster.
+ *
+ * @return
+ */
+ public int getPartitionCount() {
+ return numPartitions;
+ }
+
+ /**
+ * @param node
+ */
+ public void addNode(ClusterNode node) {
+ nodes.add(node);
+ }
+
+ /**
+ * @return a list of {@link ClusterNode} objects available in the cluster.
+ */
+ public List<ClusterNode> getNodes() {
+ return Collections.unmodifiableList(nodes);
+ }
+
+ // TODO: do we need mode and name? Making provate for now.
+
+ @SuppressWarnings("unused")
+ private String getMode() {
+ return mode;
+ }
+
+ @SuppressWarnings("unused")
+ private void setMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("{ nbNodes=").append(nodes.size()).append(",name=").append(name).append(",mode=").append(mode)
+ .append(",type=").append(",nodes=").append(nodes).append("}");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
new file mode 100644
index 0000000..2b9d866
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteCluster.java
@@ -0,0 +1,10 @@
+package org.apache.s4.comm.topology;
+
+/**
+ *
+ * Represents a logical cluster external to the current cluster.
+ * <p>
+ * This interface declares no members of its own; it only inherits from {@link Cluster}.
+ *
+ */
+public interface RemoteCluster extends Cluster {
+
+}