You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/15 22:35:04 UTC
[3/3] S4-24 dynamic app deployment
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
new file mode 100644
index 0000000..279ccaa
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -0,0 +1,381 @@
+package org.apache.s4.deploy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * Tests deployment of packaged applications <br>
+ * - loaded from local apps directory <br>
+ * - deployed through zookeeper notification <br>
+ * - ... from the file system <br>
+ * - ... or from a web server
+ *
+ */
+public class TestAutomaticDeployment {
+
+ private Factory zookeeperServerConnectionFactory;
+ private Process forkedNode;
+ private ZkClient zkClient;
+ private String clusterName;
+ private HttpServer httpServer;
+ private static File tmpAppsDir;
+
+    /**
+     * Packages the two test applications before any test of this class runs.
+     * <p>
+     * Creates a fresh temporary directory, then invokes the {@code installS4R} gradle task of each test application
+     * through the project's {@code gradlew} script, passing the temporary directory as the {@code appsDir} project
+     * property. The tests later copy the resulting s4r archives from that directory.
+     */
+    @BeforeClass
+    public static void createS4RFiles() throws Exception {
+        tmpAppsDir = Files.createTempDir();
+
+        File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+        String projectRoot = gradlewFile.getParentFile().getAbsolutePath();
+        String appsDirProperty = "appsDir=" + tmpAppsDir.getAbsolutePath();
+
+        File app1BuildFile = new File(projectRoot + "/test-apps/simple-deployable-app-1/build.gradle");
+        CoreTestUtils.callGradleTask(gradlewFile, app1BuildFile, "installS4R", new String[] { appsDirProperty });
+
+        File app2BuildFile = new File(projectRoot + "/test-apps/simple-deployable-app-2/build.gradle");
+        CoreTestUtils.callGradleTask(gradlewFile, app2BuildFile, "installS4R", new String[] { appsDirProperty });
+    }
+
+    /**
+     * Makes sure the local apps directory named by the {@code appsDir} property exists and is empty.
+     * <p>
+     * A missing directory is created. An existing one is emptied, but only when its path starts with {@code /tmp},
+     * so that a misconfigured property cannot wipe an arbitrary directory.
+     */
+    @Before
+    public void cleanLocalAppsDir() throws ConfigurationException {
+        String appsDirPath = loadConfig().getString("appsDir");
+        File appsDir = new File(appsDirPath);
+
+        if (!appsDir.exists()) {
+            Assert.assertTrue(appsDir.mkdirs());
+            return;
+        }
+
+        // safety net before deleting anything
+        if (!appsDirPath.startsWith("/tmp")) {
+            Assert.fail("apps dir should a subdir of /tmp for safety");
+        }
+        CommTestUtils.deleteDirectoryContents(appsDir);
+    }
+
+ private PropertiesConfiguration loadConfig() throws ConfigurationException {
+ InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.load(is);
+ return config;
+ }
+
+ @Test
+ public void testInitialDeploymentFromFileSystem() throws Exception {
+
+ File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
+ + System.currentTimeMillis() + ".s4r");
+
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ initializeS4Node();
+
+ final String uri = s4rToDeploy.toURI().toString();
+
+ assertDeployment(uri, true);
+
+ }
+
+ @Test
+ public void testZkTriggeredDeploymentFromFileSystem() throws Exception {
+
+ initializeS4Node();
+
+ Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+
+ File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ final String uri = s4rToDeploy.toURI().toString();
+
+ assertDeployment(uri, false);
+
+ }
+
+    /**
+     * Checks that app 1 gets initialized and started, and that it then processes an injected event.
+     *
+     * @param uri
+     *            location of the s4r archive; only used when {@code initial} is false
+     * @param initial
+     *            when false, a {@code testApp} znode carrying {@code uri} is created under the cluster's
+     *            {@code /apps} node in order to trigger the deployment; when true nothing is published to
+     *            Zookeeper
+     */
+    private void assertDeployment(final String uri, boolean initial) throws KeeperException, InterruptedException,
+            IOException {
+        CountDownLatch signalAppInitialized = new CountDownLatch(1);
+        CountDownLatch signalAppStarted = new CountDownLatch(1);
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
+                CommTestUtils.createZkClient());
+        // the "started" latch must watch the "started" znode; watching the "initialized" znode a second time
+        // would never verify that the application actually started
+        // NOTE(review): assumes AppConstants.STARTED_ZNODE_1 exists and is written by app 1 on startup,
+        // by symmetry with STARTED_ZNODE_2 - confirm
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_1, signalAppStarted,
+                CommTestUtils.createZkClient());
+
+        if (!initial) {
+            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+            zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+        }
+
+        Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalAppStarted.await(10, TimeUnit.SECONDS));
+
+        String time1 = String.valueOf(System.currentTimeMillis());
+
+        // the injected event is identified by the current time; its processing is detected through the creation
+        // of the corresponding znode
+        CountDownLatch signalEvent1Processed = new CountDownLatch(1);
+        CommTestUtils
+                .watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
+
+        CoreTestUtils.injectIntoStringSocketAdapter(time1);
+
+        // check event processed
+        Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Publishes two applications to Zookeeper and checks that both get initialized and started.
+     *
+     * @param uri1
+     *            location of the s4r archive of app 1, published as znode {@code testApp1}
+     * @param uri2
+     *            location of the s4r archive of app 2, published as znode {@code testApp2}
+     */
+    private void assertMultipleAppsDeployment(String uri1, String uri2) throws KeeperException, InterruptedException,
+            IOException {
+        CountDownLatch signalApp1Initialized = new CountDownLatch(1);
+        CountDownLatch signalApp1Started = new CountDownLatch(1);
+
+        CountDownLatch signalApp2Initialized = new CountDownLatch(1);
+        CountDownLatch signalApp2Started = new CountDownLatch(1);
+
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalApp1Initialized,
+                CommTestUtils.createZkClient());
+        // app 1 startup must be detected through app 1's own "started" znode, not through the "initialized"
+        // znode of app 2
+        // NOTE(review): assumes AppConstants.STARTED_ZNODE_1 exists and is written by app 1 on startup,
+        // by symmetry with STARTED_ZNODE_2 - confirm
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_1, signalApp1Started,
+                CommTestUtils.createZkClient());
+
+        CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_2, signalApp2Initialized,
+                CommTestUtils.createZkClient());
+        CommTestUtils.watchAndSignalCreation(AppConstants.STARTED_ZNODE_2, signalApp2Started,
+                CommTestUtils.createZkClient());
+
+        ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
+        record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
+        zkClient.create("/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+
+        ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
+        record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
+        zkClient.create("/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+
+        Assert.assertTrue(signalApp1Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
+
+        Assert.assertTrue(signalApp2Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
+
+    }
+
+ @Test
+ public void testZkTriggeredDeploymentFromHttp() throws Exception {
+ initializeS4Node();
+
+ Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+
+ File tmpDir = Files.createTempDir();
+
+ File s4rToDeploy = new File(tmpDir, String.valueOf(System.currentTimeMillis()));
+
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ // we start a
+ InetSocketAddress addr = new InetSocketAddress(8080);
+ httpServer = HttpServer.create(addr, 0);
+
+ httpServer.createContext("/s4", new MyHandler(tmpDir));
+ httpServer.setExecutor(Executors.newCachedThreadPool());
+ httpServer.start();
+
+ assertDeployment("http://localhost:8080/s4/" + s4rToDeploy.getName(), false);
+
+ }
+
+ /**
+ *
+ * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even when
+ * referenced through reflection, and even when referencing classes present in the classpath of the S4 node
+ *
+ * Works in the following manner:
+ *
+ * - we have app1 and app2, very simple apps
+ *
+ * - app1 and app2 have 3 classes with same name: A, AppConstants and TestApp
+ *
+ * - app1 in addition has a PE and a socket adapter so that it can react to injected events
+ *
+ * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to the application
+ * index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the S4 node
+ * classpath, and therefore loaded by the standard classloader, not from an s4 app classloader)
+ *
+ * - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
+ * app
+ *
+ * - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
+ * the project, and using the build.gradle file available for these applications
+ *
+ * - app1 and app2 s4r archives are copied to a web server and published to Zookeeper
+ *
+ * - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
+ * TestApp and A were independently loaded for independent applications
+ *
+ */
+
+ @Test
+ public void testZkTriggeredDeploymentFromHttpForMultipleApps() throws Exception {
+ initializeS4Node();
+ Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
+ Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_2));
+
+ File tmpDir = Files.createTempDir();
+
+ File s4rToDeployForApp1 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app1");
+ File s4rToDeployForApp2 = new File(tmpDir, String.valueOf(System.currentTimeMillis()) + "-app2");
+
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r")),
+ Files.newOutputStreamSupplier(s4rToDeployForApp1)) > 0);
+ Assert.assertTrue(ByteStreams.copy(
+ Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath()
+ + "/simple-deployable-app-2-0.0.0-SNAPSHOT.s4r")),
+ Files.newOutputStreamSupplier(s4rToDeployForApp2)) > 0);
+
+ // we start a
+ InetSocketAddress addr = new InetSocketAddress(8080);
+ httpServer = HttpServer.create(addr, 0);
+
+ httpServer.createContext("/s4", new MyHandler(tmpDir));
+ httpServer.setExecutor(Executors.newCachedThreadPool());
+ httpServer.start();
+
+ assertMultipleAppsDeployment("http://localhost:8080/s4/" + s4rToDeployForApp1.getName(),
+ "http://localhost:8080/s4/" + s4rToDeployForApp2.getName());
+
+ }
+
+    /**
+     * Resets the test cluster definition in Zookeeper, connects {@link #zkClient} to it and forks an S4 node
+     * configured with a copy of the {@code /org.apache.s4.deploy.s4.properties} resource.
+     * <p>
+     * Readiness of the node is not detected: the method simply sleeps for 10 seconds after forking it.
+     */
+    private void initializeS4Node() throws ConfigurationException, IOException, InterruptedException {
+        // 0. package s4 app
+        // TODO this is currently done offline, and the app contains the TestApp class copied from the one in the
+        // current package .
+
+        // 1. start s4 nodes. Check that no app is deployed.
+        PropertiesConfiguration config = loadConfig();
+        clusterName = config.getString("cluster.name");
+
+        String zkAddress = "localhost:" + CommTestUtils.ZK_PORT;
+        TaskSetup taskSetup = new TaskSetup(zkAddress);
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+
+        zkClient = new ZkClient(zkAddress);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+
+        String processPath = "/" + clusterName + "/process";
+        List<String> processes = zkClient.getChildren(processPath);
+        Assert.assertTrue(processes.size() == 0);
+
+        // this latch is counted down by the listener below but is currently not awaited (see TODO further down)
+        final CountDownLatch signalProcessesReady = new CountDownLatch(1);
+        zkClient.subscribeChildChanges(processPath, new IZkChildListener() {
+
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                if (currentChilds.size() == 2) {
+                    signalProcessesReady.countDown();
+                }
+            }
+        });
+
+        // the forked node reads its configuration from a file, hence the copy of the classpath resource
+        File tmpConfig = File.createTempFile("tmp", "config");
+        long copiedBytes = ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+                Files.newOutputStreamSupplier(tmpConfig));
+        Assert.assertTrue(copiedBytes > 0);
+        forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
+
+        // TODO synchro with ready state from zk
+        Thread.sleep(10000);
+        // Assert.assertTrue(signalProcessesReady.await(10, TimeUnit.SECONDS));
+
+    }
+
+    /**
+     * Cleans temporary directories, starts a Zookeeper server and removes the {@code /simpleAppCreated} znode when
+     * present.
+     */
+    @Before
+    public void prepare() throws Exception {
+        CommTestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
+
+        final ZooKeeper zookeeper = CommTestUtils.createZkClient();
+        try {
+            // -1 matches any version of the node
+            zookeeper.delete("/simpleAppCreated", -1);
+        } catch (Exception ignored) {
+            // best effort: any failure of the deletion is deliberately ignored
+        }
+        zookeeper.close();
+    }
+
+    /**
+     * Releases everything a test may have started: the Zookeeper server, the forked S4 node and the HTTP server.
+     * <p>
+     * Each step runs even when a previous one fails, so that a failure while stopping Zookeeper or killing the node
+     * does not leave the HTTP server bound to its port for the following tests. The first exception raised is the
+     * one propagated unless a later step fails too.
+     */
+    @After
+    public void cleanup() throws Exception {
+        try {
+            CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        } finally {
+            try {
+                CommTestUtils.killS4App(forkedNode);
+            } finally {
+                // only the HTTP based tests start a server
+                if (httpServer != null) {
+                    httpServer.stop(0);
+                }
+            }
+        }
+    }
+
+ /**
+  * Prints a message on standard output. Note that no server is started by this method, whatever the message
+  * says.
+  */
+ public static void main(String[] args) throws IOException {
+
+ System.out.println("Server is listening on port 8080");
+ }
+}
+
+class MyHandler implements HttpHandler {
+
+ File tmpDir;
+
+ public MyHandler(File tmpDir) {
+ this.tmpDir = tmpDir;
+ }
+
+ @Override
+ public void handle(HttpExchange exchange) throws IOException {
+ String requestMethod = exchange.getRequestMethod();
+ if (requestMethod.equalsIgnoreCase("GET")) {
+ String fileName = exchange.getRequestURI().getPath().substring("/s4/".length());
+ Headers responseHeaders = exchange.getResponseHeaders();
+ responseHeaders.set(HttpHeaders.Names.CONTENT_TYPE, HttpHeaders.Values.BYTES);
+ exchange.sendResponseHeaders(200, Files.toByteArray(new File(tmpDir, fileName)).length);
+
+ OutputStream responseBody = exchange.getResponseBody();
+
+ ByteStreams.copy(new FileInputStream(new File(tmpDir, fileName)), responseBody);
+
+ responseBody.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
new file mode 100644
index 0000000..f5f3912
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
@@ -0,0 +1,62 @@
+package org.apache.s4.deploy;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromZK;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module used by the deployment tests.
+ * <p>
+ * Makes every property of the {@code /org.apache.s4.deploy.s4.properties} classpath resource injectable by name,
+ * and binds the communication layer interfaces to their TCP / Zookeeper based implementations.
+ */
+public class TestModule extends AbstractModule {
+
+    private PropertiesConfiguration config;
+
+    /**
+     * Reads the test properties, prints them on standard output and binds them as named constants.
+     * A configuration error is reported to the binder in addition to being printed.
+     */
+    private void loadProperties(Binder binder) {
+        try {
+            InputStream propertiesStream = this.getClass().getResourceAsStream(
+                    "/org.apache.s4.deploy.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(propertiesStream);
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException configurationError) {
+            binder.addError(configurationError);
+            configurationError.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        // properties are loaded at most once per module instance
+        if (config == null) {
+            loadProperties(binder());
+        }
+
+        bind(Cluster.class);
+
+        // hashing and serialization
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+        // cluster state read from Zookeeper
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Topology.class).to(TopologyFromZK.class);
+
+        // TCP based communication
+        bind(Emitter.class).to(TCPEmitter.class);
+        bind(Listener.class).to(TCPListener.class);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 8dc2159..060f82a 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -1,10 +1,19 @@
package org.apache.s4.fixtures;
+import java.io.BufferedReader;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
import org.apache.s4.core.App;
import org.apache.s4.core.Main;
-import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.common.io.PatternFilenameFilter;
/**
* Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
@@ -21,4 +30,76 @@ public class CoreTestUtils extends CommTestUtils {
public static Process forkS4Node() throws IOException, InterruptedException {
return forkProcess(Main.class.getName(), new String[] {});
}
+
+ /**
+  * Same as {@link #forkS4Node()} but passes the given arguments along.
+  *
+  * @param args
+  *            arguments handed to {@code forkProcess} together with the name of the {@link Main} class
+  * @return the process returned by {@code forkProcess}
+  */
+ public static Process forkS4Node(String[] args) throws IOException, InterruptedException {
+ return forkProcess(Main.class.getName(), args);
+ }
+
+ public static File findGradlewInRootDir() {
+ File gradlewFile = null;
+ if (new File(System.getProperty("user.dir")).listFiles(new PatternFilenameFilter("gradlew")).length == 1) {
+ gradlewFile = new File(System.getProperty("user.dir") + File.separator + "gradlew");
+ } else {
+ if (new File(System.getProperty("user.dir")).getParentFile().getParentFile()
+ .listFiles(new PatternFilenameFilter("gradlew")).length == 1) {
+ gradlewFile = new File(new File(System.getProperty("user.dir")).getParentFile().getParentFile()
+ .getAbsolutePath()
+ + File.separator + "gradlew");
+ } else {
+ Assert.fail("Cannot find gradlew executable in [" + System.getProperty("user.dir") + "] or ["
+ + new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath() + "]");
+ }
+ }
+ return gradlewFile;
+ }
+
+ public static void callGradleTask(File gradlewFile, File buildFile, String taskName, String[] params)
+ throws Exception {
+
+ List<String> cmdList = new ArrayList<String>();
+ cmdList.add(gradlewFile.getAbsolutePath());
+ cmdList.add("-c");
+ cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
+ cmdList.add("-b");
+ cmdList.add(buildFile.getAbsolutePath());
+ cmdList.add(taskName);
+ if (params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ cmdList.add("-P" + params[i]);
+ }
+ }
+
+ System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+ pb.directory(buildFile.getParentFile());
+ pb.redirectErrorStream();
+ final Process process = pb.start();
+
+ process.waitFor();
+
+ // try {
+ // int exitValue = process.exitValue();
+ // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+ // } catch (IllegalThreadStateException ignored) {
+ // }
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ try {
+ line = br.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index c40a050..e12853d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -1,6 +1,5 @@
package org.apache.s4.wordcount;
-
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -18,9 +17,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
public class WordCountTest {
-
+
public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
public static final String SENTENCE_2 = "doobie doobie da";
@@ -28,10 +26,9 @@ public class WordCountTest {
public static final String SENTENCE_3 = "doobie";
public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
public static final String FLAG = ";";
- public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
- + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
+ public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
private static Factory zookeeperServerConnectionFactory;
-
+
@Before
public void prepare() throws IOException, InterruptedException, KeeperException {
CommTestUtils.cleanupTmpDirs();
@@ -45,10 +42,8 @@ public class WordCountTest {
*
*
*
- * sentences words word counts
- * Adapter ------------> WordSplitterPE -----------> WordCounterPE -------------> WordClassifierPE
- * key = "sentence" key = word key="classifier"
- * (should be *)
+ * sentences words word counts Adapter ------------> WordSplitterPE -----------> WordCounterPE ------------->
+ * WordClassifierPE key = "sentence" key = word key="classifier" (should be *)
*
*
* The test consists in checking that words are correctly counted.
@@ -57,31 +52,25 @@ public class WordCountTest {
*/
@Test
public void testSimple() throws Exception {
-
final ZooKeeper zk = CommTestUtils.createZkClient();
-
- App.main(new String[]{WordCountModule.class.getName(), WordCountApp.class.getName()});
-
+
+ App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
- zk);
-
+ CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
// add authorizations for processing
- for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS
- + 1; i++) {
- zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.EPHEMERAL);
+ for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
signalTextProcessed.await();
- File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR
- + File.separator + "wordcount");
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-
+
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
index fb1d249..6f900c3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
@@ -1,19 +1,23 @@
package org.apache.s4.wordcount.zk;
-import static org.apache.s4.wordcount.WordCountTest.*;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1_TOTAL_WORDS;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2_TOTAL_WORDS;
+import static org.apache.s4.wordcount.WordCountTest.SENTENCE_3;
+
import java.io.File;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
-import org.apache.s4.core.App;
+import org.apache.s4.core.Main;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.s4.wordcount.WordCountApp;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-
+import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
public class WordCountTestZk extends ZkBasedTest {
@@ -22,7 +26,7 @@ public class WordCountTestZk extends ZkBasedTest {
final ZooKeeper zk = CommTestUtils.createZkClient();
- App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
+ Main.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 0e31dfa..62fc7d5 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -4,6 +4,6 @@ cluster.hosts = localhost
cluster.ports = 5077
cluster.lock_dir = {user.dir}/tmp
cluster.name = s4-test-cluster
-cluster.zk_address = localhost:2181
+cluster.zk_address = localhost:21810
cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
\ No newline at end of file
+cluster.zk_connection_timeout = 10000
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
new file mode 100644
index 0000000..d74927f
--- /dev/null
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -0,0 +1,11 @@
+comm.queue_emmiter_size = 8000
+comm.queue_listener_size = 8000
+cluster.hosts = localhost
+cluster.ports = 5077
+cluster.name = s4-test-cluster
+cluster.zk_address = localhost:21810
+cluster.zk_session_timeout = 10000
+cluster.zk_connection_timeout = 10000
+comm.module = org.apache.s4.deploy.TestModule
+s4.logger_level = TRACE
+appsDir=/tmp/deploy-test
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/subprojects/s4-example/s4-example.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/s4-example.gradle b/subprojects/s4-example/s4-example.gradle
index 664496e..bbeb11b 100644
--- a/subprojects/s4-example/s4-example.gradle
+++ b/subprojects/s4-example/s4-example.gradle
@@ -21,4 +21,5 @@ dependencies {
compile project( ":s4-core" )
compile project( ":s4-comm" )
compile libraries.ejml
+ compile libraries.junit
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/build.gradle b/test-apps/simple-deployable-app-1/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+// Install destination: the 'appsDir' project property when it is defined, otherwise /tmp/appsDir.
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+// ${-> version} is a lazily evaluated GString placeholder: 'version' is only assigned a few lines below,
+// so it is looked up when the string is rendered rather than when this line runs.
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files (see the s4r task); it starts out empty. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here.
+ * Each entry maps a short alias to a 'group:name:version' coordinate; the dependencies block below refers to
+ * them as libraries.<alias>.
+ */
+libraries = [
+ json: 'org.json:json:20090211',
+ guava: 'com.google.guava:guava:10.0.1',
+ gson: 'com.google.code.gson:gson:1.6',
+ guice: 'com.google.inject:guice:3.0',
+ guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
+ guice_grapher: 'com.google.inject:guice-grapher:3.0',
+ flexjson: 'net.sf.flexjson:flexjson:2.1',
+ bcel: 'org.apache.bcel:bcel:5.2',
+ jakarta_regexp: 'jakarta-regexp:jakarta-regexp:1.4',
+ kryo: 'com.googlecode:kryo:1.04',
+ netty: 'org.jboss.netty:netty:3.2.5.Final',
+ reflectasm: 'com.esotericsoftware:reflectasm:0.8',
+ minlog: 'com.esotericsoftware:minlog:1.2',
+ asm: 'asm:asm:3.2',
+ commons_io: 'commons-io:commons-io:2.0.1',
+ commons_config: 'commons-configuration:commons-configuration:1.6',
+ commons_codec: 'commons-codec:commons-codec:1.4',
+ commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+ commons_coll: 'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+ slf4j: 'org.slf4j:slf4j-api:1.6.1',
+ logback_core: 'ch.qos.logback:logback-core:0.9.29',
+ logback_classic: 'ch.qos.logback:logback-classic:0.9.29',
+ zk: 'org.apache.zookeeper:zookeeper:3.3.1',
+ jcip: 'net.jcip:jcip-annotations:1.0',
+ junit: 'junit:junit:4.10',
+ ]
+
+
+/* Compile-time and test-time dependencies of the app. */
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ /* The S4 modules are taken from the sibling projects of this multi-project build. */
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+
+ /* Logging. */
+ compile( libraries.slf4j )
+ compile( libraries.logback_core )
+ compile( libraries.logback_classic )
+
+ /* Commons. */
+ compile( libraries.commons_io )
+ compile( libraries.commons_config )
+ compile( libraries.commons_coll )
+
+ /* Misc. */
+ compile( libraries.jcip )
+
+ /* Testing. */
+ testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+// 'S4-App-Class' is written with the still-empty appClassname here; the s4r task sets the real value later.
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+// Everything on the compile configuration is bundled into the archive by the s4r task.
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+    dependsOn jar
+    // Build a fat jar: directories are added as-is, archives are exploded through zipTree.
+    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+    from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+    manifest = project.manifest
+    extension = 's4r'
+
+    /* Set class name in manifest. Parse source files until we find a class that extends App.
+     * getAppClassname returns "UNKNOWN" for a file without such a class, so keep scanning while the
+     * name is still empty or "UNKNOWN".
+     */
+    sourceSets.main.allSource.files.each { File file ->
+        if (appClassname == "" || appClassname == "UNKNOWN") {
+            appClassname = getAppClassname(file)
+        }
+    }
+
+    if (appClassname == "UNKNOWN") {
+        // Fail the build with an explicit error; there is no exit() method available in a build script.
+        throw new GradleException("Couldn't find App class in source files...aborting.")
+    }
+
+    // Only stamp the manifest with a real class name, never with the "UNKNOWN" placeholder.
+    if (appClassname != "") {
+        manifest.mainAttributes('S4-App-Class': appClassname)
+    }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+    // Report every compile dependency, then every project artifact, that ends up in the archive.
+    for (File dependency in appDependencies) {
+        println 'Adding to s4 archive: ' + dependency.name
+    }
+    for (File artifact in configurations.archives.allArtifactFiles) {
+        println 'Adding to s4 archive: ' + artifact.name
+    }
+
+    /* This is for debugging. */
+    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+    //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+    // more debugging statements.
+    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ // The archive has to be built before it can be copied.
+ dependsOn s4r
+ // Source: the file produced by the s4r task.
+ from s4r.archivePath
+ // Destination: the 'appsDir' project property, or /tmp/appsDir when it is not set (see s4AppInstallDir).
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) {
+    // Gradle version used by the generated gradlew scripts.
+    gradleVersion = '1.0-milestone-3'
+}
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+/**
+ * Scans one source file line by line for a public class that extends App.
+ *
+ * @param file the source file to read
+ * @return the class name prefixed with the package declared earlier in the same file (no prefix when the
+ *         file declares none), or "UNKNOWN" when no matching class declaration is found
+ */
+def getAppClassname(file) {
+    def classname = "UNKNOWN"
+    // Declared locally so that a package found in one file never leaks into the next one, and so that a
+    // class without a package declaration does not fail on an undefined variable.
+    def packageName = ""
+    for (line in file.readLines()) {
+
+        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+        if (pn) {
+            packageName = pn[0][1] + "."
+        }
+
+        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+        if (an) {
+            classname = packageName + an[0][1]
+            println "Found app class name: " + classname
+            break
+        }
+    }
+    classname
+}
+
+/** Version number rendered as major.minor.bugfix, with a -SNAPSHOT suffix for non-release builds. */
+class Version {
+    int major
+    int minor
+    int bugfix
+    boolean isRelease
+
+    String toString() {
+        def suffix = isRelease ? '' : '-SNAPSHOT'
+        return [major, minor, bugfix].join('.') + suffix
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
new file mode 100644
index 0000000..45bc37b
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/A.java
@@ -0,0 +1,17 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.deploy.AppConstants;
+
+/**
+ * Helper whose construction marks test app 1 as started by creating the ephemeral znode
+ * {@link AppConstants#STARTED_ZNODE_1}.
+ */
+public class A {
+
+    /**
+     * Creates the "started" znode. Any failure terminates the JVM with exit status -1.
+     *
+     * @param zkClient
+     *            client used to create the znode
+     */
+    public A(ZkClient zkClient) {
+        try {
+            zkClient.createEphemeral(AppConstants.STARTED_ZNODE_1, null);
+        } catch (Exception e) {
+            // Report the cause before exiting; a bare exit status gives no hint about what went wrong.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
new file mode 100644
index 0000000..a12b6ea
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/AppConstants.java
@@ -0,0 +1,10 @@
+package org.apache.s4.deploy;
+
+/** ZooKeeper paths that the deployable test apps create to signal that they were initialized and started. */
+public class AppConstants {
+
+    /** Parent znode of all the marker nodes below. */
+    private static final String ROOT = "/s4-test";
+
+    // Markers for test app 1.
+    public static final String STARTED_ZNODE_1 = ROOT + "/test-app-1-started";
+    public static final String INITIALIZED_ZNODE_1 = ROOT + "/test-app-1-initialized";
+
+    // Markers for test app 2.
+    public static final String STARTED_ZNODE_2 = ROOT + "/test-app-2-started";
+    public static final String INITIALIZED_ZNODE_2 = ROOT + "/test-app-2-initialized";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
new file mode 100644
index 0000000..809cebd
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
@@ -0,0 +1,63 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.LoggerFactory;
+
+public class SimplePE extends ProcessingElement implements Watcher {
+
+ private ZooKeeper zk;
+
+ public SimplePE() {
+ }
+
+ public SimplePE(App app) {
+ super(app);
+ }
+
+ public void onEvent(org.apache.s4.base.Event event) {
+ try {
+ LoggerFactory.getLogger(getClass()).debug("processing envent {}", event.get("line"));
+ zk.create("/onEvent@" + event.get("line"), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.close();
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void onCreate() {
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:" + 21810, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Override
+ protected void onRemove() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
new file mode 100644
index 0000000..5ee7197
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SocketAdapter.java
@@ -0,0 +1,80 @@
+package org.apache.s4.deploy;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.Stream;
+
+public class SocketAdapter {
+
+ static ServerSocket serverSocket;
+
+ /**
+ * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
+ * connection
+ *
+ * @param stream
+ * @throws IOException
+ */
+ public SocketAdapter(final Stream<Event> stream) throws IOException {
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ serverSocket = null;
+ Socket connectedSocket;
+ BufferedReader in = null;
+ try {
+ serverSocket = new ServerSocket(12000);
+ while (true) {
+ connectedSocket = serverSocket.accept();
+ in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
+
+ String line = in.readLine();
+ System.out.println("read: " + line);
+ Event event = new Event();
+ event.put("line", String.class, line);
+ stream.put(event);
+ connectedSocket.close();
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (serverSocket != null) {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ });
+ t.start();
+
+ }
+
+ public void close() {
+ if (serverSocket != null) {
+ try {
+ serverSocket.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
new file mode 100644
index 0000000..25b98a6
--- /dev/null
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -0,0 +1,60 @@
+package org.apache.s4.deploy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.Stream;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * Deployable test app 1: wires a socket adapter to a {@link SimplePE} through a stream and publishes marker
+ * znodes when it is initialized and started.
+ */
+public class TestApp extends App {
+
+    private ZkClient zkClient;
+    private SocketAdapter socketAdapter;
+
+    @Override
+    protected void onClose() {
+        // nothing is released here
+    }
+
+    /**
+     * Creates the PE prototype, the stream and the socket adapter, then connects to ZooKeeper on
+     * localhost:21810 and creates {@link AppConstants#INITIALIZED_ZNODE_1} (creating /s4-test first when it is
+     * missing). Any failure terminates the JVM with exit status -1.
+     */
+    @Override
+    protected void onInit() {
+        try {
+            SimplePE prototype = createPE(SimplePE.class);
+            Stream<Event> stream = createStream("stream", new KeyFinder<Event>() {
+                public java.util.List<String> get(Event event) {
+                    // Plain list instead of a double-brace initialised anonymous subclass.
+                    ArrayList<String> keys = new ArrayList<String>();
+                    keys.add("line");
+                    return keys;
+                }
+            }, prototype);
+            socketAdapter = new SocketAdapter(stream);
+
+            zkClient = new ZkClient("localhost:" + 21810);
+            if (!zkClient.exists("/s4-test")) {
+                zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
+            }
+            zkClient.createEphemeral(AppConstants.INITIALIZED_ZNODE_1, null);
+        } catch (Exception e) {
+            // Report the cause before exiting; a bare exit status gives no hint about what went wrong.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Loads class org.apache.s4.deploy.A by name and instantiates it with the ZooKeeper client. Any failure
+     * terminates the JVM with exit status -1.
+     */
+    @Override
+    protected void onStart() {
+        try {
+            Class.forName("org.apache.s4.deploy.A").getConstructor(ZkClient.class).newInstance(zkClient);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/build.gradle b/test-apps/simple-deployable-app-2/build.gradle
new file mode 100644
index 0000000..37b1daf
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/build.gradle
@@ -0,0 +1,223 @@
+/*
+* Copyright 2010 the original author or authors.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+//s4AppInstallDir = "/tmp/s4Apps" // TODO: decide how to standardize dirs, use env var?
+
+// Install destination: the 'appsDir' project property when it is defined, otherwise /tmp/appsDir.
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+// ${-> version} is a lazily evaluated GString placeholder: 'version' is only assigned a few lines below,
+// so it is looked up when the string is rendered rather than when this line runs.
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+
+/* The app classname is set automatically from the source files (see the s4r task); it starts out empty. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+
+
+/* All project libraries must be defined here.
+ * Each entry maps a short alias to a 'group:name:version' coordinate; the dependencies block below refers to
+ * them as libraries.<alias>.
+ */
+libraries = [
+ json: 'org.json:json:20090211',
+ guava: 'com.google.guava:guava:10.0.1',
+ gson: 'com.google.code.gson:gson:1.6',
+ guice: 'com.google.inject:guice:3.0',
+ guice_assist: 'com.google.inject.extensions:guice-assistedinject:3.0',
+ guice_grapher: 'com.google.inject:guice-grapher:3.0',
+ flexjson: 'net.sf.flexjson:flexjson:2.1',
+ bcel: 'org.apache.bcel:bcel:5.2',
+ jakarta_regexp: 'jakarta-regexp:jakarta-regexp:1.4',
+ kryo: 'com.googlecode:kryo:1.04',
+ netty: 'org.jboss.netty:netty:3.2.5.Final',
+ reflectasm: 'com.esotericsoftware:reflectasm:0.8',
+ minlog: 'com.esotericsoftware:minlog:1.2',
+ asm: 'asm:asm:3.2',
+ commons_io: 'commons-io:commons-io:2.0.1',
+ commons_config: 'commons-configuration:commons-configuration:1.6',
+ commons_codec: 'commons-codec:commons-codec:1.4',
+ commons_httpclient: 'commons-httpclient:commons-httpclient:3.1',
+ commons_coll: 'net.sourceforge.collections:collections-generic:4.01', // Use this lib until the commons collection with Generics is released.
+ slf4j: 'org.slf4j:slf4j-api:1.6.1',
+ logback_core: 'ch.qos.logback:logback-core:0.9.29',
+ logback_classic: 'ch.qos.logback:logback-classic:0.9.29',
+ zk: 'org.apache.zookeeper:zookeeper:3.3.1',
+ jcip: 'net.jcip:jcip-annotations:1.0',
+ junit: 'junit:junit:4.10',
+ ]
+
+
+/* Compile-time and test-time dependencies of the app. */
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+// s4Libs.each { module ->
+// compile( module ) //{ transitive = false }
+// s4API( module )
+// }
+
+ /* The S4 modules are taken from the sibling projects of this multi-project build. */
+ compile project(":s4-base")
+ compile project(":s4-comm")
+ compile project(":s4-core")
+
+ /* Logging. */
+ compile( libraries.slf4j )
+ compile( libraries.logback_core )
+ compile( libraries.logback_classic )
+
+ /* Commons. */
+ compile( libraries.commons_io )
+ compile( libraries.commons_config )
+ compile( libraries.commons_coll )
+
+ /* Misc. */
+ compile( libraries.jcip )
+
+ /* Testing. */
+ testCompile( libraries.junit )
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+// 'S4-App-Class' is written with the still-empty appClassname here; the s4r task sets the real value later.
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+// Everything on the compile configuration is bundled into the archive by the s4r task.
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schema files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+    dependsOn jar
+    // Build a fat jar: directories are added as-is, archives are exploded through zipTree.
+    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+    from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+    manifest = project.manifest
+    extension = 's4r'
+
+    /* Set class name in manifest. Parse source files until we find a class that extends App.
+     * getAppClassname returns "UNKNOWN" for a file without such a class, so keep scanning while the
+     * name is still empty or "UNKNOWN".
+     */
+    sourceSets.main.allSource.files.each { File file ->
+        if (appClassname == "" || appClassname == "UNKNOWN") {
+            appClassname = getAppClassname(file)
+        }
+    }
+
+    if (appClassname == "UNKNOWN") {
+        // Fail the build with an explicit error; there is no exit() method available in a build script.
+        throw new GradleException("Couldn't find App class in source files...aborting.")
+    }
+
+    // Only stamp the manifest with a real class name, never with the "UNKNOWN" placeholder.
+    if (appClassname != "") {
+        manifest.mainAttributes('S4-App-Class': appClassname)
+    }
+}
+
+/* List the artifacts that will be added to the s4 archive (and explode if needed). */
+s4r << {
+    // Report every compile dependency, then every project artifact, that ends up in the archive.
+    for (File dependency in appDependencies) {
+        println 'Adding to s4 archive: ' + dependency.name
+    }
+    for (File artifact in configurations.archives.allArtifactFiles) {
+        println 'Adding to s4 archive: ' + artifact.name
+    }
+
+    /* This is for debugging. */
+    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
+    //deployableDependencies.each { File file -> println 'Deploy: ' + file.name }
+
+    // more debugging statements.
+    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ // The archive has to be built before it can be copied.
+ dependsOn s4r
+ // Source: the file produced by the s4r task.
+ from s4r.archivePath
+ // Destination: the 'appsDir' project property, or /tmp/appsDir when it is not set (see s4AppInstallDir).
+ into s4AppInstallDir
+}
+
+/* Generates the gradlew scripts.
+http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
+task wrapper(type: Wrapper) {
+    // Gradle version used by the generated gradlew scripts.
+    gradleVersion = '1.0-milestone-3'
+}
+
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+/**
+ * Scans one source file line by line for a public class that extends App.
+ *
+ * @param file the source file to read
+ * @return the class name prefixed with the package declared earlier in the same file (no prefix when the
+ *         file declares none), or "UNKNOWN" when no matching class declaration is found
+ */
+def getAppClassname(file) {
+    def classname = "UNKNOWN"
+    // Declared locally so that a package found in one file never leaks into the next one, and so that a
+    // class without a package declaration does not fail on an undefined variable.
+    def packageName = ""
+    for (line in file.readLines()) {
+
+        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+        if (pn) {
+            packageName = pn[0][1] + "."
+        }
+
+        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+        if (an) {
+            classname = packageName + an[0][1]
+            println "Found app class name: " + classname
+            break
+        }
+    }
+    classname
+}
+
+/** Version number rendered as major.minor.bugfix, with a -SNAPSHOT suffix for non-release builds. */
+class Version {
+    int major
+    int minor
+    int bugfix
+    boolean isRelease
+
+    String toString() {
+        def suffix = isRelease ? '' : '-SNAPSHOT'
+        return [major, minor, bugfix].join('.') + suffix
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
new file mode 100644
index 0000000..732be13
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/A.java
@@ -0,0 +1,16 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+
+/**
+ * Helper whose construction marks test app 2 as started by creating the ephemeral znode
+ * {@link AppConstants#STARTED_ZNODE_2}.
+ */
+public class A {
+
+    /**
+     * Creates the "started" znode. Any failure terminates the JVM with exit status -1.
+     *
+     * @param zkClient
+     *            client used to create the znode
+     */
+    public A(ZkClient zkClient) {
+        try {
+            zkClient.createEphemeral(AppConstants.STARTED_ZNODE_2, null);
+        } catch (Exception e) {
+            // Report the cause before exiting; a bare exit status gives no hint about what went wrong.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
new file mode 100644
index 0000000..a12b6ea
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/AppConstants.java
@@ -0,0 +1,10 @@
+package org.apache.s4.deploy;
+
+/** ZooKeeper paths that the deployable test apps create to signal that they were initialized and started. */
+public class AppConstants {
+
+    /** Parent znode of all the marker nodes below. */
+    private static final String ROOT = "/s4-test";
+
+    // Markers for test app 1.
+    public static final String STARTED_ZNODE_1 = ROOT + "/test-app-1-started";
+    public static final String INITIALIZED_ZNODE_1 = ROOT + "/test-app-1-initialized";
+
+    // Markers for test app 2.
+    public static final String STARTED_ZNODE_2 = ROOT + "/test-app-2-started";
+    public static final String INITIALIZED_ZNODE_2 = ROOT + "/test-app-2-initialized";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/7382fc87/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
new file mode 100644
index 0000000..0657ee9
--- /dev/null
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -0,0 +1,38 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.s4.core.App;
+import org.apache.zookeeper.CreateMode;
+
+/** Deployable test app 2: publishes marker znodes when it is initialized and started. */
+public class TestApp extends App {
+
+    private ZkClient zkClient;
+
+    @Override
+    protected void onClose() {
+        // nothing is released here
+    }
+
+    /**
+     * Connects to ZooKeeper on localhost:21810 and creates {@link AppConstants#INITIALIZED_ZNODE_2} (creating
+     * /s4-test first when it is missing). Any failure terminates the JVM with exit status -1.
+     */
+    @Override
+    protected void onInit() {
+        try {
+            zkClient = new ZkClient("localhost:" + 21810);
+            if (!zkClient.exists("/s4-test")) {
+                zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
+            }
+            zkClient.createEphemeral(AppConstants.INITIALIZED_ZNODE_2, null);
+        } catch (Exception e) {
+            // Report the cause before exiting; a bare exit status gives no hint about what went wrong.
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Loads class org.apache.s4.deploy.A by name and instantiates it with the ZooKeeper client. Any failure
+     * terminates the JVM with exit status -1.
+     */
+    @Override
+    protected void onStart() {
+        try {
+            Class.forName("org.apache.s4.deploy.A").getConstructor(ZkClient.class).newInstance(zkClient);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+}