You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 17:13:00 UTC
git commit: s4-33:s4.fixtures refactored across s4-comm and s4-core
Updated Branches:
refs/heads/piper 8b642d182 -> 111959da9
s4-33:s4.fixtures refactored across s4-comm and s4-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/111959da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/111959da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/111959da
Branch: refs/heads/piper
Commit: 111959da9ee348424f3a21540691f206f4517283
Parents: 8b642d1
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Thu Dec 22 03:30:53 2011 -0500
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 3 18:08:36 2012 +0100
----------------------------------------------------------------------
subprojects/s4-comm/s4-comm.gradle | 15 +-
.../java/org/apache/s4/fixtures/CommTestUtils.java | 439 ++++++++++++++
.../FileBasedClusterManagementTestModule.java | 78 +++
.../ZkBasedClusterManagementTestModule.java | 79 +++
subprojects/s4-core/s4-core.gradle | 1 +
.../test/java/org/apache/s4/core/TriggerTest.java | 16 +-
.../java/org/apache/s4/core/TriggerablePE.java | 4 +-
.../apache/s4/core/apploading/AppLoadingTest.java | 27 +-
.../org/apache/s4/core/apploading/SimpleApp.java | 4 +-
.../org/apache/s4/core/apploading/SimplePE.java | 4 +-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 24 +
.../FileBasedClusterManagementTestModule.java | 78 ---
.../java/org/apache/s4/fixtures/TestUtils.java | 451 ---------------
.../ZkBasedClusterManagementTestModule.java | 79 ---
.../org/apache/s4/wordcount/WordClassifierPE.java | 8 +-
.../org/apache/s4/wordcount/WordCountTest.java | 22 +-
.../apache/s4/wordcount/zk/WordCountTestZk.java | 26 +-
17 files changed, 690 insertions(+), 665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/s4-comm.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/s4-comm.gradle b/subprojects/s4-comm/s4-comm.gradle
index b266c9b..b88239c 100644
--- a/subprojects/s4-comm/s4-comm.gradle
+++ b/subprojects/s4-comm/s4-comm.gradle
@@ -20,11 +20,24 @@ dependencies {
compile project(":s4-base")
compile libraries.json
compile libraries.gson
- compile libraries.kryo
+ compile libraries.kryo
compile libraries.netty
compile libraries.zkclient
}
+task testJar(type: Jar) {
+ baseName = "test-${project.archivesBaseName}"
+ from sourceSets.test.classes
+}
+
+configurations {
+ tests
+}
+
+artifacts {
+ tests testJar
+}
+
test {
forkEvery=1
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
new file mode 100644
index 0000000..984fdfa
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -0,0 +1,439 @@
+package org.apache.s4.fixtures;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains static methods that can be used in tests for things such as: file utilities (string <-> file
+ * conversion, recursive directory deletion, etc.), starting local instances of zookeeper and bookkeeper,
+ * distributed latches through zookeeper, etc.
+ *
+ */
+public class CommTestUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
+
+ public static final int ZK_PORT = 21810;
+ public static final int INITIAL_BOOKIE_PORT = 5000;
+ public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
+ public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ + "storage");
+ public static ServerSocket serverSocket;
+ static {
+ logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
+ }
+
+ protected static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
+ List<String> cmdList = new ArrayList<String>();
+ cmdList.add("java");
+ cmdList.add("-cp");
+ cmdList.add(System.getProperty("java.class.path"));
+ // cmdList.add("-Xdebug");
+ // cmdList.add("-Xnoagent");
+ //
+ // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+
+ cmdList.add(mainClass);
+ for (String arg : args) {
+ cmdList.add(arg);
+ }
+
+ System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+ pb.directory(new File(System.getProperty("user.dir")));
+ pb.redirectErrorStream();
+ final Process process = pb.start();
+
+ // TODO some synchro with s4 platform ready state
+ Thread.sleep(2000);
+ try {
+ int exitValue = process.exitValue();
+ Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+ } catch (IllegalThreadStateException ignored) {
+ }
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ try {
+ line = br.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+
+ return process;
+ }
+
+ public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
+ if (forkedApp != null) {
+ forkedApp.destroy();
+ }
+ }
+
+ public static void writeStringToFile(String s, File f) throws IOException {
+ if (f.exists()) {
+ if (!f.delete()) {
+ throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
+ }
+ }
+
+ FileWriter fw = null;
+ try {
+ if (!f.createNewFile()) {
+ throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
+ }
+ fw = new FileWriter(f);
+
+ fw.write(s);
+ } catch (IOException e) {
+ throw (e);
+ } finally {
+ if (fw != null) {
+ try {
+ fw.close();
+ } catch (IOException e) {
+ throw (e);
+ }
+ }
+ }
+ }
+
+ public static String readFile(File f) throws IOException {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(f));
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ if (line != null) {
+ sb.append("\n");
+ }
+ }
+ return sb.toString();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw (e);
+ }
+ }
+ }
+
+ }
+
+ public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
+ KeeperException {
+
+ final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+ + "zookeeper" + File.separator + "data");
+ if (zkDataDir.exists()) {
+ CommTestUtils.deleteDirectoryContents(zkDataDir);
+ } else {
+ zkDataDir.mkdirs();
+ }
+
+ ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
+ NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
+ nioZookeeperConnectionFactory.startup(zks);
+ Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
+ return nioZookeeperConnectionFactory;
+
+ }
+
+ public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
+ if (f != null) {
+ f.shutdown();
+ Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
+ }
+ }
+
+ public static void deleteDirectoryContents(File dir) {
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectoryContents(file);
+ }
+ if (!file.delete()) {
+ throw new RuntimeException("could not delete : " + file);
+ }
+ }
+ }
+
+ public static String readFileAsString(File f) throws IOException {
+ FileReader fr = new FileReader(f);
+ StringBuilder sb = new StringBuilder("");
+ BufferedReader br = new BufferedReader(fr);
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ if (line != null) {
+ sb.append("\n");
+ }
+ }
+ return sb.toString();
+
+ }
+
+ // TODO factor this code (see BasicFSStateStorage) - or use commons io or
+ // guava
+ public static byte[] readFileAsByteArray(File file) throws Exception {
+ FileInputStream in = null;
+ try {
+ in = new FileInputStream(file);
+
+ long length = file.length();
+
+ /*
+ * Arrays can only be created using int types, so ensure that the file size is not too big before we
+ * downcast to create the array.
+ */
+ if (length > Integer.MAX_VALUE) {
+ throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
+ }
+
+ byte[] buffer = new byte[(int) length];
+ int offSet = 0;
+ int numRead = 0;
+
+ while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
+ offSet += numRead;
+ }
+
+ if (offSet < buffer.length) {
+ throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+ + buffer.length + " bytes read");
+ }
+
+ in.close();
+ return buffer;
+
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ }
+
+ public static ZooKeeper createZkClient() throws IOException {
+ final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ }
+ });
+ return zk;
+ }
+
+ public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
+ throws KeeperException, InterruptedException {
+
+ // by default, do NOT delete an existing node with the same path (deleteIfExists = false)
+ watchAndSignalCreation(path, latch, zk, false);
+ }
+
+ public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
+ boolean deleteIfExists) throws KeeperException, InterruptedException {
+
+ if (zk.exists(path, false) != null) {
+ if (deleteIfExists) {
+ zk.delete(path, -1);
+ } else {
+ latch.countDown();
+ }
+ }
+ zk.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (EventType.NodeCreated.equals(event.getType())) {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
+ throws KeeperException, InterruptedException {
+
+ zk.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (EventType.NodeChildrenChanged.equals(event.getType())) {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ // from zookeeper's codebase
+ public static boolean waitForServerUp(String host, int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ // if there are multiple hostports, just take the first one
+ String result = send4LetterWord(host, port, "stat");
+ if (result.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } catch (IOException ignored) {
+ // ignore as this is expected
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ // from zookeeper's codebase
+ public static String send4LetterWord(String host, int port, String cmd) throws IOException {
+ Socket sock = new Socket(host, port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write(cmd.getBytes());
+ outstream.flush();
+ // this replicates NC - close the output stream before reading
+ sock.shutdownOutput();
+
+ reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line + "\n");
+ }
+ return sb.toString();
+ } finally {
+ sock.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ // from zookeeper's codebase
+ public static boolean waitForServerDown(String host, int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ send4LetterWord(host, port, "stat");
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ public static void cleanupTmpDirs() {
+ if (CommTestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
+ deleteDirectoryContents(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR);
+ }
+ CommTestUtils.DEFAULT_STORAGE_DIR.mkdirs();
+
+ }
+
+ public static void stopSocketAdapter() throws IOException {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ }
+
+ /**
+ * Gradle and Eclipse use different directories for output files. This is justified here:
+ * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
+ *
+ * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
+ * runtime.
+ *
+ * This is what this method does
+ *
+ * @return directory containing the compiled test classes for this project and execution environment.
+ */
+ public static File findDirForCompiledTestClasses() {
+ String userDir = System.getProperty("user.dir");
+ String classpath = System.getProperty("java.class.path");
+ System.out.println(userDir);
+ System.out.println(classpath);
+ if (classpath.contains(userDir + "/bin")) {
+ // eclipse classpath
+ return new File(userDir + "/bin");
+ } else if (classpath.contains(userDir + "/build/classes/test")) {
+ // gradle classpath
+ return new File(userDir + "/build/classes/test");
+ } else {
+ // TODO other IDEs
+ throw new RuntimeException("Cannot find path for compiled test classes");
+ }
+
+ }
+
+ public static void injectIntoStringSocketAdapter(String string) throws IOException {
+ Socket socket = null;
+ PrintWriter writer = null;
+ try {
+ socket = new Socket("localhost", 12000);
+ writer = new PrintWriter(socket.getOutputStream(), true);
+ writer.println(string);
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ } finally {
+ if (socket != null) {
+ socket.close();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..b189350
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
@@ -0,0 +1,78 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+ private final Class<?> appClass;
+
+ protected FileBasedClusterManagementTestModule() {
+ // infer actual app class through "super type tokens" (this simple code
+ // assumes the actual module class is a direct subclass of this one)
+ ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+ Type[] fieldArgTypes = pt.getActualTypeArguments();
+ this.appClass = (Class<?>) fieldArgTypes[0];
+ }
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+ config.setProperty("cluster.lock_dir",
+ config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ bind(appClass);
+ bind(Cluster.class);
+ bind(Hasher.class).to(DefaultHasher.class);
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ bind(Assignment.class).to(AssignmentFromFile.class);
+ bind(Topology.class).to(TopologyFromFile.class);
+ bind(Emitter.class).to(UDPEmitter.class);
+ bind(Listener.class).to(UDPListener.class);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..be3739b
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -0,0 +1,79 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+// also uses netty
+public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+ private final Class<?> appClass;
+
+ protected ZkBasedClusterManagementTestModule() {
+ // infer actual app class through "super type tokens" (this simple code
+ // assumes the actual module class is a direct subclass of this one)
+ ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+ Type[] fieldArgTypes = pt.getActualTypeArguments();
+ this.appClass = (Class<?>) fieldArgTypes[0];
+ }
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+ config.setProperty("cluster.zk_address",
+ config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + CommTestUtils.ZK_PORT));
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ bind(appClass);
+ bind(Cluster.class);
+ bind(Hasher.class).to(DefaultHasher.class);
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ bind(Assignment.class).to(AssignmentFromZK.class);
+ bind(Topology.class).to(TopologyFromZK.class);
+ bind(Emitter.class).to(TCPEmitter.class);
+ bind(Listener.class).to(TCPListener.class);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index fe5466c..1bdbb6c 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -19,6 +19,7 @@ description = 'The S4 core platform.'
dependencies {
compile project(":s4-base")
compile project(":s4-comm")
+ testCompile project(path: ':s4-comm', configuration: 'tests')
}
test {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 6d55029..90718ab 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,7 +7,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
@@ -35,8 +35,8 @@ public abstract class TriggerTest {
@Before
public void prepare() throws IOException, InterruptedException, KeeperException {
- TestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+ CommTestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
}
@After
@@ -45,11 +45,11 @@ public abstract class TriggerTest {
app.close();
app = null;
}
- TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
}
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
- final ZooKeeper zk = TestUtils.createZkClient();
+ final ZooKeeper zk = CommTestUtils.createZkClient();
Injector injector = Guice.createInjector(new TriggeredModule());
app = injector.getInstance(TriggeredApp.class);
app.init();
@@ -58,12 +58,12 @@ public abstract class TriggerTest {
String time1 = String.valueOf(System.currentTimeMillis());
CountDownLatch signalEvent1Processed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+ CommTestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
+ CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
- TestUtils.injectIntoStringSocketAdapter(time1);
+ CommTestUtils.injectIntoStringSocketAdapter(time1);
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
index 3a35a76..d8008f4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerablePE.java
@@ -2,7 +2,7 @@ package org.apache.s4.core;
import java.io.IOException;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,7 +38,7 @@ public class TriggerablePE extends ProcessingElement implements Watcher {
public void onCreate() {
if (zk == null) {
try {
- zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ zk = new ZooKeeper("localhost:" + CommTestUtils.ZK_PORT, 4000, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
index fec8fe9..ee8abdf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/AppLoadingTest.java
@@ -14,7 +14,7 @@ import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import org.apache.s4.core.Server;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
@@ -23,7 +23,6 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
@@ -40,9 +39,9 @@ public class AppLoadingTest {
@Before
public void prepare() throws Exception {
- TestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
- final ZooKeeper zk = TestUtils.createZkClient();
+ CoreTestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = CoreTestUtils.startZookeeperServer();
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
try {
zk.delete("/simpleAppCreated", -1);
} catch (Exception ignored) {
@@ -53,8 +52,8 @@ public class AppLoadingTest {
@After
public void cleanup() throws Exception {
- TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
- TestUtils.killS4App(forkedApp);
+ CoreTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ CoreTestUtils.killS4App(forkedApp);
}
@Ignore("fix paths")
@@ -68,7 +67,7 @@ public class AppLoadingTest {
generateS4RFromDirectoryContents(rootAppDir, appFilesDir, "counterExample",
"org.apache.s4.example.counter.MyApp");
- forkedApp = TestUtils.forkS4Node();
+ forkedApp = CoreTestUtils.forkS4Node();
Thread.sleep(15000);
}
@@ -129,9 +128,9 @@ public class AppLoadingTest {
// TODO fix paths
- final ZooKeeper zk = TestUtils.createZkClient();
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
- File rootAppDir = TestUtils.findDirForCompiledTestClasses();
+ File rootAppDir = CoreTestUtils.findDirForCompiledTestClasses();
File appFilesDir = new File(rootAppDir, "test/s4/core/apploading");
// 1. create app jar and place it in tmp/s4-apps
@@ -139,22 +138,22 @@ public class AppLoadingTest {
CountDownLatch signalAppStarted = new CountDownLatch(1);
// 2. start s4 node and check results
- forkedApp = TestUtils.forkS4Node();
+ forkedApp = CoreTestUtils.forkS4Node();
// TODO wait for ready state (zk node available)
Thread.sleep(5000);
// note: make sure we don't delete existing znode if it was already created
- TestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
+ CoreTestUtils.watchAndSignalCreation("/simpleAppCreated", signalAppStarted, zk, false);
Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
String time1 = String.valueOf(System.currentTimeMillis());
CountDownLatch signalEvent1Processed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
+ CoreTestUtils.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, zk);
- TestUtils.injectIntoStringSocketAdapter(time1);
+ CoreTestUtils.injectIntoStringSocketAdapter(time1);
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
index 08831cb..f01c211 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimpleApp.java
@@ -5,7 +5,7 @@ import java.io.IOException;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
import org.apache.s4.fixtures.SocketAdapter;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.wordcount.SentenceKeyFinder;
import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.CreateMode;
@@ -21,7 +21,7 @@ public class SimpleApp extends App {
@Override
protected void onStart() {
try {
- final ZooKeeper zk = TestUtils.createZkClient();
+ final ZooKeeper zk = CommTestUtils.createZkClient();
zk.create("/simpleAppCreated", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
index e6a9976..a8b7831 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/apploading/SimplePE.java
@@ -4,7 +4,7 @@ import java.io.IOException;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -44,7 +44,7 @@ public class SimplePE extends ProcessingElement implements Watcher {
protected void onCreate() {
if (zk == null) {
try {
- zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ zk = new ZooKeeper("localhost:" + CommTestUtils.ZK_PORT, 4000, this);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
new file mode 100644
index 0000000..8dc2159
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
+import org.apache.s4.fixtures.CommTestUtils;
+
+/**
+ * Adds helpers that fork an S4 app or an S4 node as a {@link Process} on top of the static test methods inherited
+ * from {@link CommTestUtils}, used for things such as: - file utilities: strings <-> files conversion, recursive directory
+ * delete etc... - starting local instances for zookeeper and bookkeeper - distributed latches through zookeeper - etc...
+ *
+ */
+public class CoreTestUtils extends CommTestUtils {
+
+ public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
+ return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
+ }
+
+ public static Process forkS4Node() throws IOException, InterruptedException {
+ return forkProcess(Main.class.getName(), new String[] {});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
deleted file mode 100644
index b189350..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/FileBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class FileBasedClusterManagementTestModule<T> extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
- private final Class<?> appClass;
-
- protected FileBasedClusterManagementTestModule() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- this.appClass = (Class<?>) fieldArgTypes[0];
- }
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- config.setProperty("cluster.lock_dir",
- config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(appClass);
- bind(Cluster.class);
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Assignment.class).to(AssignmentFromFile.class);
- bind(Topology.class).to(TopologyFromFile.class);
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
deleted file mode 100644
index 91690bd..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/TestUtils.java
+++ /dev/null
@@ -1,451 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.Main;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
- * conversion, directory recursive delete etc... - starting local instances for zookeeper and bookkeeper - distributed
- * latches through zookeeper - etc...
- *
- */
-public class TestUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(TestUtils.class);
-
- public static final int ZK_PORT = 21810;
- public static final int INITIAL_BOOKIE_PORT = 5000;
- public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
- public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
- + "storage");
- public static ServerSocket serverSocket;
- static {
- logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
- }
-
- public static Process forkS4App(Class<?> moduleClass, Class<?> appClass) throws IOException, InterruptedException {
- return forkProcess(App.class.getName(), moduleClass.getName(), appClass.getName());
- }
-
- public static Process forkS4Node() throws IOException, InterruptedException {
- return forkProcess(Main.class.getName(), new String[] {});
- }
-
- private static Process forkProcess(String mainClass, String... args) throws IOException, InterruptedException {
- List<String> cmdList = new ArrayList<String>();
- cmdList.add("java");
- cmdList.add("-cp");
- cmdList.add(System.getProperty("java.class.path"));
- // cmdList.add("-Xdebug");
- // cmdList.add("-Xnoagent");
- //
- // cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
-
- cmdList.add(mainClass);
- for (String arg : args) {
- cmdList.add(arg);
- }
-
- System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
- ProcessBuilder pb = new ProcessBuilder(cmdList);
-
- pb.directory(new File(System.getProperty("user.dir")));
- pb.redirectErrorStream();
- final Process process = pb.start();
-
- // TODO some synchro with s4 platform ready state
- Thread.sleep(2000);
- try {
- int exitValue = process.exitValue();
- Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
- } catch (IllegalThreadStateException ignored) {
- }
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
- String line;
- try {
- line = br.readLine();
- while (line != null) {
- System.out.println(line);
- line = br.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }).start();
-
- return process;
- }
-
- public static void killS4App(Process forkedApp) throws IOException, InterruptedException {
- if (forkedApp != null) {
- forkedApp.destroy();
- }
- }
-
- public static void writeStringToFile(String s, File f) throws IOException {
- if (f.exists()) {
- if (!f.delete()) {
- throw new RuntimeException("Cannot delete file " + f.getAbsolutePath());
- }
- }
-
- FileWriter fw = null;
- try {
- if (!f.createNewFile()) {
- throw new RuntimeException("Cannot create new file : " + f.getAbsolutePath());
- }
- fw = new FileWriter(f);
-
- fw.write(s);
- } catch (IOException e) {
- throw (e);
- } finally {
- if (fw != null) {
- try {
- fw.close();
- } catch (IOException e) {
- throw (e);
- }
- }
- }
- }
-
- public static String readFile(File f) throws IOException {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(f));
- StringBuilder sb = new StringBuilder();
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- if (line != null) {
- sb.append("\n");
- }
- }
- return sb.toString();
- } finally {
- if (br != null) {
- try {
- br.close();
- } catch (IOException e) {
- throw (e);
- }
- }
- }
-
- }
-
- public static NIOServerCnxn.Factory startZookeeperServer() throws IOException, InterruptedException,
- KeeperException {
-
- List<String> cmdList = new ArrayList<String>();
- final File zkDataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
- + "zookeeper" + File.separator + "data");
- if (zkDataDir.exists()) {
- TestUtils.deleteDirectoryContents(zkDataDir);
- } else {
- zkDataDir.mkdirs();
- }
-
- ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
- NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress(ZK_PORT));
- nioZookeeperConnectionFactory.startup(zks);
- Assert.assertTrue("waiting for server being up", waitForServerUp("localhost", ZK_PORT, 4000));
- return nioZookeeperConnectionFactory;
-
- }
-
- public static void stopZookeeperServer(NIOServerCnxn.Factory f) throws IOException, InterruptedException {
- if (f != null) {
- f.shutdown();
- Assert.assertTrue("waiting for server down", waitForServerDown("localhost", ZK_PORT, 3000));
- }
- }
-
- public static void deleteDirectoryContents(File dir) {
- File[] files = dir.listFiles();
- for (File file : files) {
- if (file.isDirectory()) {
- deleteDirectoryContents(file);
- }
- if (!file.delete()) {
- throw new RuntimeException("could not delete : " + file);
- }
- }
- }
-
- public static String readFileAsString(File f) throws IOException {
- FileReader fr = new FileReader(f);
- StringBuilder sb = new StringBuilder("");
- BufferedReader br = new BufferedReader(fr);
- String line = br.readLine();
- while (line != null) {
- sb.append(line);
- line = br.readLine();
- if (line != null) {
- sb.append("\n");
- }
- }
- return sb.toString();
-
- }
-
- // TODO factor this code (see BasicFSStateStorage) - or use commons io or
- // guava
- public static byte[] readFileAsByteArray(File file) throws Exception {
- FileInputStream in = null;
- try {
- in = new FileInputStream(file);
-
- long length = file.length();
-
- /*
- * Arrays can only be created using int types, so ensure that the file size is not too big before we
- * downcast to create the array.
- */
- if (length > Integer.MAX_VALUE) {
- throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
- }
-
- byte[] buffer = new byte[(int) length];
- int offSet = 0;
- int numRead = 0;
-
- while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
- offSet += numRead;
- }
-
- if (offSet < buffer.length) {
- throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
- + buffer.length + " bytes read");
- }
-
- in.close();
- return buffer;
-
- } finally {
- if (in != null) {
- in.close();
- }
- }
- }
-
- public static ZooKeeper createZkClient() throws IOException {
- final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- }
- });
- return zk;
- }
-
- public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk)
- throws KeeperException, InterruptedException {
-
- // by default, do not delete an existing node with the same path: if it already exists, the latch is signalled
- watchAndSignalCreation(path, latch, zk, false);
- }
-
- public static void watchAndSignalCreation(String path, final CountDownLatch latch, final ZooKeeper zk,
- boolean deleteIfExists) throws KeeperException, InterruptedException {
-
- if (zk.exists(path, false) != null) {
- if (deleteIfExists) {
- zk.delete(path, -1);
- } else {
- latch.countDown();
- }
- }
- zk.exists(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (EventType.NodeCreated.equals(event.getType())) {
- latch.countDown();
- }
- }
- });
- }
-
- public static void watchAndSignalChangedChildren(String path, final CountDownLatch latch, final ZooKeeper zk)
- throws KeeperException, InterruptedException {
-
- zk.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (EventType.NodeChildrenChanged.equals(event.getType())) {
- latch.countDown();
- }
- }
- });
- }
-
- // from zookeeper's codebase
- public static boolean waitForServerUp(String host, int port, long timeout) {
- long start = System.currentTimeMillis();
- while (true) {
- try {
- // if there are multiple hostports, just take the first one
- String result = send4LetterWord(host, port, "stat");
- if (result.startsWith("Zookeeper version:")) {
- return true;
- }
- } catch (IOException ignored) {
- // ignore as this is expected
- }
-
- if (System.currentTimeMillis() > start + timeout) {
- break;
- }
- try {
- Thread.sleep(250);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- return false;
- }
-
- // from zookeeper's codebase
- public static String send4LetterWord(String host, int port, String cmd) throws IOException {
- Socket sock = new Socket(host, port);
- BufferedReader reader = null;
- try {
- OutputStream outstream = sock.getOutputStream();
- outstream.write(cmd.getBytes());
- outstream.flush();
- // this replicates NC - close the output stream before reading
- sock.shutdownOutput();
-
- reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
- StringBuilder sb = new StringBuilder();
- String line;
- while ((line = reader.readLine()) != null) {
- sb.append(line + "\n");
- }
- return sb.toString();
- } finally {
- sock.close();
- if (reader != null) {
- reader.close();
- }
- }
- }
-
- // from zookeeper's codebase
- public static boolean waitForServerDown(String host, int port, long timeout) {
- long start = System.currentTimeMillis();
- while (true) {
- try {
- send4LetterWord(host, port, "stat");
- } catch (IOException e) {
- return true;
- }
-
- if (System.currentTimeMillis() > start + timeout) {
- break;
- }
- try {
- Thread.sleep(250);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- return false;
- }
-
- public static void cleanupTmpDirs() {
- if (TestUtils.DEFAULT_TEST_OUTPUT_DIR.exists()) {
- deleteDirectoryContents(TestUtils.DEFAULT_TEST_OUTPUT_DIR);
- }
- TestUtils.DEFAULT_STORAGE_DIR.mkdirs();
-
- }
-
- public static void stopSocketAdapter() throws IOException {
- if (serverSocket != null) {
- serverSocket.close();
- }
- }
-
- /**
- * Gradle and Eclipse use different directories for output files. This is justified here:
- * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
- *
- * A consequence is that for tests to reference compiled files, we need to resolve the corresponding directory at
- * runtime.
- *
- * This is what this method does
- *
- * @return directory containing the compiled test classes for this project and execution environment.
- */
- public static File findDirForCompiledTestClasses() {
- String userDir = System.getProperty("user.dir");
- String classpath = System.getProperty("java.class.path");
- System.out.println(userDir);
- System.out.println(classpath);
- if (classpath.contains(userDir + "/bin")) {
- // eclipse classpath
- return new File(userDir + "/bin");
- } else if (classpath.contains(userDir + "/build/classes/test")) {
- // gradle classpath
- return new File(userDir + "/build/classes/test");
- } else {
- // TODO other IDEs
- throw new RuntimeException("Cannot find path for compiled test classes");
- }
-
- }
-
- public static void injectIntoStringSocketAdapter(String string) throws IOException {
- Socket socket = null;
- PrintWriter writer = null;
- try {
- socket = new Socket("localhost", 12000);
- writer = new PrintWriter(socket.getOutputStream(), true);
- writer.println(string);
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(-1);
- } finally {
- if (socket != null) {
- socket.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
deleted file mode 100644
index f5959a2..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedClusterManagementTestModule.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.apache.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-// also uses netty
-public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
- private final Class<?> appClass;
-
- protected ZkBasedClusterManagementTestModule() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- this.appClass = (Class<?>) fieldArgTypes[0];
- }
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- config.setProperty("cluster.zk_address",
- config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(appClass);
- bind(Cluster.class);
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Assignment.class).to(AssignmentFromZK.class);
- bind(Topology.class).to(TopologyFromZK.class);
- bind(Emitter.class).to(TCPEmitter.class);
- bind(Listener.class).to(TCPListener.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 48f5b23..820dbe5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -11,7 +11,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -52,7 +52,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
}
++counter;
if (counter == WordCountTest.TOTAL_WORDS) {
- File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
if (results.exists()) {
if (!results.delete()) {
throw new RuntimeException("cannot delete results file");
@@ -63,7 +63,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
for (Entry<String, Integer> entry : entrySet) {
sb.append(entry.getKey() + "=" + entry.getValue() + ";");
}
- TestUtils.writeStringToFile(sb.toString(), results);
+ CommTestUtils.writeStringToFile(sb.toString(), results);
zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
@@ -78,7 +78,7 @@ public class WordClassifierPE extends ProcessingElement implements Watcher {
// check if we are allowed to continue
if (null == zk.exists("/continue_" + counter, null)) {
CountDownLatch latch = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
+ CommTestUtils.watchAndSignalCreation("/continue_" + counter, latch, zk);
latch.await();
} else {
zk.delete("/continue_" + counter, -1);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 9439e29..c40a050 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.s4.core.App;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -34,8 +34,8 @@ public class WordCountTest {
@Before
public void prepare() throws IOException, InterruptedException, KeeperException {
- TestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+ CommTestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
}
@@ -58,13 +58,13 @@ public class WordCountTest {
@Test
public void testSimple() throws Exception {
- final ZooKeeper zk = TestUtils.createZkClient();
+ final ZooKeeper zk = CommTestUtils.createZkClient();
App.main(new String[]{WordCountModule.class.getName(), WordCountApp.class.getName()});
CountDownLatch signalTextProcessed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
+ CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
zk);
// add authorizations for processing
@@ -73,20 +73,20 @@ public class WordCountTest {
zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
signalTextProcessed.await();
- File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR
+ File.separator + "wordcount");
- String s = TestUtils.readFile(results);
+ String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
}
@After
public void cleanup() throws IOException, InterruptedException {
- TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/111959da/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index 6e89e54..3db216b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -14,7 +14,7 @@ import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.App;
-import org.apache.s4.fixtures.TestUtils;
+import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.wordcount.WordCountApp;
import org.apache.s4.wordcount.WordCountModule;
import org.apache.zookeeper.CreateMode;
@@ -32,9 +32,9 @@ public class WordCountTestZk {
@Before
public void prepare() {
- String dataDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
- String logDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
- TestUtils.cleanupTmpDirs();
+ String dataDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+ String logDir = CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+ CommTestUtils.cleanupTmpDirs();
IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
@@ -44,11 +44,11 @@ public class WordCountTestZk {
}
};
- zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, TestUtils.ZK_PORT);
+ zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, CommTestUtils.ZK_PORT);
zkServer.start();
// zkClient = zkServer.getZkClient();
- String zookeeperAddress = "localhost:" + TestUtils.ZK_PORT;
+ String zookeeperAddress = "localhost:" + CommTestUtils.ZK_PORT;
zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
@@ -81,23 +81,23 @@ public class WordCountTestZk {
@Test
public void test() throws Exception {
- final ZooKeeper zk = TestUtils.createZkClient();
+ final ZooKeeper zk = CommTestUtils.createZkClient();
App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
- TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+ CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
// add authorizations for processing
for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
- TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+ CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
signalTextProcessed.await();
- File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
- String s = TestUtils.readFile(results);
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
}