You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[12/22] inter-app communications + refactorings
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 5d61ef4..8e04fc4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -1,18 +1,3 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
package org.apache.s4.core;
import java.util.Collection;
@@ -41,9 +26,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
final static private int CAPACITY = 1000;
private static int idCounter = 0;
final private String name;
- final private Key<T> key;
+ final protected Key<T> key;
final private ProcessingElement[] targetPEs;
- final private BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
+ protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
private Thread thread;
final private Sender sender;
final private Receiver receiver;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
deleted file mode 100644
index 25ffc59..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.RemoteSender;
-
-import com.google.inject.Inject;
-
-public abstract class Adapter extends App {
-
- @Inject
- RemoteSender remoteSender;
-
- public RemoteSender getRemoteSender() {
- return remoteSender;
- }
-
- public void setRemoteSender(RemoteSender remoteSender) {
- this.remoteSender = remoteSender;
- }
-
- protected <T extends Event> RemoteStream createRemoteStream(String name) {
-
- return new RemoteStream(this, name);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
deleted file mode 100644
index f027f06..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.io.File;
-import java.io.FileInputStream;
-
-import org.apache.s4.core.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-public class AdapterMain {
- private static final Logger logger = LoggerFactory.getLogger(AdapterMain.class);
-
- public static void main(String[] args) {
-
- AdapterArgs adapterArgs = new AdapterArgs();
- JCommander jc = new JCommander(adapterArgs);
-
- try {
- jc.parse(args);
- } catch (Exception e) {
- e.printStackTrace();
- jc.usage();
- }
-
- try {
- Injector injector = Guice.createInjector(new AdapterModule(new FileInputStream(new File(
- adapterArgs.s4PropertiesFilePath))));
- Server server = injector.getInstance(Server.class);
- try {
- server.start(injector);
- } catch (Exception e) {
- logger.error("Failed to start the controller.", e);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
- @Parameters(separators = "=")
- static class AdapterArgs {
-
- @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
- String s4PropertiesFilePath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
deleted file mode 100644
index 6e5c4a0..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.RemoteTopology;
-import org.apache.s4.comm.topology.RemoteTopologyFromZK;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public class AdapterModule extends AbstractModule {
-
- InputStream configFileInputStream;
- private PropertiesConfiguration config;
-
- public AdapterModule(InputStream configFileInputStream) {
- this.configFileInputStream = configFileInputStream;
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- if (configFileInputStream != null) {
- try {
- configFileInputStream.close();
- } catch (IOException ignored) {
- }
- }
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromZK.class);
-
- bind(Topology.class).to(TopologyFromZK.class);
- bind(RemoteTopology.class).to(RemoteTopologyFromZK.class);
-
- bind(RemoteEmitter.class).to(TCPRemoteEmitter.class);
- bind(Emitter.class).to(TCPEmitter.class);
- bind(Listener.class).to(TCPListener.class);
-
- // TODO downstream hasher
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
- }
-
- private void loadProperties(Binder binder) {
- try {
- config = new PropertiesConfiguration();
- config.load(configFileInputStream);
-
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
deleted file mode 100644
index a83feee..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.RemoteSender;
-import org.apache.s4.core.Streamable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stream that dispatches events to a remote cluster
- *
- */
-public class RemoteStream implements Streamable<Event> {
-
- private Thread thread;
- String name;
-
- RemoteSender remoteSender;
- int id;
- private Adapter adapter;
- private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
-
- private static AtomicInteger remoteStreamCounter = new AtomicInteger();
-
- public RemoteStream(Adapter adapter, String name) {
- this.name = name;
- this.adapter = adapter;
- adapter.addStream(this);
- remoteSender = adapter.getRemoteSender();
- this.id = remoteStreamCounter.addAndGet(1);
- }
-
- @Override
- public void start() {
-
- }
-
- @Override
- public void put(Event event) {
- event.setStreamId(name);
- event.setAppId(adapter.getId());
-
- // TODO specify partitioning?
- remoteSender.sendToRemotePartitions(event);
-
- }
-
- @Override
- public void close() {
- thread.interrupt();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 4f7ab86..a316005 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -80,7 +80,7 @@ public class DistributedDeploymentManager implements DeploymentManager {
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
IZkChildListener appListener = new AppsChangeListener();
- appsPath = "/" + clusterName + "/apps";
+ appsPath = "/s4/clusters/" + clusterName + "/apps";
if (!zkClient.exists(appsPath)) {
zkClient.create(appsPath, null, CreateMode.PERSISTENT);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 5a7e76b..7a01992 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,14 +7,14 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.s4.core.triggers.TriggeredApp;
-import org.apache.s4.core.triggers.TriggeredModule;
import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
-import org.junit.Before;
+import com.google.common.io.Resources;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -23,7 +23,7 @@ import com.google.inject.Injector;
* instantiating new S4 nodes
*/
// NOTE: placed in this package so that App#start(), init() and close() can be called without modifying their visibility
-public abstract class TriggerTest {
+public abstract class TriggerTest extends ZkBasedTest {
private Factory zookeeperServerConnectionFactory;
public static TriggerType triggerType;
@@ -33,27 +33,23 @@ public abstract class TriggerTest {
TIME_BASED, COUNT_BASED, NONE
}
- @Before
- public void prepare() throws IOException, InterruptedException, KeeperException {
- CommTestUtils.cleanupTmpDirs();
- zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
- }
-
@After
public void cleanup() throws IOException, InterruptedException {
if (app != null) {
app.close();
app = null;
}
- CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ cleanupZkBasedTest();
}
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(new TriggeredModule());
+ Injector injector = Guice.createInjector(new DefaultModule(Resources.newInputStreamSupplier(
+ Resources.getResource("default.s4.properties")).getInput()));
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
+ // app.close();
String time1 = String.valueOf(System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
deleted file mode 100644
index cf7cd80..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.s4.core.triggers;
-
-import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
-
-public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 4e0a3e7..f5f4ce4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -3,7 +3,6 @@ package org.apache.s4.deploy;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.List;
@@ -20,7 +19,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.zookeeper.CreateMode;
@@ -31,10 +29,12 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
+import com.google.common.io.Resources;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -63,17 +63,17 @@ public class TestAutomaticDeployment {
File gradlewFile = CoreTestUtils.findGradlewInRootDir();
- CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
+ tmpAppsDir.getAbsolutePath() });
- CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ "/test-apps/simple-deployable-app-2/build.gradle"), "installS4R", new String[] { "appsDir="
+ tmpAppsDir.getAbsolutePath() });
}
@Before
- public void cleanLocalAppsDir() throws ConfigurationException {
+ public void cleanLocalAppsDir() throws ConfigurationException, IOException {
PropertiesConfiguration config = loadConfig();
if (!new File(config.getString("appsDir")).exists()) {
@@ -86,14 +86,15 @@ public class TestAutomaticDeployment {
}
}
- private PropertiesConfiguration loadConfig() throws ConfigurationException {
- InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ private PropertiesConfiguration loadConfig() throws ConfigurationException, IOException {
PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(is);
+ config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
return config;
}
+ // ignore this test since now we only deploy from artifacts published through zookeeper
@Test
+ @Ignore
public void testInitialDeploymentFromFileSystem() throws Exception {
File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
@@ -142,7 +143,7 @@ public class TestAutomaticDeployment {
if (!initial) {
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
}
Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
@@ -180,16 +181,16 @@ public class TestAutomaticDeployment {
ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
- zkClient.create("/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
- zkClient.create("/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
- Assert.assertTrue(signalApp1Initialized.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
- Assert.assertTrue(signalApp2Initialized.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(signalApp2Initialized.await(20, TimeUnit.SECONDS));
Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
}
@@ -221,32 +222,24 @@ public class TestAutomaticDeployment {
}
/**
- *
- * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even when
- * referenced through reflection, and even when referencing classes present in the classpath of the S4 node
- *
- * Works in the following manner:
- *
- * - we have app1 and app2, very simple apps
- *
- * - app1 and app2 have 3 classes with same name: A, AppConstants and TestApp
- *
- * - app1 in addition has a PE and a socket adapter so that it can react to injected events
- *
- * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to the application
- * index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the S4 node
- * classpath, and therefore loaded by the standard classloader, not from an s4 app classloader)
+ * * * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even
+ * when referenced through reflection, and even when referencing classes present in the classpath of the S4 nod * *
+ * Works in the following manne * * - we have app1 and app2, very simple a * * - app1 and app2 have 3 classes with
+ * same name: A, AppConstants and Tes * * - app1 in addition has a PE and a socket adapter so that it can react to
+ * injected e * * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to
+ * the application index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the
+ * S4 node classpath, and therefore loaded by the standard classloader, not from an s4 app classl *
*
* - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
- * app
+ * p
*
* - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
- * the project, and using the build.gradle file available for these applications
+ * the project, and using the build.gradle file available for these appl * ns
*
- * - app1 and app2 s4r archives are copied to a web server and published to Zookeeper
+ * - app1 and app2 s4r archives are copied to a web server and published to * per
*
* - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
- * TestApp and A were independently loaded for independent applications
+ * TestApp and A were independently loaded for independent ap * ions
*
*/
@@ -289,9 +282,8 @@ public class TestAutomaticDeployment {
// current package .
// 1. start s4 nodes. Check that no app is deployed.
- InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(is);
+ config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
@@ -300,11 +292,11 @@ public class TestAutomaticDeployment {
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+ zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -316,7 +308,7 @@ public class TestAutomaticDeployment {
});
File tmpConfig = File.createTempFile("tmp", "config");
- Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+ Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
Files.newOutputStreamSupplier(tmpConfig)) > 0);
forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index e853df6..ca42d9d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
+import com.google.common.io.Resources;
import com.sun.net.httpserver.HttpServer;
public class TestProducerConsumer {
@@ -46,17 +47,17 @@ public class TestProducerConsumer {
Assert.assertTrue(tmpAppsDir.exists());
File gradlewFile = CoreTestUtils.findGradlewInRootDir();
- CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ "/test-apps/s4-showtime/build.gradle"), "installS4R",
new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
- CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+ CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ "/test-apps/s4-counter/build.gradle"), "installS4R",
new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
}
@Before
- public void cleanLocalAppsDir() throws ConfigurationException {
+ public void cleanLocalAppsDir() throws ConfigurationException, IOException {
PropertiesConfiguration config = loadConfig();
if (!new File(config.getString("appsDir")).exists()) {
@@ -88,10 +89,10 @@ public class TestProducerConsumer {
CommTestUtils.killS4App(forkedNode);
}
- private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException {
- InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
+ IOException {
PropertiesConfiguration config = new PropertiesConfiguration();
- config.load(is);
+ config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
return config;
}
@@ -117,11 +118,11 @@ public class TestProducerConsumer {
ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
- zkClient.create("/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
- zkClient.create("/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
// TODO validate test through some Zookeeper notifications
Thread.sleep(10000);
@@ -133,22 +134,25 @@ public class TestProducerConsumer {
// current package .
// 1. start s4 nodes. Check that no app is deployed.
- InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+ // PropertiesConfiguration config = new PropertiesConfiguration();
+ // config.load(Resources.newInputStreamSupplier(Resources.getResource("/org.apache.s4.deploy.s4.properties"))
+ // .getInput());
+ InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
PropertiesConfiguration config = new PropertiesConfiguration();
config.load(is);
clusterName = config.getString("cluster.name");
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
- taskSetup.clean(clusterName);
+ taskSetup.clean("s4");
taskSetup.setup(clusterName, 1, 1300);
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
- List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+ List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
- zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+ zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -160,7 +164,7 @@ public class TestProducerConsumer {
});
File tmpConfig = File.createTempFile("tmp", "config");
- Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+ Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
Files.newOutputStreamSupplier(tmpConfig)) > 0);
forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 060f82a..d9bf9a6 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -1,17 +1,19 @@
package org.apache.s4.fixtures;
-import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import junit.framework.Assert;
import org.apache.s4.core.App;
import org.apache.s4.core.Main;
+import org.gradle.tooling.BuildLauncher;
+import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProjectConnection;
+
+import sun.net.ProgressListener;
import com.google.common.io.PatternFilenameFilter;
@@ -53,53 +55,38 @@ public class CoreTestUtils extends CommTestUtils {
return gradlewFile;
}
- public static void callGradleTask(File gradlewFile, File buildFile, String taskName, String[] params)
- throws Exception {
-
- List<String> cmdList = new ArrayList<String>();
- cmdList.add(gradlewFile.getAbsolutePath());
- cmdList.add("-c");
- cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
- cmdList.add("-b");
- cmdList.add(buildFile.getAbsolutePath());
- cmdList.add(taskName);
- if (params.length > 0) {
- for (int i = 0; i < params.length; i++) {
- cmdList.add("-P" + params[i]);
- }
- }
-
- System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
- ProcessBuilder pb = new ProcessBuilder(cmdList);
-
- pb.directory(buildFile.getParentFile());
- pb.redirectErrorStream();
- final Process process = pb.start();
-
- process.waitFor();
-
- // try {
- // int exitValue = process.exitValue();
- // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
- // } catch (IllegalThreadStateException ignored) {
- // }
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
- String line;
- try {
- line = br.readLine();
- while (line != null) {
- System.out.println(line);
- line = br.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ public static void callGradleTask(File buildFile, String taskName, String[] params) throws Exception {
+
+ ProjectConnection connection = GradleConnector.newConnector().forProjectDirectory(buildFile.getParentFile())
+ .connect();
+
+ try {
+ BuildLauncher build = connection.newBuild();
+
+ // select tasks to run:
+ build.forTasks(taskName);
+
+ List<String> buildArgs = new ArrayList<String>();
+ // buildArgs.add("-b");
+ // buildArgs.add(buildFilePath);
+ buildArgs.add("-stacktrace");
+ buildArgs.add("-info");
+ if (params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ buildArgs.add("-P" + params[i]);
}
}
- }).start();
-
+
+ build.withArguments(buildArgs.toArray(new String[] {}));
+
+ // if you want to listen to the progress events:
+ ProgressListener listener = null; // use your implementation
+
+ // kick the build off:
+ build.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
index cefe8b3..c1815bf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
@@ -1,6 +1,5 @@
package org.apache.s4.fixtures;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -11,14 +10,13 @@ import org.apache.s4.core.Stream;
import org.apache.s4.wordcount.KeyValueEvent;
import org.apache.s4.wordcount.StringEvent;
-
public class SocketAdapter<T extends StringEvent> {
static ServerSocket serverSocket;
/**
- * Listens to incoming sentence and forwards them to a sentence Stream.
- * Each sentence is sent through a new socket connection
+ * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
+ * connection
*
* @param stream
* @throws IOException
@@ -39,13 +37,13 @@ public class SocketAdapter<T extends StringEvent> {
String line = in.readLine();
System.out.println("read: " + line);
- stream.put(stringEventFactory.create(line)) ;
+ stream.put(stringEventFactory.create(line));
connectedSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
- System.exit(-1);
+ // System.exit(-1);
} finally {
if (in != null) {
try {
@@ -67,9 +65,9 @@ public class SocketAdapter<T extends StringEvent> {
t.start();
}
-
- public void close() {
- if(serverSocket !=null) {
+
+ public void close() {
+ if (serverSocket != null) {
try {
serverSocket.close();
} catch (Exception e) {
@@ -77,28 +75,27 @@ public class SocketAdapter<T extends StringEvent> {
}
}
}
-
+
interface StringEventFactory<T> {
T create(String string);
}
-
+
public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
@Override
public StringEvent create(String string) {
return new StringEvent(string);
}
-
+
}
-
+
public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
@Override
public KeyValueEvent create(String string) {
return new KeyValueEvent(string);
}
-
+
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
index cff6474..18aaf9c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
@@ -5,6 +5,8 @@ import java.lang.reflect.Type;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.core.RemoteSenders;
public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
private final Class<?> appClass;
@@ -22,8 +24,9 @@ public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
this.appClass = findAppClass();
}
- protected ZkBasedAppModule(Class<? extends Emitter> emitterClass, Class<? extends Listener> listenerClass) {
- super(emitterClass, listenerClass);
+ protected ZkBasedAppModule(Class<? extends Emitter> emitterClass,
+ Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
+ super(emitterClass, remoteEmitterClass, listenerClass);
this.appClass = findAppClass();
}
@@ -31,5 +34,7 @@ public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
protected void configure() {
super.configure();
bind(appClass);
+ bind(RemoteSenders.class);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
index 4af6a4f..e493385 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
@@ -1,7 +1,7 @@
package org.apache.s4.wordcount;
-import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedAppModule;
-public class WordCountModule extends FileBasedClusterManagementTestModule<WordCountApp> {
+public class WordCountModule extends ZkBasedAppModule<WordCountApp> {
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index e12853d..14c7abd 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -6,6 +6,8 @@ import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.core.App;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
@@ -17,6 +19,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.io.Resources;
+
public class WordCountTest {
public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
@@ -53,6 +57,11 @@ public class WordCountTest {
@Test
public void testSimple() throws Exception {
final ZooKeeper zk = CommTestUtils.createZkClient();
+ TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+ PropertiesConfiguration config = new PropertiesConfiguration();
+ config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+ taskSetup.clean("s4");
+ taskSetup.setup(config.getString("cluster.name"), 1, 10000);
App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
deleted file mode 100644
index 322690a..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.wordcount.zk;
-
-import org.apache.s4.fixtures.ZkBasedAppModule;
-import org.apache.s4.wordcount.WordCountApp;
-
-public class WordCountModuleZk extends ZkBasedAppModule<WordCountApp> {
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
deleted file mode 100644
index 6f900c3..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.s4.wordcount.zk;
-
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1_TOTAL_WORDS;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2_TOTAL_WORDS;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_3;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.Main;
-import org.apache.s4.fixtures.CommTestUtils;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.s4.wordcount.WordCountApp;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-public class WordCountTestZk extends ZkBasedTest {
- @Test
- public void test() throws Exception {
-
- final ZooKeeper zk = CommTestUtils.createZkClient();
-
- Main.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
-
- CountDownLatch signalTextProcessed = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
-
- // add authorizations for processing
- for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
- zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
- CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
- signalTextProcessed.await();
- File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
- String s = CommTestUtils.readFile(results);
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 2235032..488bd21 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -1,14 +1,10 @@
comm.queue_emmiter_size = 8000
comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = {user.dir}/tmp
-cluster.name = s4-test-cluster
+cluster.name = cluster1
cluster.zk_address = localhost:2181
cluster.zk_session_timeout = 10000
cluster.zk_connection_timeout = 10000
s4.logger_level = DEBUG
-comm.module = org.apache.s4.core.CustomModule
appsDir=/tmp/deploy-test
tcp.partition.queue_size=1000
comm.timeout=100
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
deleted file mode 100644
index 3a566f2..0000000
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.name = s4-test-cluster
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.core.adapter.AdapterModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/s4-counter-example.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/s4-counter-example.properties b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
deleted file mode 100644
index b60f40a..0000000
--- a/subprojects/s4-core/src/test/resources/s4-counter-example.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-
-
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
index eb96b9d..a4ec6d6 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
@@ -1,17 +1,17 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.example.counter;
@@ -21,19 +21,6 @@ import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
@@ -70,38 +57,40 @@ public class Module extends AbstractModule {
@Override
protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- bind(MyApp.class);
-
- bind(Cluster.class);
-
- /* Configure static assignment using a configuration file. */
- bind(Assignment.class).to(AssignmentFromFile.class);
- /* Configure a static cluster topology using a configuration file. */
- bind(Topology.class).to(TopologyFromFile.class);
-
- // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
- // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ // TODO use the default module
+ // if (config == null)
+ // loadProperties(binder());
//
- // bind(Emitter.class).to(QueueingEmitter.class);
- // bind(Listener.class).to(QueueingListener.class);
-
- /* Use the Netty comm layer implementation. */
- // bind(Emitter.class).to(NettyEmitter.class);
- // bind(Listener.class).to(NettyListener.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ // bind(MyApp.class);
+ //
+ // bind(PhysicalCluster.class);
+ //
+ // /* Configure static assignment using a configuration file. */
+ // bind(Assignment.class).to(AssignmentFromFile.class);
+ //
+ // /* Configure a static cluster topology using a configuration file. */
+ // bind(Cluster.class).to(TopologyFromFile.class);
+ //
+ // // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ // //
+ // // bind(Emitter.class).to(QueueingEmitter.class);
+ // // bind(Listener.class).to(QueueingListener.class);
+ //
+ // /* Use the Netty comm layer implementation. */
+ // // bind(Emitter.class).to(NettyEmitter.class);
+ // // bind(Listener.class).to(NettyListener.class);
+ //
+ // /* Use a simple UDP comm layer implementation. */
+ // bind(Emitter.class).to(UDPEmitter.class);
+ // bind(Listener.class).to(UDPListener.class);
+ //
+ // /* The hashing function to map keys top partitions. */
+ // bind(Hasher.class).to(DefaultHasher.class);
+ //
+ // /* Use Kryo to serialize events. */
+ // bind(SerializerDeserializer.class).to(KryoSerDeser.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
index a8f034b..9e14031 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
@@ -1,17 +1,17 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.example.fluent.counter;
@@ -21,20 +21,6 @@ import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-import org.apache.s4.fluent.AppMaker;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
@@ -71,38 +57,40 @@ public class Module extends AbstractModule {
@Override
protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- bind(AppMaker.class).to(Main.class);
-
- bind(Cluster.class);
-
- /* Configure static assignment using a configuration file. */
- bind(Assignment.class).to(AssignmentFromFile.class);
- /* Configure a static cluster topology using a configuration file. */
- bind(Topology.class).to(TopologyFromFile.class);
-
- // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
- // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ // TODO use the default module
+ // if (config == null)
+ // loadProperties(binder());
//
- // bind(Emitter.class).to(QueueingEmitter.class);
- // bind(Listener.class).to(QueueingListener.class);
-
- /* Use the Netty comm layer implementation. */
- // bind(Emitter.class).to(NettyEmitter.class);
- // bind(Listener.class).to(NettyListener.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ // bind(AppMaker.class).to(Main.class);
+ //
+ // bind(PhysicalCluster.class);
+ //
+ // /* Configure static assignment using a configuration file. */
+ // bind(Assignment.class).to(AssignmentFromFile.class);
+ //
+ // /* Configure a static cluster topology using a configuration file. */
+ // bind(Cluster.class).to(TopologyFromFile.class);
+ //
+ // // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+ // // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+ // //
+ // // bind(Emitter.class).to(QueueingEmitter.class);
+ // // bind(Listener.class).to(QueueingListener.class);
+ //
+ // /* Use the Netty comm layer implementation. */
+ // // bind(Emitter.class).to(NettyEmitter.class);
+ // // bind(Listener.class).to(NettyListener.class);
+ //
+ // /* Use a simple UDP comm layer implementation. */
+ // bind(Emitter.class).to(UDPEmitter.class);
+ // bind(Listener.class).to(UDPListener.class);
+ //
+ // /* The hashing function to map keys top partitions. */
+ // bind(Hasher.class).to(DefaultHasher.class);
+ //
+ // /* Use Kryo to serialize events. */
+ // bind(SerializerDeserializer.class).to(KryoSerDeser.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
index 68c728e..406d670 100644
--- a/subprojects/s4-tools/s4-tools.gradle
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -30,11 +30,28 @@ dependencies {
compile libraries.jcommander
compile libraries.zkclient
compile libraries.commons_io
+ compile libraries.gradle_base_services
+ compile libraries.gradle_core
+ compile libraries.gradle_tooling_api
+ compile libraries.gradle_wrapper
+
}
apply plugin:'application'
mainClassName = "org.apache.s4.tools.Tools"
+applicationDistribution.from(configurations.compile) {
+ // this is supposed to be done by default but is not anymore in 1.0-rc-3
+ include "*.jar"
+ into("lib")
+}
+
+startScripts {
+ classpath = files('$APP_HOME/lib/*')
+}
+
+
+
run {
// run doesn't yet directly accept command line parameters...
if ( project.hasProperty('args') ) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
index 48f65c5..fd73428 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -38,7 +38,7 @@ public class DefineCluster {
@Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
static class ZKServerArgs extends S4ArgsBase {
- @Parameter(names = "-name", description = "S4 cluster name", required = true)
+ @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
String clusterName = "s4-test-cluster";
@Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index df3de81..3183bd2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -1,9 +1,6 @@
package org.apache.s4.tools;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -18,8 +15,13 @@ import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.DistributedDeploymentManager;
import org.apache.zookeeper.CreateMode;
+import org.gradle.tooling.BuildLauncher;
+import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProjectConnection;
import org.slf4j.LoggerFactory;
+import sun.net.ProgressListener;
+
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.io.ByteStreams;
@@ -69,10 +71,11 @@ public class Deploy {
final String uri = s4rToDeploy.toURI().toString();
ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
- zkClient.create("/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record, CreateMode.PERSISTENT);
+ zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record,
+ CreateMode.PERSISTENT);
logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
deployArgs.appName, deployArgs.clusterName,
- "/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
+ "/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
} catch (Exception e) {
LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
@@ -110,57 +113,43 @@ public class Deploy {
public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
throws Exception {
- // Thread.sleep(10000);
- List<String> cmdList = new ArrayList<String>();
- // cmdList.add("sleep");
- // cmdList.add("2");
- // cmdList.add(";");
- cmdList.add(gradlewExecPath);
- // cmdList.add("-c");
- // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
- cmdList.add("-b");
- cmdList.add(buildFilePath);
- cmdList.add(taskName);
- cmdList.add("-stacktrace");
- cmdList.add("-info");
- if (params.length > 0) {
- for (int i = 0; i < params.length; i++) {
- cmdList.add("-P" + params[i]);
- }
- }
- System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
- ProcessBuilder pb = new ProcessBuilder(cmdList);
-
- pb.directory(new File(buildFilePath).getParentFile());
- pb.redirectErrorStream();
-
- final Process process = pb.start();
- new Thread(new Runnable() {
- @Override
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
- String line;
- try {
- line = br.readLine();
- while (line != null) {
- System.out.println(line);
- line = br.readLine();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+
+ ProjectConnection connection = GradleConnector.newConnector()
+ .forProjectDirectory(new File(buildFilePath).getParentFile()).connect();
+
+ try {
+ BuildLauncher build = connection.newBuild();
+
+ // select tasks to run:
+ build.forTasks(taskName);
+
+ List<String> buildArgs = new ArrayList<String>();
+ // buildArgs.add("-b");
+ // buildArgs.add(buildFilePath);
+ buildArgs.add("-stacktrace");
+ buildArgs.add("-info");
+ if (params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ buildArgs.add("-P" + params[i]);
}
}
- }).start();
- process.waitFor();
+ logger.info(Arrays.toString(buildArgs.toArray()));
+
+ build.withArguments(buildArgs.toArray(new String[] {}));
+
- // try {
- // int exitValue = process.exitValue();
- // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
- // } catch (IllegalThreadStateException ignored) {
- // }
+ // if you want to listen to the progress events:
+ ProgressListener listener = null; // use your implementation
+ // build.addProgressListener(listener);
+ // kick the build off:
+ build.run();
+ } finally {
+ connection.close();
+ }
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/s4-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/build.gradle b/test-apps/s4-counter/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/s4-counter/build.gradle
+++ b/test-apps/s4-counter/build.gradle
@@ -100,7 +100,7 @@ dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(":s4-core")
-
+
/* Logging. */
compile( libraries.slf4j )
compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
manifest = project.manifest
extension = 's4r'
-
+
/* Set class name in manifest. Parse source files until we find a class that extends App.
* Get fully qualified Java class name and set attribute in Manifest.
*/
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
}
}
}
-
+
if (appClassname == "UNKNOWN") {
-
+
println "Couldn't find App class in source files...aborting."
exit(1)
}
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
// more debugging statements.
//sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-
+
}
/* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
def classname = "UNKNOWN"
lines= file.readLines()
for(line in lines) {
-
+
def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
if(pn) {
packageName = pn[0][1] + "."
}
-
+
def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
if (an) {
classname = packageName + an[0][1]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/s4-showtime/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/build.gradle b/test-apps/s4-showtime/build.gradle
index 37b1daf..72ff887 100644
--- a/test-apps/s4-showtime/build.gradle
+++ b/test-apps/s4-showtime/build.gradle
@@ -100,7 +100,7 @@ dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(":s4-core")
-
+
/* Logging. */
compile( libraries.slf4j )
compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
manifest = project.manifest
extension = 's4r'
-
+
/* Set class name in manifest. Parse source files until we find a class that extends App.
* Get fully qualified Java class name and set attribute in Manifest.
*/
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
}
}
}
-
+
if (appClassname == "UNKNOWN") {
-
+
println "Couldn't find App class in source files...aborting."
exit(1)
}
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
// more debugging statements.
//sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-
+
}
/* Install the S4 archive to the install directory. */
@@ -183,9 +183,7 @@ task installS4R (type: Copy) {
into s4AppInstallDir
}
-/* Generates the gradlew scripts.
-http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
-task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
/* Parse source file to get the app classname so we can use it in the manifest.
@@ -195,12 +193,12 @@ def getAppClassname(file) {
def classname = "UNKNOWN"
lines= file.readLines()
for(line in lines) {
-
+
def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
if(pn) {
packageName = pn[0][1] + "."
}
-
+
def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
if (an) {
classname = packageName + an[0][1]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/simple-deployable-app-1/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/build.gradle b/test-apps/simple-deployable-app-1/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/simple-deployable-app-1/build.gradle
+++ b/test-apps/simple-deployable-app-1/build.gradle
@@ -100,7 +100,7 @@ dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(":s4-core")
-
+
/* Logging. */
compile( libraries.slf4j )
compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
manifest = project.manifest
extension = 's4r'
-
+
/* Set class name in manifest. Parse source files until we find a class that extends App.
* Get fully qualified Java class name and set attribute in Manifest.
*/
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
}
}
}
-
+
if (appClassname == "UNKNOWN") {
-
+
println "Couldn't find App class in source files...aborting."
exit(1)
}
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
// more debugging statements.
//sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-
+
}
/* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
def classname = "UNKNOWN"
lines= file.readLines()
for(line in lines) {
-
+
def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
if(pn) {
packageName = pn[0][1] + "."
}
-
+
def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
if (an) {
classname = packageName + an[0][1]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/simple-deployable-app-2/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/build.gradle b/test-apps/simple-deployable-app-2/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/simple-deployable-app-2/build.gradle
+++ b/test-apps/simple-deployable-app-2/build.gradle
@@ -100,7 +100,7 @@ dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(":s4-core")
-
+
/* Logging. */
compile( libraries.slf4j )
compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
manifest = project.manifest
extension = 's4r'
-
+
/* Set class name in manifest. Parse source files until we find a class that extends App.
* Get fully qualified Java class name and set attribute in Manifest.
*/
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
}
}
}
-
+
if (appClassname == "UNKNOWN") {
-
+
println "Couldn't find App class in source files...aborting."
exit(1)
}
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
// more debugging statements.
//sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-
+
}
/* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
def classname = "UNKNOWN"
lines= file.readLines()
for(line in lines) {
-
+
def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
if(pn) {
packageName = pn[0][1] + "."
}
-
+
def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
if (an) {
classname = packageName + an[0][1]
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 100837d..5e7b2dc 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -117,7 +117,7 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
manifest = project.manifest
extension = 's4r'
@@ -144,7 +144,7 @@ task s4r(type: Jar) {
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { println 'Adding to s4 archive: ' + it.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -180,10 +180,10 @@ def getAppClassname(file) {
packageName = pn[0][1] + "."
}
- def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+Adapter.*\{/
+ def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
if (an) {
classname = packageName + an[0][1]
- println "Found adapter class name: " + classname
+ println "Found app class name: " + classname
break
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index 102ca10..0827e3a 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -6,10 +6,9 @@ import java.net.ServerSocket;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
-import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.base.Event;
-import org.apache.s4.core.adapter.Adapter;
-import org.apache.s4.core.adapter.RemoteStream;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,11 +19,9 @@ import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
-public class TwitterInputAdapter extends Adapter {
+public class TwitterInputAdapter extends App {
- private ZkClient zkClient;
private static Logger logger = LoggerFactory.getLogger(TwitterInputAdapter.class);
- private String urlString = "https://stream.twitter.com/1/statuses/sample.json";
public TwitterInputAdapter() {
}
@@ -47,7 +44,7 @@ public class TwitterInputAdapter extends Adapter {
@Override
protected void onInit() {
- remoteStream = createRemoteStream("RawStatus");
+ remoteStream = createOutputStream("RawStatus");
t = new Thread(new Dequeuer());
}
@@ -114,7 +111,7 @@ public class TwitterInputAdapter extends Adapter {
@Override
public void run() {
- while (!Thread.interrupted()) {
+ while (true) {
try {
Status status = messageQueue.take();
Event event = new Event();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index e10eae0..b93d0b9 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -115,7 +115,7 @@ appDependencies = ( configurations.compile )
task s4r(type: Jar) {
dependsOn jar
from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
- from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { zipTree(it) } }
manifest = project.manifest
extension = 's4r'
@@ -139,10 +139,11 @@ task s4r(type: Jar) {
}
}
+
/* List the artifacts that will br added to the s4 archive (and explode if needed). */
s4r << {
appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
- configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { println 'Adding to s4 archive: ' + it.name }
/* This is for debugging. */
//configurations.s4All.each { File file -> println 's4All: ' + file.name }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index e854395..90c3199 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -58,7 +58,7 @@ public class TwitterCounterApp extends App {
TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
topicExtractorPE.setDownStream(topicSeenStream);
topicExtractorPE.setSingleton(true);
- createStream("RawStatus", topicExtractorPE);
+ createInputStream("RawStatus", topicExtractorPE);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -67,10 +67,6 @@ public class TwitterCounterApp extends App {
@Override
protected void onStart() {
- // try {
- // t.start();
- // } catch (Exception e) {
- // throw new RuntimeException(e);
- // }
+
}
}