You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 17:13:00 UTC

git commit: s4-33:s4.fixtures refactored across s4-comm and s4-core

Updated Branches:
  refs/heads/piper 8b642d182 -> 111959da9


s4-33:s4.fixtures refactored across s4-comm and s4-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/111959da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/111959da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/111959da

Branch: refs/heads/piper
Commit: 111959da9ee348424f3a21540691f206f4517283
Parents: 8b642d1
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Thu Dec 22 03:30:53 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 3 18:08:36 2012 +0100

----------------------------------------------------------------------
 subprojects/s4-comm/s4-comm.gradle                 |   15 +-
 .../java/org/apache/s4/fixtures/CommTestUtils.java |  439 ++++++++++++++
 .../FileBasedClusterManagementTestModule.java      |   78 +++
 .../ZkBasedClusterManagementTestModule.java        |   79 +++
 subprojects/s4-core/s4-core.gradle                 |    1 +
 .../test/java/org/apache/s4/core/TriggerTest.java  |   16 +-
 .../java/org/apache/s4/core/TriggerablePE.java     |    4 +-
 .../apache/s4/core/apploading/AppLoadingTest.java  |   27 +-
 .../org/apache/s4/core/apploading/SimpleApp.java   |    4 +-
 .../org/apache/s4/core/apploading/SimplePE.java    |    4 +-
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |   24 +
 .../FileBasedClusterManagementTestModule.java      |   78 ---
 .../java/org/apache/s4/fixtures/TestUtils.java     |  451 ---------------
 .../ZkBasedClusterManagementTestModule.java        |   79 ---
 .../org/apache/s4/wordcount/WordClassifierPE.java  |    8 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |   22 +-
 .../apache/s4/wordcount/zk/WordCountTestZk.java    |   26 +-
 17 files changed, 690 insertions(+), 665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/s4-comm.gradle b/subprojects/s4-comm/s4-comm.gradle
index b266c9b..b88239c 100644
--- a/subprojects/s4-comm/s4-comm.gradle
+++ b/subprojects/s4-comm/s4-comm.gradle
@@ -20,11 +20,24 @@ dependencies {
     compile project(":s4-base")
     compile libraries.json
     compile libraries.gson
- 	compile libraries.kryo
+    compile libraries.kryo
     compile libraries.netty
     compile libraries.zkclient
 }
 
