You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[12/22] inter-app communications + refactorings

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 5d61ef4..8e04fc4 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -1,18 +1,3 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
 package org.apache.s4.core;
 
 import java.util.Collection;
@@ -41,9 +26,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     final static private int CAPACITY = 1000;
     private static int idCounter = 0;
     final private String name;
-    final private Key<T> key;
+    final protected Key<T> key;
     final private ProcessingElement[] targetPEs;
-    final private BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
+    protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
     private Thread thread;
     final private Sender sender;
     final private Receiver receiver;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
deleted file mode 100644
index 25ffc59..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/Adapter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.RemoteSender;
-
-import com.google.inject.Inject;
-
-public abstract class Adapter extends App {
-
-    @Inject
-    RemoteSender remoteSender;
-
-    public RemoteSender getRemoteSender() {
-        return remoteSender;
-    }
-
-    public void setRemoteSender(RemoteSender remoteSender) {
-        this.remoteSender = remoteSender;
-    }
-
-    protected <T extends Event> RemoteStream createRemoteStream(String name) {
-
-        return new RemoteStream(this, name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
deleted file mode 100644
index f027f06..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.io.File;
-import java.io.FileInputStream;
-
-import org.apache.s4.core.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-public class AdapterMain {
-    private static final Logger logger = LoggerFactory.getLogger(AdapterMain.class);
-
-    public static void main(String[] args) {
-
-        AdapterArgs adapterArgs = new AdapterArgs();
-        JCommander jc = new JCommander(adapterArgs);
-
-        try {
-            jc.parse(args);
-        } catch (Exception e) {
-            e.printStackTrace();
-            jc.usage();
-        }
-
-        try {
-            Injector injector = Guice.createInjector(new AdapterModule(new FileInputStream(new File(
-                    adapterArgs.s4PropertiesFilePath))));
-            Server server = injector.getInstance(Server.class);
-            try {
-                server.start(injector);
-            } catch (Exception e) {
-                logger.error("Failed to start the controller.", e);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    @Parameters(separators = "=")
-    static class AdapterArgs {
-
-        @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
-        String s4PropertiesFilePath;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
deleted file mode 100644
index 6e5c4a0..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterModule.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.tcp.TCPListener;
-import org.apache.s4.comm.tcp.TCPRemoteEmitter;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.RemoteTopology;
-import org.apache.s4.comm.topology.RemoteTopologyFromZK;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromZK;
-import org.apache.s4.deploy.DeploymentManager;
-import org.apache.s4.deploy.DistributedDeploymentManager;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public class AdapterModule extends AbstractModule {
-
-    InputStream configFileInputStream;
-    private PropertiesConfiguration config;
-
-    public AdapterModule(InputStream configFileInputStream) {
-        this.configFileInputStream = configFileInputStream;
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        if (configFileInputStream != null) {
-            try {
-                configFileInputStream.close();
-            } catch (IOException ignored) {
-            }
-        }
-
-        int numHosts = config.getList("cluster.hosts").size();
-        boolean isCluster = numHosts > 1 ? true : false;
-        bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(Boolean.valueOf(isCluster));
-
-        bind(Cluster.class);
-
-        bind(Assignment.class).to(AssignmentFromZK.class);
-
-        bind(Topology.class).to(TopologyFromZK.class);
-        bind(RemoteTopology.class).to(RemoteTopologyFromZK.class);
-
-        bind(RemoteEmitter.class).to(TCPRemoteEmitter.class);
-        bind(Emitter.class).to(TCPEmitter.class);
-        bind(Listener.class).to(TCPListener.class);
-
-        // TODO downstream hasher
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
-    }
-
-    private void loadProperties(Binder binder) {
-        try {
-            config = new PropertiesConfiguration();
-            config.load(configFileInputStream);
-
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
deleted file mode 100644
index a83feee..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/RemoteStream.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.s4.core.adapter;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.core.RemoteSender;
-import org.apache.s4.core.Streamable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stream that dispatches events to a remote cluster
- * 
- */
-public class RemoteStream implements Streamable<Event> {
-
-    private Thread thread;
-    String name;
-
-    RemoteSender remoteSender;
-    int id;
-    private Adapter adapter;
-    private static Logger logger = LoggerFactory.getLogger(RemoteStream.class);
-
-    private static AtomicInteger remoteStreamCounter = new AtomicInteger();
-
-    public RemoteStream(Adapter adapter, String name) {
-        this.name = name;
-        this.adapter = adapter;
-        adapter.addStream(this);
-        remoteSender = adapter.getRemoteSender();
-        this.id = remoteStreamCounter.addAndGet(1);
-    }
-
-    @Override
-    public void start() {
-
-    }
-
-    @Override
-    public void put(Event event) {
-        event.setStreamId(name);
-        event.setAppId(adapter.getId());
-
-        // TODO specify partitioning?
-        remoteSender.sendToRemotePartitions(event);
-
-    }
-
-    @Override
-    public void close() {
-        thread.interrupt();
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 4f7ab86..a316005 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -80,7 +80,7 @@ public class DistributedDeploymentManager implements DeploymentManager {
         zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
         zkClient.setZkSerializer(new ZNRecordSerializer());
         IZkChildListener appListener = new AppsChangeListener();
-        appsPath = "/" + clusterName + "/apps";
+        appsPath = "/s4/clusters/" + clusterName + "/apps";
         if (!zkClient.exists(appsPath)) {
             zkClient.create(appsPath, null, CreateMode.PERSISTENT);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 5a7e76b..7a01992 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -7,14 +7,14 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.Assert;
 
 import org.apache.s4.core.triggers.TriggeredApp;
-import org.apache.s4.core.triggers.TriggeredModule;
 import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
-import org.junit.Before;
 
+import com.google.common.io.Resources;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
@@ -23,7 +23,7 @@ import com.google.inject.Injector;
  * instantiating new S4 nodes
  */
 // NOTE: placed in this package so that App#start(), init() and close() can be called without modifying their visibility
-public abstract class TriggerTest {
+public abstract class TriggerTest extends ZkBasedTest {
 
     private Factory zookeeperServerConnectionFactory;
     public static TriggerType triggerType;
@@ -33,27 +33,23 @@ public abstract class TriggerTest {
         TIME_BASED, COUNT_BASED, NONE
     }
 
-    @Before
-    public void prepare() throws IOException, InterruptedException, KeeperException {
-        CommTestUtils.cleanupTmpDirs();
-        zookeeperServerConnectionFactory = CommTestUtils.startZookeeperServer();
-    }
-
     @After
     public void cleanup() throws IOException, InterruptedException {
         if (app != null) {
             app.close();
             app = null;
         }
-        CommTestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        cleanupZkBasedTest();
     }
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(new TriggeredModule());
+        Injector injector = Guice.createInjector(new DefaultModule(Resources.newInputStreamSupplier(
+                Resources.getResource("default.s4.properties")).getInput()));
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();
+        // app.close();
 
         String time1 = String.valueOf(System.currentTimeMillis());
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
deleted file mode 100644
index cf7cd80..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/triggers/TriggeredModule.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.s4.core.triggers;
-
-import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
-
-public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 4e0a3e7..f5f4ce4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -3,7 +3,6 @@ package org.apache.s4.deploy;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -20,7 +19,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.zookeeper.CreateMode;
@@ -31,10 +29,12 @@ import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
+import com.google.common.io.Resources;
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -63,17 +63,17 @@ public class TestAutomaticDeployment {
 
         File gradlewFile = CoreTestUtils.findGradlewInRootDir();
 
-        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
                 + "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
                 + tmpAppsDir.getAbsolutePath() });
 
-        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
                 + "/test-apps/simple-deployable-app-2/build.gradle"), "installS4R", new String[] { "appsDir="
                 + tmpAppsDir.getAbsolutePath() });
     }
 
     @Before
-    public void cleanLocalAppsDir() throws ConfigurationException {
+    public void cleanLocalAppsDir() throws ConfigurationException, IOException {
         PropertiesConfiguration config = loadConfig();
 
         if (!new File(config.getString("appsDir")).exists()) {
@@ -86,14 +86,15 @@ public class TestAutomaticDeployment {
         }
     }
 
-    private PropertiesConfiguration loadConfig() throws ConfigurationException {
-        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+    private PropertiesConfiguration loadConfig() throws ConfigurationException, IOException {
         PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(is);
+        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
         return config;
     }
 
+    // ignore this test since now we only deploy from artifacts published through zookeeper
     @Test
+    @Ignore
     public void testInitialDeploymentFromFileSystem() throws Exception {
 
         File s4rToDeploy = new File(loadConfig().getString("appsDir") + File.separator + "testapp"
@@ -142,7 +143,7 @@ public class TestAutomaticDeployment {
         if (!initial) {
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+            zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
         }
 
         Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
@@ -180,16 +181,16 @@ public class TestAutomaticDeployment {
 
         ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app1");
         record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uri1);
-        zkClient.create("/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp1", record1, CreateMode.PERSISTENT);
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()) + "-app2");
         record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uri2);
-        zkClient.create("/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + clusterName + "/apps/testApp2", record2, CreateMode.PERSISTENT);
 
-        Assert.assertTrue(signalApp1Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp1Initialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalApp1Started.await(10, TimeUnit.SECONDS));
 
-        Assert.assertTrue(signalApp2Initialized.await(10, TimeUnit.SECONDS));
+        Assert.assertTrue(signalApp2Initialized.await(20, TimeUnit.SECONDS));
         Assert.assertTrue(signalApp2Started.await(10, TimeUnit.SECONDS));
 
     }
@@ -221,32 +222,24 @@ public class TestAutomaticDeployment {
     }
 
     /**
-     * 
-     * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even when
-     * referenced through reflection, and even when referencing classes present in the classpath of the S4 node
-     * 
-     * Works in the following manner:
-     * 
-     * - we have app1 and app2, very simple apps
-     * 
-     * - app1 and app2 have 3 classes with same name: A, AppConstants and TestApp
-     * 
-     * - app1 in addition has a PE and a socket adapter so that it can react to injected events
-     * 
-     * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to the application
-     * index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the S4 node
-     * classpath, and therefore loaded by the standard classloader, not from an s4 app classloader)
+     * * * Tests that classes with same signature are loaded in different class loaders (through the S4RLoader), even
+     * when referenced through reflection, and even when referencing classes present in the classpath of the S4 nod * *
+     * Works in the following manne * * - we have app1 and app2, very simple a * * - app1 and app2 have 3 classes with
+     * same name: A, AppConstants and Tes * * - app1 in addition has a PE and a socket adapter so that it can react to
+     * injected e * * - upon initialization of the application, TestApp writes a znode in Zookeeper, corresponding to
+     * the application index (1 or 2), using the corresponding constant from the AppConstant class (which is part of the
+     * S4 node classpath, and therefore loaded by the standard classloader, not from an s4 app classl *
      * 
      * - upon startup of the application, TestApp creates A by reflection, and A writes a znode specific to the current
-     * app
+     * p
      * 
      * - app1 and app2 are generated through gradle scripts, called by executing the "gradlew" executable at the root of
-     * the project, and using the build.gradle file available for these applications
+     * the project, and using the build.gradle file available for these appl * ns
      * 
-     * - app1 and app2 s4r archives are copied to a web server and published to Zookeeper
+     * - app1 and app2 s4r archives are copied to a web server and published to * per
      * 
      * - they automatically get deployed, and we verify that 2 apps are correctly started, therefore that classes
-     * TestApp and A were independently loaded for independent applications
+     * TestApp and A were independently loaded for independent ap * ions
      * 
      */
 
@@ -289,9 +282,8 @@ public class TestAutomaticDeployment {
         // current package .
 
         // 1. start s4 nodes. Check that no app is deployed.
-        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
         PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(is);
+        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
 
         clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
@@ -300,11 +292,11 @@ public class TestAutomaticDeployment {
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -316,7 +308,7 @@ public class TestAutomaticDeployment {
         });
 
         File tmpConfig = File.createTempFile("tmp", "config");
-        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
                 Files.newOutputStreamSupplier(tmpConfig)) > 0);
         forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
index e853df6..ca42d9d 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/prodcon/TestProducerConsumer.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
+import com.google.common.io.Resources;
 import com.sun.net.httpserver.HttpServer;
 
 public class TestProducerConsumer {
@@ -46,17 +47,17 @@ public class TestProducerConsumer {
         Assert.assertTrue(tmpAppsDir.exists());
         File gradlewFile = CoreTestUtils.findGradlewInRootDir();
 
-        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
                 + "/test-apps/s4-showtime/build.gradle"), "installS4R",
                 new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
 
-        CoreTestUtils.callGradleTask(gradlewFile, new File(gradlewFile.getParentFile().getAbsolutePath()
+        CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
                 + "/test-apps/s4-counter/build.gradle"), "installS4R",
                 new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
     }
 
     @Before
-    public void cleanLocalAppsDir() throws ConfigurationException {
+    public void cleanLocalAppsDir() throws ConfigurationException, IOException {
         PropertiesConfiguration config = loadConfig();
 
         if (!new File(config.getString("appsDir")).exists()) {
@@ -88,10 +89,10 @@ public class TestProducerConsumer {
         CommTestUtils.killS4App(forkedNode);
     }
 
-    private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException {
-        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+    private PropertiesConfiguration loadConfig() throws org.apache.commons.configuration.ConfigurationException,
+            IOException {
         PropertiesConfiguration config = new PropertiesConfiguration();
-        config.load(is);
+        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
         return config;
     }
 
@@ -117,11 +118,11 @@ public class TestProducerConsumer {
 
         ZNRecord record1 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record1.putSimpleField(DistributedDeploymentManager.S4R_URI, uriShowtime);
-        zkClient.create("/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + clusterName + "/apps/showtime", record1, CreateMode.PERSISTENT);
 
         ZNRecord record2 = new ZNRecord(String.valueOf(System.currentTimeMillis()));
         record2.putSimpleField(DistributedDeploymentManager.S4R_URI, uriCounter);
-        zkClient.create("/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
+        zkClient.create("/s4/clusters/" + clusterName + "/apps/counter", record2, CreateMode.PERSISTENT);
 
         // TODO validate test through some Zookeeper notifications
         Thread.sleep(10000);
@@ -133,22 +134,25 @@ public class TestProducerConsumer {
         // current package .
 
         // 1. start s4 nodes. Check that no app is deployed.
-        InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
+        // PropertiesConfiguration config = new PropertiesConfiguration();
+        // config.load(Resources.newInputStreamSupplier(Resources.getResource("/org.apache.s4.deploy.s4.properties"))
+        // .getInput());
+        InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
         PropertiesConfiguration config = new PropertiesConfiguration();
         config.load(is);
 
         clusterName = config.getString("cluster.name");
         TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
-        taskSetup.clean(clusterName);
+        taskSetup.clean("s4");
         taskSetup.setup(clusterName, 1, 1300);
 
         zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
         zkClient.setZkSerializer(new ZNRecordSerializer());
-        List<String> processes = zkClient.getChildren("/" + clusterName + "/process");
+        List<String> processes = zkClient.getChildren("/s4/clusters/" + clusterName + "/process");
         Assert.assertTrue(processes.size() == 0);
         final CountDownLatch signalProcessesReady = new CountDownLatch(1);
 
-        zkClient.subscribeChildChanges("/" + clusterName + "/process", new IZkChildListener() {
+        zkClient.subscribeChildChanges("/s4/clusters/" + clusterName + "/process", new IZkChildListener() {
 
             @Override
             public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
@@ -160,7 +164,7 @@ public class TestProducerConsumer {
         });
 
         File tmpConfig = File.createTempFile("tmp", "config");
-        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties"),
+        Assert.assertTrue(ByteStreams.copy(getClass().getResourceAsStream("/default.s4.properties"),
                 Files.newOutputStreamSupplier(tmpConfig)) > 0);
         forkedNode = CoreTestUtils.forkS4Node(new String[] { tmpConfig.getAbsolutePath() });
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 060f82a..d9bf9a6 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -1,17 +1,19 @@
 package org.apache.s4.fixtures;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import junit.framework.Assert;
 
 import org.apache.s4.core.App;
 import org.apache.s4.core.Main;
+import org.gradle.tooling.BuildLauncher;
+import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProjectConnection;
+
+import sun.net.ProgressListener;
 
 import com.google.common.io.PatternFilenameFilter;
 
@@ -53,53 +55,38 @@ public class CoreTestUtils extends CommTestUtils {
         return gradlewFile;
     }
 
-    public static void callGradleTask(File gradlewFile, File buildFile, String taskName, String[] params)
-            throws Exception {
-    
-        List<String> cmdList = new ArrayList<String>();
-        cmdList.add(gradlewFile.getAbsolutePath());
-        cmdList.add("-c");
-        cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
-        cmdList.add("-b");
-        cmdList.add(buildFile.getAbsolutePath());
-        cmdList.add(taskName);
-        if (params.length > 0) {
-            for (int i = 0; i < params.length; i++) {
-                cmdList.add("-P" + params[i]);
-            }
-        }
-    
-        System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
-        ProcessBuilder pb = new ProcessBuilder(cmdList);
-    
-        pb.directory(buildFile.getParentFile());
-        pb.redirectErrorStream();
-        final Process process = pb.start();
-    
-        process.waitFor();
-    
-        // try {
-        // int exitValue = process.exitValue();
-        // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
-        // } catch (IllegalThreadStateException ignored) {
-        // }
-    
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-                String line;
-                try {
-                    line = br.readLine();
-                    while (line != null) {
-                        System.out.println(line);
-                        line = br.readLine();
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
+    public static void callGradleTask(File buildFile, String taskName, String[] params) throws Exception {
+
+        ProjectConnection connection = GradleConnector.newConnector().forProjectDirectory(buildFile.getParentFile())
+                .connect();
+
+        try {
+            BuildLauncher build = connection.newBuild();
+
+            // select tasks to run:
+            build.forTasks(taskName);
+
+            List<String> buildArgs = new ArrayList<String>();
+            // buildArgs.add("-b");
+            // buildArgs.add(buildFilePath);
+            buildArgs.add("-stacktrace");
+            buildArgs.add("-info");
+            if (params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    buildArgs.add("-P" + params[i]);
                 }
             }
-        }).start();
-    
+
+            build.withArguments(buildArgs.toArray(new String[] {}));
+
+            // if you want to listen to the progress events:
+            ProgressListener listener = null; // use your implementation
+
+            // kick the build off:
+            build.run();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
index cefe8b3..c1815bf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/SocketAdapter.java
@@ -1,6 +1,5 @@
 package org.apache.s4.fixtures;
 
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -11,14 +10,13 @@ import org.apache.s4.core.Stream;
 import org.apache.s4.wordcount.KeyValueEvent;
 import org.apache.s4.wordcount.StringEvent;
 
-
 public class SocketAdapter<T extends StringEvent> {
 
     static ServerSocket serverSocket;
 
     /**
-     * Listens to incoming sentence and forwards them to a sentence Stream.
-     * Each sentence is sent through a new socket connection
+     * Listens to incoming sentence and forwards them to a sentence Stream. Each sentence is sent through a new socket
+     * connection
      * 
      * @param stream
      * @throws IOException
@@ -39,13 +37,13 @@ public class SocketAdapter<T extends StringEvent> {
 
                         String line = in.readLine();
                         System.out.println("read: " + line);
-                        stream.put(stringEventFactory.create(line)) ;
+                        stream.put(stringEventFactory.create(line));
                         connectedSocket.close();
                     }
 
                 } catch (IOException e) {
                     e.printStackTrace();
-                    System.exit(-1);
+                    // System.exit(-1);
                 } finally {
                     if (in != null) {
                         try {
@@ -67,9 +65,9 @@ public class SocketAdapter<T extends StringEvent> {
         t.start();
 
     }
-    
-    public void close()  {
-        if(serverSocket !=null) {
+
+    public void close() {
+        if (serverSocket != null) {
             try {
                 serverSocket.close();
             } catch (Exception e) {
@@ -77,28 +75,27 @@ public class SocketAdapter<T extends StringEvent> {
             }
         }
     }
-    
+
     interface StringEventFactory<T> {
         T create(String string);
     }
-    
+
     public static class SentenceEventFactory implements StringEventFactory<StringEvent> {
 
         @Override
         public StringEvent create(String string) {
             return new StringEvent(string);
         }
-        
+
     }
-    
+
     public static class KeyValueEventFactory implements StringEventFactory<KeyValueEvent> {
 
         @Override
         public KeyValueEvent create(String string) {
             return new KeyValueEvent(string);
         }
-        
+
     }
 
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
index cff6474..18aaf9c 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZkBasedAppModule.java
@@ -5,6 +5,8 @@ import java.lang.reflect.Type;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.core.RemoteSenders;
 
 public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
     private final Class<?> appClass;
@@ -22,8 +24,9 @@ public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
         this.appClass = findAppClass();
     }
 
-    protected ZkBasedAppModule(Class<? extends Emitter> emitterClass, Class<? extends Listener> listenerClass) {
-        super(emitterClass, listenerClass);
+    protected ZkBasedAppModule(Class<? extends Emitter> emitterClass,
+            Class<? extends RemoteEmitter> remoteEmitterClass, Class<? extends Listener> listenerClass) {
+        super(emitterClass, remoteEmitterClass, listenerClass);
         this.appClass = findAppClass();
     }
 
@@ -31,5 +34,7 @@ public class ZkBasedAppModule<T> extends ZkBasedClusterManagementTestModule {
     protected void configure() {
         super.configure();
         bind(appClass);
+        bind(RemoteSenders.class);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
index 4af6a4f..e493385 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountModule.java
@@ -1,7 +1,7 @@
 package org.apache.s4.wordcount;
 
-import org.apache.s4.fixtures.FileBasedClusterManagementTestModule;
+import org.apache.s4.fixtures.ZkBasedAppModule;
 
-public class WordCountModule extends FileBasedClusterManagementTestModule<WordCountApp> {
+public class WordCountModule extends ZkBasedAppModule<WordCountApp> {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index e12853d..14c7abd 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -6,6 +6,8 @@ import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
 
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.tools.TaskSetup;
 import org.apache.s4.core.App;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.CreateMode;
@@ -17,6 +19,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.io.Resources;
+
 public class WordCountTest {
 
     public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
@@ -53,6 +57,11 @@ public class WordCountTest {
     @Test
     public void testSimple() throws Exception {
         final ZooKeeper zk = CommTestUtils.createZkClient();
+        TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
+        PropertiesConfiguration config = new PropertiesConfiguration();
+        config.load(Resources.newInputStreamSupplier(Resources.getResource("default.s4.properties")).getInput());
+        taskSetup.clean("s4");
+        taskSetup.setup(config.getString("cluster.name"), 1, 10000);
 
         App.main(new String[] { WordCountModule.class.getName(), WordCountApp.class.getName() });
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
deleted file mode 100644
index 322690a..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountModuleZk.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.apache.s4.wordcount.zk;
-
-import org.apache.s4.fixtures.ZkBasedAppModule;
-import org.apache.s4.wordcount.WordCountApp;
-
-public class WordCountModuleZk extends ZkBasedAppModule<WordCountApp> {
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
deleted file mode 100644
index 6f900c3..0000000
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/zk/WordCountTestZk.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.apache.s4.wordcount.zk;
-
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_1_TOTAL_WORDS;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_2_TOTAL_WORDS;
-import static org.apache.s4.wordcount.WordCountTest.SENTENCE_3;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-
-import junit.framework.Assert;
-
-import org.apache.s4.core.Main;
-import org.apache.s4.fixtures.CommTestUtils;
-import org.apache.s4.fixtures.ZkBasedTest;
-import org.apache.s4.wordcount.WordCountApp;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-public class WordCountTestZk extends ZkBasedTest {
-    @Test
-    public void test() throws Exception {
-
-        final ZooKeeper zk = CommTestUtils.createZkClient();
-
-        Main.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
-
-        CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
-
-        // add authorizations for processing
-        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
-            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        }
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
-        CommTestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
-        signalTextProcessed.await();
-        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-        String s = CommTestUtils.readFile(results);
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/default.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 2235032..488bd21 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -1,14 +1,10 @@
 comm.queue_emmiter_size = 8000
 comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = {user.dir}/tmp
-cluster.name = s4-test-cluster
+cluster.name = cluster1
 cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
 s4.logger_level = DEBUG
-comm.module = org.apache.s4.core.CustomModule
 appsDir=/tmp/deploy-test
 tcp.partition.queue_size=1000
 comm.timeout=100

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
deleted file mode 100644
index 3a566f2..0000000
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.name = s4-test-cluster
-cluster.zk_address = localhost:2181
-cluster.zk_session_timeout = 10000
-cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.core.adapter.AdapterModule
-s4.logger_level = TRACE
-appsDir=/tmp/deploy-test

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-core/src/test/resources/s4-counter-example.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/resources/s4-counter-example.properties b/subprojects/s4-core/src/test/resources/s4-counter-example.properties
deleted file mode 100644
index b60f40a..0000000
--- a/subprojects/s4-core/src/test/resources/s4-counter-example.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-comm.queue_emmiter_size = 8000
-comm.queue_listener_size = 8000
-cluster.hosts = localhost
-cluster.ports = 5077
-cluster.lock_dir = /tmp
-
-

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
index eb96b9d..a4ec6d6 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/counter/Module.java
@@ -1,17 +1,17 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.example.counter;
 
@@ -21,19 +21,6 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
@@ -70,38 +57,40 @@ public class Module extends AbstractModule {
 
     @Override
     protected void configure() {
-        if (config == null)
-            loadProperties(binder());
-
-        bind(MyApp.class);
-
-        bind(Cluster.class);
-
-        /* Configure static assignment using a configuration file. */
-        bind(Assignment.class).to(AssignmentFromFile.class);
 
-        /* Configure a static cluster topology using a configuration file. */
-        bind(Topology.class).to(TopologyFromFile.class);
-
-        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        // TODO use the default module
+        // if (config == null)
+        // loadProperties(binder());
         //
-        // bind(Emitter.class).to(QueueingEmitter.class);
-        // bind(Listener.class).to(QueueingListener.class);
-
-        /* Use the Netty comm layer implementation. */
-        // bind(Emitter.class).to(NettyEmitter.class);
-        // bind(Listener.class).to(NettyListener.class);
-
-        /* Use a simple UDP comm layer implementation. */
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        // bind(MyApp.class);
+        //
+        // bind(PhysicalCluster.class);
+        //
+        // /* Configure static assignment using a configuration file. */
+        // bind(Assignment.class).to(AssignmentFromFile.class);
+        //
+        // /* Configure a static cluster topology using a configuration file. */
+        // bind(Cluster.class).to(TopologyFromFile.class);
+        //
+        // // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        // //
+        // // bind(Emitter.class).to(QueueingEmitter.class);
+        // // bind(Listener.class).to(QueueingListener.class);
+        //
+        // /* Use the Netty comm layer implementation. */
+        // // bind(Emitter.class).to(NettyEmitter.class);
+        // // bind(Listener.class).to(NettyListener.class);
+        //
+        // /* Use a simple UDP comm layer implementation. */
+        // bind(Emitter.class).to(UDPEmitter.class);
+        // bind(Listener.class).to(UDPListener.class);
+        //
+        // /* The hashing function to map keys top partitions. */
+        // bind(Hasher.class).to(DefaultHasher.class);
+        //
+        // /* Use Kryo to serialize events. */
+        // bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
index a8f034b..9e14031 100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/fluent/counter/Module.java
@@ -1,17 +1,17 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.example.fluent.counter;
 
@@ -21,20 +21,6 @@ import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-import org.apache.s4.fluent.AppMaker;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
@@ -71,38 +57,40 @@ public class Module extends AbstractModule {
 
     @Override
     protected void configure() {
-        if (config == null)
-            loadProperties(binder());
-
-        bind(AppMaker.class).to(Main.class);
-
-        bind(Cluster.class);
-
-        /* Configure static assignment using a configuration file. */
-        bind(Assignment.class).to(AssignmentFromFile.class);
 
-        /* Configure a static cluster topology using a configuration file. */
-        bind(Topology.class).to(TopologyFromFile.class);
-
-        // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
-        // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        // TODO use the default module
+        // if (config == null)
+        // loadProperties(binder());
         //
-        // bind(Emitter.class).to(QueueingEmitter.class);
-        // bind(Listener.class).to(QueueingListener.class);
-
-        /* Use the Netty comm layer implementation. */
-        // bind(Emitter.class).to(NettyEmitter.class);
-        // bind(Listener.class).to(NettyListener.class);
-
-        /* Use a simple UDP comm layer implementation. */
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-        /* The hashing function to map keys top partitions. */
-        bind(Hasher.class).to(DefaultHasher.class);
-
-        /* Use Kryo to serialize events. */
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        // bind(AppMaker.class).to(Main.class);
+        //
+        // bind(PhysicalCluster.class);
+        //
+        // /* Configure static assignment using a configuration file. */
+        // bind(Assignment.class).to(AssignmentFromFile.class);
+        //
+        // /* Configure a static cluster topology using a configuration file. */
+        // bind(Cluster.class).to(TopologyFromFile.class);
+        //
+        // // bind(Emitter.class).annotatedWith(Names.named("ll")).to(NettyEmitter.class);
+        // // bind(Listener.class).annotatedWith(Names.named("ll")).to(NettyListener.class);
+        // //
+        // // bind(Emitter.class).to(QueueingEmitter.class);
+        // // bind(Listener.class).to(QueueingListener.class);
+        //
+        // /* Use the Netty comm layer implementation. */
+        // // bind(Emitter.class).to(NettyEmitter.class);
+        // // bind(Listener.class).to(NettyListener.class);
+        //
+        // /* Use a simple UDP comm layer implementation. */
+        // bind(Emitter.class).to(UDPEmitter.class);
+        // bind(Listener.class).to(UDPListener.class);
+        //
+        // /* The hashing function to map keys top partitions. */
+        // bind(Hasher.class).to(DefaultHasher.class);
+        //
+        // /* Use Kryo to serialize events. */
+        // bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/s4-tools.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
index 68c728e..406d670 100644
--- a/subprojects/s4-tools/s4-tools.gradle
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -30,11 +30,28 @@ dependencies {
     compile libraries.jcommander
     compile libraries.zkclient
     compile libraries.commons_io
+    compile libraries.gradle_base_services
+    compile libraries.gradle_core
+    compile libraries.gradle_tooling_api
+    compile libraries.gradle_wrapper
+
 }
 
 apply plugin:'application'
 mainClassName = "org.apache.s4.tools.Tools"
 
+applicationDistribution.from(configurations.compile) {
+    // this is supposed to be done by default but is not anymore in 1.0-rc-3
+    include "*.jar"
+    into("lib")
+}
+
+startScripts {
+    classpath = files('$APP_HOME/lib/*')
+}
+
+
+
 run {
     // run doesn't yet directly accept command line parameters...
     if ( project.hasProperty('args') ) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
index 48f65c5..fd73428 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -38,7 +38,7 @@ public class DefineCluster {
     @Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
     static class ZKServerArgs extends S4ArgsBase {
 
-        @Parameter(names = "-name", description = "S4 cluster name", required = true)
+        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
         String clusterName = "s4-test-cluster";
 
         @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index df3de81..3183bd2 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -1,9 +1,6 @@
 package org.apache.s4.tools;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -18,8 +15,13 @@ import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.apache.zookeeper.CreateMode;
+import org.gradle.tooling.BuildLauncher;
+import org.gradle.tooling.GradleConnector;
+import org.gradle.tooling.ProjectConnection;
 import org.slf4j.LoggerFactory;
 
+import sun.net.ProgressListener;
+
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.google.common.io.ByteStreams;
@@ -69,10 +71,11 @@ public class Deploy {
             final String uri = s4rToDeploy.toURI().toString();
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-            zkClient.create("/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record, CreateMode.PERSISTENT);
+            zkClient.create("/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName, record,
+                    CreateMode.PERSISTENT);
             logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
                     deployArgs.appName, deployArgs.clusterName,
-                    "/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
+                    "/s4/clusters/" + deployArgs.clusterName + "/apps/" + deployArgs.appName });
 
         } catch (Exception e) {
             LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
@@ -110,57 +113,43 @@ public class Deploy {
 
         public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
                 throws Exception {
-            // Thread.sleep(10000);
-            List<String> cmdList = new ArrayList<String>();
-            // cmdList.add("sleep");
-            // cmdList.add("2");
-            // cmdList.add(";");
-            cmdList.add(gradlewExecPath);
-            // cmdList.add("-c");
-            // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
-            cmdList.add("-b");
-            cmdList.add(buildFilePath);
-            cmdList.add(taskName);
-            cmdList.add("-stacktrace");
-            cmdList.add("-info");
-            if (params.length > 0) {
-                for (int i = 0; i < params.length; i++) {
-                    cmdList.add("-P" + params[i]);
-                }
-            }
-            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
-            ProcessBuilder pb = new ProcessBuilder(cmdList);
-
-            pb.directory(new File(buildFilePath).getParentFile());
-            pb.redirectErrorStream();
-
-            final Process process = pb.start();
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
-                    String line;
-                    try {
-                        line = br.readLine();
-                        while (line != null) {
-                            System.out.println(line);
-                            line = br.readLine();
-                        }
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
+
+            ProjectConnection connection = GradleConnector.newConnector()
+                    .forProjectDirectory(new File(buildFilePath).getParentFile()).connect();
+
+            try {
+                BuildLauncher build = connection.newBuild();
+
+                // select tasks to run:
+                build.forTasks(taskName);
+
+                List<String> buildArgs = new ArrayList<String>();
+                // buildArgs.add("-b");
+                // buildArgs.add(buildFilePath);
+                buildArgs.add("-stacktrace");
+                buildArgs.add("-info");
+                if (params.length > 0) {
+                    for (int i = 0; i < params.length; i++) {
+                        buildArgs.add("-P" + params[i]);
                     }
                 }
-            }).start();
 
-            process.waitFor();
+                logger.info(Arrays.toString(buildArgs.toArray()));
+
+                build.withArguments(buildArgs.toArray(new String[] {}));
+
 
-            // try {
-            // int exitValue = process.exitValue();
-            // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
-            // } catch (IllegalThreadStateException ignored) {
-            // }
+                // if you want to listen to the progress events:
+                ProgressListener listener = null; // use your implementation
+                // build.addProgressListener(listener);
 
+                // kick the build off:
+                build.run();
+            } finally {
+                connection.close();
+            }
         }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/s4-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-counter/build.gradle b/test-apps/s4-counter/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/s4-counter/build.gradle
+++ b/test-apps/s4-counter/build.gradle
@@ -100,7 +100,7 @@ dependencies {
    compile project(":s4-base")
    compile project(":s4-comm")
    compile project(":s4-core")
-   
+
    /* Logging. */
    compile( libraries.slf4j )
    compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
-   
+
    /* Set class name in manifest. Parse source files until we find a class that extends App.
     * Get fully qualified Java class name and set attribute in Manifest.
     */
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
            }
        }
    }
-   
+
    if (appClassname == "UNKNOWN") {
-       
+
        println "Couldn't find App class in source files...aborting."
        exit(1)
    }
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
 
    // more debugging statements.
    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-   
+
 }
 
 /* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
    def classname = "UNKNOWN"
    lines= file.readLines()
    for(line in lines) {
-       
+
        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
        if(pn) {
            packageName = pn[0][1] + "."
        }
-       
+
        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
        if (an) {
            classname = packageName + an[0][1]

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/s4-showtime/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/s4-showtime/build.gradle b/test-apps/s4-showtime/build.gradle
index 37b1daf..72ff887 100644
--- a/test-apps/s4-showtime/build.gradle
+++ b/test-apps/s4-showtime/build.gradle
@@ -100,7 +100,7 @@ dependencies {
    compile project(":s4-base")
    compile project(":s4-comm")
    compile project(":s4-core")
-   
+
    /* Logging. */
    compile( libraries.slf4j )
    compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
-   
+
    /* Set class name in manifest. Parse source files until we find a class that extends App.
     * Get fully qualified Java class name and set attribute in Manifest.
     */
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
            }
        }
    }
-   
+
    if (appClassname == "UNKNOWN") {
-       
+
        println "Couldn't find App class in source files...aborting."
        exit(1)
    }
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
 
    // more debugging statements.
    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-   
+
 }
 
 /* Install the S4 archive to the install directory. */
@@ -183,9 +183,7 @@ task installS4R (type: Copy) {
    into s4AppInstallDir
 }
 
-/* Generates the gradlew scripts.
-http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
-task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+
 
 
 /* Parse source file to get the app classname so we can use it in the manifest.
@@ -195,12 +193,12 @@ def getAppClassname(file) {
    def classname = "UNKNOWN"
    lines= file.readLines()
    for(line in lines) {
-       
+
        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
        if(pn) {
            packageName = pn[0][1] + "."
        }
-       
+
        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
        if (an) {
            classname = packageName + an[0][1]

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/simple-deployable-app-1/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-1/build.gradle b/test-apps/simple-deployable-app-1/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/simple-deployable-app-1/build.gradle
+++ b/test-apps/simple-deployable-app-1/build.gradle
@@ -100,7 +100,7 @@ dependencies {
    compile project(":s4-base")
    compile project(":s4-comm")
    compile project(":s4-core")
-   
+
    /* Logging. */
    compile( libraries.slf4j )
    compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
-   
+
    /* Set class name in manifest. Parse source files until we find a class that extends App.
     * Get fully qualified Java class name and set attribute in Manifest.
     */
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
            }
        }
    }
-   
+
    if (appClassname == "UNKNOWN") {
-       
+
        println "Couldn't find App class in source files...aborting."
        exit(1)
    }
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
 
    // more debugging statements.
    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-   
+
 }
 
 /* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
    def classname = "UNKNOWN"
    lines= file.readLines()
    for(line in lines) {
-       
+
        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
        if(pn) {
            packageName = pn[0][1] + "."
        }
-       
+
        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
        if (an) {
            classname = packageName + an[0][1]

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/simple-deployable-app-2/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/simple-deployable-app-2/build.gradle b/test-apps/simple-deployable-app-2/build.gradle
index 37b1daf..87512df 100644
--- a/test-apps/simple-deployable-app-2/build.gradle
+++ b/test-apps/simple-deployable-app-2/build.gradle
@@ -100,7 +100,7 @@ dependencies {
    compile project(":s4-base")
    compile project(":s4-comm")
    compile project(":s4-core")
-   
+
    /* Logging. */
    compile( libraries.slf4j )
    compile( libraries.logback_core )
@@ -138,10 +138,10 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
-   
+
    /* Set class name in manifest. Parse source files until we find a class that extends App.
     * Get fully qualified Java class name and set attribute in Manifest.
     */
@@ -154,9 +154,9 @@ task s4r(type: Jar) {
            }
        }
    }
-   
+
    if (appClassname == "UNKNOWN") {
-       
+
        println "Couldn't find App class in source files...aborting."
        exit(1)
    }
@@ -165,7 +165,7 @@ task s4r(type: Jar) {
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { File file -> println 'Adding to s4 archive: ' + file.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -173,7 +173,7 @@ s4r << {
 
    // more debugging statements.
    //sourceSets.main.compileClasspath.each { File file -> println 'compileClasspath: ' + file.name }
-   
+
 }
 
 /* Install the S4 archive to the install directory. */
@@ -195,12 +195,12 @@ def getAppClassname(file) {
    def classname = "UNKNOWN"
    lines= file.readLines()
    for(line in lines) {
-       
+
        def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
        if(pn) {
            packageName = pn[0][1] + "."
        }
-       
+
        def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
        if (an) {
            classname = packageName + an[0][1]

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 100837d..5e7b2dc 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -117,7 +117,7 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { it.isDirectory() ? it : zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
 
@@ -144,7 +144,7 @@ task s4r(type: Jar) {
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { println 'Adding to s4 archive: ' + it.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }
@@ -180,10 +180,10 @@ def getAppClassname(file) {
            packageName = pn[0][1] + "."
        }
 
-       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+Adapter.*\{/
+       def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
        if (an) {
            classname = packageName + an[0][1]
-           println "Found adapter class name: " + classname
+           println "Found app class name: " + classname
            break
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index 102ca10..0827e3a 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -6,10 +6,9 @@ import java.net.ServerSocket;
 import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.s4.base.Event;
-import org.apache.s4.core.adapter.Adapter;
-import org.apache.s4.core.adapter.RemoteStream;
+import org.apache.s4.core.App;
+import org.apache.s4.core.RemoteStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -20,11 +19,9 @@ import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
 import twitter4j.conf.ConfigurationBuilder;
 
-public class TwitterInputAdapter extends Adapter {
+public class TwitterInputAdapter extends App {
 
-    private ZkClient zkClient;
     private static Logger logger = LoggerFactory.getLogger(TwitterInputAdapter.class);
-    private String urlString = "https://stream.twitter.com/1/statuses/sample.json";
 
     public TwitterInputAdapter() {
     }
@@ -47,7 +44,7 @@ public class TwitterInputAdapter extends Adapter {
 
     @Override
     protected void onInit() {
-        remoteStream = createRemoteStream("RawStatus");
+        remoteStream = createOutputStream("RawStatus");
         t = new Thread(new Dequeuer());
     }
 
@@ -114,7 +111,7 @@ public class TwitterInputAdapter extends Adapter {
 
         @Override
         public void run() {
-            while (!Thread.interrupted()) {
+            while (true) {
                 try {
                     Status status = messageQueue.take();
                     Event event = new Event();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index e10eae0..b93d0b9 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -115,7 +115,7 @@ appDependencies = ( configurations.compile )
 task s4r(type: Jar) {
    dependsOn jar
    from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
-   from { configurations.archives.allArtifactFiles.collect { it.isDirectory() ? it : zipTree(it) } }
+   from { configurations.archives.allArtifacts.files.collect { zipTree(it) } }
    manifest = project.manifest
    extension = 's4r'
 
@@ -139,10 +139,11 @@ task s4r(type: Jar) {
    }
 }
 
+
 /* List the artifacts that will br added to the s4 archive (and explode if needed). */
 s4r << {
    appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
-   configurations.archives.allArtifactFiles.each { File file -> println 'Adding to s4 archive: ' + file.name }
+   configurations.archives.allArtifacts.files.each { println 'Adding to s4 archive: ' + it.name }
 
    /* This is for debugging. */
    //configurations.s4All.each { File file -> println 's4All: ' + file.name }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/99b6f048/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index e854395..90c3199 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -58,7 +58,7 @@ public class TwitterCounterApp extends App {
             TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
             topicExtractorPE.setDownStream(topicSeenStream);
             topicExtractorPE.setSingleton(true);
-            createStream("RawStatus", topicExtractorPE);
+            createInputStream("RawStatus", topicExtractorPE);
 
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -67,10 +67,6 @@ public class TwitterCounterApp extends App {
 
     @Override
     protected void onStart() {
-        // try {
-        // t.start();
-        // } catch (Exception e) {
-        // throw new RuntimeException(e);
-        // }
+
     }
 }