+task testJar(type: Jar) {
+    baseName = "test-${project.archivesBaseName}"
+    from sourceSets.test.classes
+}
+
+configurations {
+    tests
+}
+
+artifacts {
+    tests testJar
+}
+
 test {
     forkEvery=1
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
new file mode 100644
index 0000000..984fdfa
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -0,0 +1,439 @@
+package org.apache.s4.fixtures;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
+ * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
+ * latches through zookeeper - etc...
+ * 
+ */
+public class CommTestUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
+
+    public static final int ZK_PORT = 21810;
+    public static final int INITIAL_BOOKIE_PORT = 5000;
+    public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
+    public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+            + "storage");
+    public static ServerSocket serverSocket;
+    static {
+        logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
+    }
+
+    protected static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
+        List<String> cmdList = new ArrayList<String>();
+        cmdList.add("java");
+        cmdList.add("-cp");
+        cmdList.add(System.getProperty("java.class.path"));
+        // cmdList.add("-Xdebug");
+        // cmdList.add("-Xnoagent");
+        //
+        // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+
+        cmdList.add(mainClass);
+        for (String arg : args) {
+            cmdList.add(arg);
+        }
+
+        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+        ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+        pb.directory(new File(System.getProperty("user.dir")));
+        pb.redirectErrorStream();
+        final Process process = pb.start();
+
+        // TODO some synchro with s4 platform ready state
+        Thread.sleep(2000);
+        try {
+            int exitValue = process.exitValue();
+            Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+        } catch (IllegalThreadStateException ignored) {
+        }
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                String line;
+                try {
+                    line = br.readLine();
+                    while (line != null) {
+                        System.out.println(line);
+                        line = br.readLine();
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }).start();
+
+        return process;
+    }
+
+    public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
+        if (forkedApp != null) {
+            forkedApp.destroy();
+        }
+    }
+
+    public static void writeStringToFile(String s, File f) throws IOException {
+        if (f.exists()) {
+            if (!f.delete()) {
+                throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
+            }
+        }
+
+        FileWriter fw = null;
+        try {
+            if (!f.createNewFile()) {
+                throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
+            }
+            fw = new FileWriter(f);
+
+            fw.write(s);
+        } catch (IOException e) {
+            throw (e);
+        } finally {
+            if (fw != null) {
+                try {
+                    fw.close();
+                } catch (IOException e) {
+                    throw (e);
+                }
+            }
+        }
+    }
+
+    public static String readFile(File f) throws IOException {
+        BufferedReader br = null;
+        try {
+            br = new BufferedReader(new FileReader(f));
+            StringBuilder sb = new StringBuilder();
+            String line = br.readLine();
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+                if (line != null) {
+                    sb.append("\n");
+                }
+            }
+            return sb.toString();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    throw (e);
+                }
+            }
+        }
+
+    }
+
+    public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
+            KeeperException {
+
+        final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "data");
+        if (zkDataDir.exists()) {
+            CommTestUtils.deleteDirectoryContents(zkDataDir);
+        } else {
+            zkDataDir.mkdirs();
+        }
+
+        ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
+        NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
+        nioZookeeperConnectionFactory.startup(zks);
+        Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
+        return nioZookeeperConnectionFactory;
+
+    }
+
+    public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
+        if (f != null) {
+            f.shutdown();
+            Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
+        }
+    }
+
+    public static void deleteDirectoryContents(File dir) {
+        File[] files = dir.listFiles();
+        for (File file : files) {
+            if (file.isDirectory()) {
+                deleteDirectoryContents(file);
+            }
+            if (!file.delete()) {
+                throw new RuntimeException("could not delete : " + file);
+            }
+        }
+    }
+
+    public static String readFileAsString(File f) throws IOException {
+        FileReader fr = new FileReader(f);
+        StringBuilder sb = new StringBuilder("");
+        BufferedReader br = new BufferedReader(fr);
+        String line = br.readLine();
+        while (line != null) {
+            sb.append(line);
+            line = br.readLine();
+            if (line != null) {
+                sb.append("\n");
+            }
+        }
+        return sb.toString();
+
+    }
+
+    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
+    // guava
+    public static byte[] readFileAsByteArray(File file) throws Exception {
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(file);
+
+            long length = file.length();
+
+            /*
+             * Arrays can only be created using int types, so ensure that the file size is not too big before we
+             * downcast to create the array.
+             */
+            if (length > Integer.MAX_VALUE) {
+                throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
+            }
+
+            byte[] buffer = new byte[(int) length];
+            int offSet = 0;
+            int numRead = 0;
+
+            while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
+                offSet += numRead;
+            }
+
+            if (offSet < buffer.length) {
+                throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+                        + buffer.length + " bytes read");
+            }
+
+            in.close();
+            return buffer;
+
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+        }
+    }
+
+    public static ZooKeeper createZkClient() throws IOException {
+        final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+            }
+        });
+        return zk;
+    }
+
+    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        // by default delete existing nodes with same path
+        watchAndSignalCreation(path, latch, zk, false);
+    }
+
+    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
+            boolean deleteIfExists) throws KeeperException, InterruptedException {
+
+        if (zk.exists(path, false) != null) {
+            if (deleteIfExists) {
+                zk.delete(path, -1);
+            } else {
+                latch.countDown();
+            }
+        }
+        zk.exists(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeCreated.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        zk.getChildren(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeChildrenChanged.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    // from zookeeper's codebase
+    public static boolean waitForServerUp(String host, int port, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                // if there are multiple hostports, just take the first one
+                String result = send4LetterWord(host, port, "stat");
+                if (result.startsWith("Zookeeper version:")) {
+                    return true;
+                }
+            } catch (IOException ignored) {
+                // ignore as this is expected
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        return false;
+    }
+
+    // from zookeeper's codebase
+    public static String send4LetterWord(String host, int port, String cmd) throws IOException {
+        Socket sock = new Socket(host, port);
+        BufferedReader reader = null;
+        try {
+            OutputStream outstream = sock.getOutputStream();
+            outstream.write(cmd.getBytes());
+            outstream.flush();
+            // this replicates NC - close the output stream before reading
+            sock.shutdownOutput();
+
+            reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+            StringBuilder sb = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                sb.append(line + "\n");
+            }
+            return sb.toString();
+        } finally {
+            sock.close();
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    // from zookeeper's codebase
+    public static boolean waitForServerDown(String host, int port, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                send4LetterWord(host, port, "stat");
+            } catch (IOException e) {
+                return true;
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+        return false;
+    }
+
+    public static void cleanupTmpDirs() {
+        if (CommTestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
+            deleteDirectoryContents(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR);
+        }
+        CommTestUtils.DEFAULT_STORAGE_DIR.mkdirs();
+
+    }
+
+    public static void stopSocketAdapter() throws IOException {
+        if (serverSocket != null) {
+            serverSocket.close();
+        }
+    }
+
+    /**
+     * gradle and eclipse have different directories for output files This is justified here
+     * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
+     * 
+     * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
+     * runtime.
+     * 
+     * This is what this method does
+     * 
+     * @return directory containing the compiled test classes for this project and execution environment.
+     */
+    public static File findDirForCompiledTestClasses() {
+        String userDir = System.getProperty("user.dir");
+        String classpath = System.getProperty("java.class.path");
+        System.out.println(userDir);
+        System.out.println(classpath);
+        if (classpath.contains(userDir + "/bin")) {
+            // eclipse classpath
+            return new File(userDir + "/bin");
+        } else if (classpath.contains(userDir + "/build/classes/test")) {
+            // gradle classpath
+            return new File(userDir + "/build/classes/test");
+        } else {
+            // TODO other IDEs
+            throw new RuntimeException("Cannot find path for compiled test classes");
+        }
+
+    }
+
+    public static void injectIntoStringSocketAdapter(String string) throws IOException {
+        Socket socket = null;
+        PrintWriter writer = null;
+        try {
+            socket = new Socket("localhost", 12000);
+            writer = new PrintWriter(socket.getOutputStream(), true);
+            writer.println(string);
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.exit(-1);
+        } finally {
+            if (socket != null) {
+                socket.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..b189350
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -0,0 +1,78 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected FileBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.lock_dir",
+                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromFile.class);
+        bind(Topology.class).to(TopologyFromFile.class);
+        bind(Emitter.class).to(UDPEmitter.class);
+        bind(Listener.class).to(UDPListener.class);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..be3739b
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -0,0 +1,79 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+// also uses netty
+public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected ZkBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.zk_address",
+                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + CommTestUtils.ZK_PORT));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index fe5466c..1bdbb6c 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -19,6 +19,7 @@ description = 'The S4 core platform.'
 dependencies {
     compile project(":s4-base")
     compile project(":s4-comm")
+    testCompile project(path: ':s4-comm', configuration: 'tests')
 }
 
 test {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 6d55029..90718ab 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,7 +7,7 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -35,8 +35,8 @@ public abstract class TriggerTest {
 
     @Before
     public void prepare() throws IOException, InterruptedException, KeeperException {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        CommTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
     }
 
     @After
@@ -45,11 +45,11 @@ public abstract class TriggerTest {
             app.close();
             app = null;
         }
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
     }
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
-        final ZooKeeper zk = TestUtils.createZkClient();
+        final ZooKeeper zk = CommTestUtils.createZkClient();
         Injector injector = Guice.createInjector(new TriggeredModule());
         app = injector.getInstance(TriggeredApp.class);
         app.init();
@@ -58,12 +58,12 @@ public abstract class TriggerTest {
         String time1 = String.valueOf(System.currentTimeMillis());
 
         CountDownLatch signalEvent1Processed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+        CommTestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
         
         CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
+        CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
 
-        TestUtils.injectIntoStringSocketAdapter(time1);
+        CommTestUtils.injectIntoStringSocketAdapter(time1);
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
index 3a35a76..d8008f4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
@@ -2,7 +2,7 @@ package org.apache.s4.core;
 
 import java.io.IOException;
 
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,7 +38,7 @@ public class TriggerablePE extends ProcessingElement implements Watcher {
     public void onCreate() {
         if (zk == null) {
             try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+                zk = new ZooKeeper("localhost:" + CommTestUtils.ZK_PORT, 4000, this);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
index fec8fe9..ee8abdf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
@@ -14,7 +14,7 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
 import org.apache.s4.core.Server;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
@@ -23,7 +23,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 
@@ -40,9 +39,9 @@ public class AppLoadingTest {
 
     @Before
     public void prepare() throws Exception {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
-        final ZooKeeper zk = TestUtils.createZkClient();
+        CoreTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CoreTestUtils.startZookeeperServer();
+        final ZooKeeper zk = CoreTestUtils.createZkClient();
         try {
             zk.delete("/simpleAppCreated", -1);
         } catch (Exception ignored) {
@@ -53,8 +52,8 @@ public class AppLoadingTest {
 
     @After
     public void cleanup() throws Exception {
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
-        TestUtils.killS4App(forkedApp);
+        CoreTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        CoreTestUtils.killS4App(forkedApp);
     }
 
     @Ignore("fix paths")
@@ -68,7 +67,7 @@ public class AppLoadingTest {
         generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "counterExample",
                 "org.apache.s4.example.counter.MyApp");
 
-        forkedApp = TestUtils.forkS4Node();
+        forkedApp = CoreTestUtils.forkS4Node();
         Thread.sleep(15000);
     }
 
@@ -129,9 +128,9 @@ public class AppLoadingTest {
 
         // TODO fix paths
 
-        final ZooKeeper zk = TestUtils.createZkClient();
+        final ZooKeeper zk = CoreTestUtils.createZkClient();
 
-        File rootAppDir = TestUtils.findDirForCompiledTestClasses();
+        File rootAppDir = CoreTestUtils.findDirForCompiledTestClasses();
 
         File appFilesDir = new File(rootAppDir, "test/s4/core/apploading");
         // 1. create app jar and place it in tmp/s4-apps
@@ -139,22 +138,22 @@ public class AppLoadingTest {
 
         CountDownLatch signalAppStarted = new CountDownLatch(1);
         // 2. start s4 node and check results
-        forkedApp = TestUtils.forkS4Node();
+        forkedApp = CoreTestUtils.forkS4Node();
 
         // TODO wait for ready state (zk node available)
         Thread.sleep(5000);
 
         // note: make sure we don't delete existing znode if it was already created
-        TestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
+        CoreTestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
 
         Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
 
         String time1 = String.valueOf(System.currentTimeMillis());
 
         CountDownLatch signalEvent1Processed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+        CoreTestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
 
-        TestUtils.injectIntoStringSocketAdapter(time1);
+        CoreTestUtils.injectIntoStringSocketAdapter(time1);
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
index 08831cb..f01c211 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
@@ -5,7 +5,7 @@ import java.io.IOException;
 import org.apache.s4.core.App;
 import org.apache.s4.core.Stream;
 import org.apache.s4.fixtures.SocketAdapter;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.wordcount.SentenceKeyFinder;
 import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.CreateMode;
@@ -21,7 +21,7 @@ public class SimpleApp extends App {
     @Override
     protected void onStart() {
         try {
-            final ZooKeeper zk = TestUtils.createZkClient();
+            final ZooKeeper zk = CommTestUtils.createZkClient();
             zk.create("/simpleAppCreated", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             zk.close();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
index e6a9976..a8b7831 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
@@ -4,7 +4,7 @@ import java.io.IOException;
 
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -44,7 +44,7 @@ public class SimplePE  extends ProcessingElement implements Watcher {
     protected void onCreate() {
         if (zk == null) {
             try {
-                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+                zk = new ZooKeeper("localhost:" + CommTestUtils.ZK_PORT, 4000, this);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
new file mode 100644
index 0000000..8dc2159
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
+import org.apache.s4.fixtures.CommTestUtils;
+
+/**
+ * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
+ * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
+ * latches through zookeeper - etc...
+ * 
+ */
+public class CoreTestUtils extends CommTestUtils {
+
+    public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
+        return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
+    }
+
+    public static Process forkS4Node() throws IOException, InterruptedException {
+        return forkProcess(Main.class.getName(), new String[] {});
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
deleted file mode 100644
index b189350..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected FileBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.lock_dir",
-                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromFile.class);
-        bind(Topology.class).to(TopologyFromFile.class);
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
deleted file mode 100644
index 91690bd..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
+++ /dev/null
@@ -1,451 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.Main;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
- * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
- * latches through zookeeper - etc...
- * 
- */
-public class TestUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
-
-    public static final int ZK_PORT = 21810;
-    public static final int INITIAL_BOOKIE_PORT = 5000;
-    public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
-    public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
-            + "storage");
-    public static ServerSocket serverSocket;
-    static {
-        logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
-    }
-
-    public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
-        return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
-    }
-
-    public static Process forkS4Node() throws IOException, InterruptedException {
-        return forkProcess(Main.class.getName(), new String[] {});
-    }
-
-    private static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
-        List<String> cmdList = new ArrayList<String>();
-        cmdList.add("java");
-        cmdList.add("-cp");
-        cmdList.add(System.getProperty("java.class.path"));
-        // cmdList.add("-Xdebug");
-        // cmdList.add("-Xnoagent");
-        //
-        // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
-
-        cmdList.add(mainClass);
-        for (String arg : args) {
-            cmdList.add(arg);
-        }
-
-        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
-        ProcessBuilder pb = new ProcessBuilder(cmdList);
-
-        pb.directory(new File(System.getProperty("user.dir")));
-        pb.redirectErrorStream();
-        final Process process = pb.start();
-
-        // TODO some synchro with s4 platform ready state
-        Thread.sleep(2000);
-        try {
-            int exitValue = process.exitValue();
-            Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
-        } catch (IllegalThreadStateException ignored) {
-        }
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-                String line;
-                try {
-                    line = br.readLine();
-                    while (line != null) {
-                        System.out.println(line);
-                        line = br.readLine();
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }).start();
-
-        return process;
-    }
-
-    public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
-        if (forkedApp != null) {
-            forkedApp.destroy();
-        }
-    }
-
-    public static void writeStringToFile(String s, File f) throws IOException {
-        if (f.exists()) {
-            if (!f.delete()) {
-                throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
-            }
-        }
-
-        FileWriter fw = null;
-        try {
-            if (!f.createNewFile()) {
-                throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
-            }
-            fw = new FileWriter(f);
-
-            fw.write(s);
-        } catch (IOException e) {
-            throw (e);
-        } finally {
-            if (fw != null) {
-                try {
-                    fw.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
-    }
-
-    public static String readFile(File f) throws IOException {
-        BufferedReader br = null;
-        try {
-            br = new BufferedReader(new FileReader(f));
-            StringBuilder sb = new StringBuilder();
-            String line = br.readLine();
-            while (line != null) {
-                sb.append(line);
-                line = br.readLine();
-                if (line != null) {
-                    sb.append("\n");
-                }
-            }
-            return sb.toString();
-        } finally {
-            if (br != null) {
-                try {
-                    br.close();
-                } catch (IOException e) {
-                    throw (e);
-                }
-            }
-        }
-
-    }
-
-    public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
-            KeeperException {
-
-        List<String> cmdList = new ArrayList<String>();
-        final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
-                + "zookeeper" + File.separator + "data");
-        if (zkDataDir.exists()) {
-            TestUtils.deleteDirectoryContents(zkDataDir);
-        } else {
-            zkDataDir.mkdirs();
-        }
-
-        ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
-        NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
-        nioZookeeperConnectionFactory.startup(zks);
-        Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
-        return nioZookeeperConnectionFactory;
-
-    }
-
-    public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
-        if (f != null) {
-            f.shutdown();
-            Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
-        }
-    }
-
-    public static void deleteDirectoryContents(File dir) {
-        File[] files = dir.listFiles();
-        for (File file : files) {
-            if (file.isDirectory()) {
-                deleteDirectoryContents(file);
-            }
-            if (!file.delete()) {
-                throw new RuntimeException("could not delete : " + file);
-            }
-        }
-    }
-
-    public static String readFileAsString(File f) throws IOException {
-        FileReader fr = new FileReader(f);
-        StringBuilder sb = new StringBuilder("");
-        BufferedReader br = new BufferedReader(fr);
-        String line = br.readLine();
-        while (line != null) {
-            sb.append(line);
-            line = br.readLine();
-            if (line != null) {
-                sb.append("\n");
-            }
-        }
-        return sb.toString();
-
-    }
-
-    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
-    // guava
-    public static byte[] readFileAsByteArray(File file) throws Exception {
-        FileInputStream in = null;
-        try {
-            in = new FileInputStream(file);
-
-            long length = file.length();
-
-            /*
-             * Arrays can only be created using int types, so ensure that the file size is not too big before we
-             * downcast to create the array.
-             */
-            if (length > Integer.MAX_VALUE) {
-                throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
-            }
-
-            byte[] buffer = new byte[(int) length];
-            int offSet = 0;
-            int numRead = 0;
-
-            while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
-                offSet += numRead;
-            }
-
-            if (offSet < buffer.length) {
-                throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
-                        + buffer.length + " bytes read");
-            }
-
-            in.close();
-            return buffer;
-
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
-    }
-
-    public static ZooKeeper createZkClient() throws IOException {
-        final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-            }
-        });
-        return zk;
-    }
-
-    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
-            throws KeeperException, InterruptedException {
-
-        // by default delete existing nodes with same path
-        watchAndSignalCreation(path, latch, zk, false);
-    }
-
-    public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
-            boolean deleteIfExists) throws KeeperException, InterruptedException {
-
-        if (zk.exists(path, false) != null) {
-            if (deleteIfExists) {
-                zk.delete(path, -1);
-            } else {
-                latch.countDown();
-            }
-        }
-        zk.exists(path, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (EventType.NodeCreated.equals(event.getType())) {
-                    latch.countDown();
-                }
-            }
-        });
-    }
-
-    public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
-            throws KeeperException, InterruptedException {
-
-        zk.getChildren(path, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (EventType.NodeChildrenChanged.equals(event.getType())) {
-                    latch.countDown();
-                }
-            }
-        });
-    }
-
-    // from zookeeper's codebase
-    public static boolean waitForServerUp(String host, int port, long timeout) {
-        long start = System.currentTimeMillis();
-        while (true) {
-            try {
-                // if there are multiple hostports, just take the first one
-                String result = send4LetterWord(host, port, "stat");
-                if (result.startsWith("Zookeeper version:")) {
-                    return true;
-                }
-            } catch (IOException ignored) {
-                // ignore as this is expected
-            }
-
-            if (System.currentTimeMillis() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
-    }
-
-    // from zookeeper's codebase
-    public static String send4LetterWord(String host, int port, String cmd) throws IOException {
-        Socket sock = new Socket(host, port);
-        BufferedReader reader = null;
-        try {
-            OutputStream outstream = sock.getOutputStream();
-            outstream.write(cmd.getBytes());
-            outstream.flush();
-            // this replicates NC - close the output stream before reading
-            sock.shutdownOutput();
-
-            reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
-            StringBuilder sb = new StringBuilder();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                sb.append(line + "\n");
-            }
-            return sb.toString();
-        } finally {
-            sock.close();
-            if (reader != null) {
-                reader.close();
-            }
-        }
-    }
-
-    // from zookeeper's codebase
-    public static boolean waitForServerDown(String host, int port, long timeout) {
-        long start = System.currentTimeMillis();
-        while (true) {
-            try {
-                send4LetterWord(host, port, "stat");
-            } catch (IOException e) {
-                return true;
-            }
-
-            if (System.currentTimeMillis() > start + timeout) {
-                break;
-            }
-            try {
-                Thread.sleep(250);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-        }
-        return false;
-    }
-
-    public static void cleanupTmpDirs() {
-        if (TestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
-            deleteDirectoryContents(TestUtils.DEFAULT_TEST_OUTPUT_DIR);
-        }
-        TestUtils.DEFAULT_STORAGE_DIR.mkdirs();
-
-    }
-
-    public static void stopSocketAdapter() throws IOException {
-        if (serverSocket != null) {
-            serverSocket.close();
-        }
-    }
-
-    /**
-     * gradle and eclipse have different directories for output files This is justified here
-     * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
-     * 
-     * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
-     * runtime.
-     * 
-     * This is what this method does
-     * 
-     * @return directory containing the compiled test classes for this project and execution environment.
-     */
-    public static File findDirForCompiledTestClasses() {
-        String userDir = System.getProperty("user.dir");
-        String classpath = System.getProperty("java.class.path");
-        System.out.println(userDir);
-        System.out.println(classpath);
-        if (classpath.contains(userDir + "/bin")) {
-            // eclipse classpath
-            return new File(userDir + "/bin");
-        } else if (classpath.contains(userDir + "/build/classes/test")) {
-            // gradle classpath
-            return new File(userDir + "/build/classes/test");
-        } else {
-            // TODO other IDEs
-            throw new RuntimeException("Cannot find path for compiled test classes");
-        }
-
-    }
-
-    public static void injectIntoStringSocketAdapter(String string) throws IOException {
-        Socket socket = null;
-        PrintWriter writer = null;
-        try {
-            socket = new Socket("localhost", 12000);
-            writer = new PrintWriter(socket.getOutputStream(), true);
-            writer.println(string);
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(-1);
-        } finally {
-            if (socket != null) {
-                socket.close();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
deleted file mode 100644
index f5959a2..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-// also uses netty
-public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected ZkBasedClusterManagementTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.zk_address",
-                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromZK.class);
-        bind(Topology.class).to(TopologyFromZK.class);
-        bind(Emitter.class).to(TCPEmitter.class);
-        bind(Listener.class).to(TCPListener.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 48f5b23..820dbe5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -11,7 +11,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.log4j.Logger;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -52,7 +52,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
             }
             ++counter;
             if (counter == WordCountTest.TOTAL_WORDS) {
-                File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+                File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
                 if (results.exists()) {
                     if (!results.delete()) {
                         throw new RuntimeException("cannot delete results file");
@@ -63,7 +63,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
                 for (Entry<String, Integer> entry : entrySet) {
                     sb.append(entry.getKey() + "=" + entry.getValue() + ";");
                 }
-                TestUtils.writeStringToFile(sb.toString(), results);
+                CommTestUtils.writeStringToFile(sb.toString(), results);
 
                 zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             } else {
@@ -78,7 +78,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
                 // check if we are allowed to continue
                 if (null == zk.exists("/continue_" + counter, null)) {
                     CountDownLatch latch = new CountDownLatch(1);
-                    TestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
+                    CommTestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
                     latch.await();
                 } else {
                     zk.delete("/continue_" + counter, -1);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 9439e29..c40a050 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch;
 import junit.framework.Assert;
 
 import org.apache.s4.core.App;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -34,8 +34,8 @@ public class WordCountTest {
     
     @Before
     public void prepare() throws IOException, InterruptedException, KeeperException {
-        TestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        CommTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
 
     }
 
@@ -58,13 +58,13 @@ public class WordCountTest {
     @Test
     public void testSimple() throws Exception {
         
-        final ZooKeeper zk = TestUtils.createZkClient();
+        final ZooKeeper zk = CommTestUtils.createZkClient();
         
         App.main(new String[]{WordCountModule.class.getName(), WordCountApp.class.getName()});
         
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
+        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
                 zk);
         
         // add authorizations for processing
@@ -73,20 +73,20 @@ public class WordCountTest {
             zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL);
         }
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
         signalTextProcessed.await();
-        File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR
+        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR
                 + File.separator + "wordcount");
-        String s = TestUtils.readFile(results);
+        String s = CommTestUtils.readFile(results);
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
         
     }
 
     @After
     public void cleanup() throws IOException, InterruptedException {
-        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index 6e89e54..3db216b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -14,7 +14,7 @@ import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.comm.topology.AssignmentFromZK;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.core.App;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.wordcount.WordCountApp;
 import org.apache.s4.wordcount.WordCountModule;
 import org.apache.zookeeper.CreateMode;
@@ -32,9 +32,9 @@ public class WordCountTestZk {
     @Before
     public void prepare() {
 
-        String dataDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
-        String logDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
-        TestUtils.cleanupTmpDirs();
+        String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+        String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+        CommTestUtils.cleanupTmpDirs();
 
         IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
 
@@ -44,11 +44,11 @@ public class WordCountTestZk {
             }
         };
 
-        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, TestUtils.ZK_PORT);
+        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
         zkServer.start();
 
         // zkClient = zkServer.getZkClient();
-        String zookeeperAddress = "localhost:" + TestUtils.ZK_PORT;
+        String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
         zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
 
         ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
@@ -81,23 +81,23 @@ public class WordCountTestZk {
     @Test
     public void test() throws Exception {
 
-        final ZooKeeper zk = TestUtils.createZkClient();
+        final ZooKeeper zk = CommTestUtils.createZkClient();
 
         App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
 
         // add authorizations for processing
         for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
             zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
-        TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
         signalTextProcessed.await();
-        File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-        String s = TestUtils.readFile(results);
+        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+        String s = CommTestUtils.readFile(results);
         Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
 
     }