You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[21/22] git commit: inter-app communications

inter-app communications

- + deployment facilities


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/17c5ab80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/17c5ab80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/17c5ab80

Branch: refs/heads/piper
Commit: 17c5ab80eb1d510defe795021479d1ca0cd4c190
Parents: 403162e
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat Mar 24 14:25:51 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:22:42 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Event.java    |   11 +-
 .../org/apache/s4/comm/serialize/KryoSerDeser.java |   37 ++--
 .../java/org/apache/s4/comm/tools/TaskSetup.java   |    2 +-
 .../src/main/java/org/apache/s4/core/App.java      |   22 +-
 .../main/java/org/apache/s4/core/CustomModule.java |   29 +--
 .../main/java/org/apache/s4/core/EventSource.java  |    8 +-
 .../src/main/java/org/apache/s4/core/Main.java     |   69 ++++--
 .../src/main/java/org/apache/s4/core/Server.java   |  159 +++++++++++----
 .../src/main/java/org/apache/s4/core/Stream.java   |   11 +-
 .../main/java/org/apache/s4/core/Streamable.java   |    5 +
 .../s4/deploy/DistributedDeploymentManager.java    |    4 +-
 .../java/org/apache/s4/deploy/util/DeployApp.java  |  151 ++++++++++++++
 .../test/java/org/apache/s4/fixtures/ZKServer.java |   55 +++++
 13 files changed, 443 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 0a71e29..2e023cb 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -1,17 +1,17 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.base;
 
@@ -167,7 +167,10 @@ public class Event {
     }
 
     /* Helper data object. */
-    private class Data<T> {
+    private static class Data<T> {
+
+        Data() {
+        }
 
         private T value;
         private Class<T> type;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
index 96d312e..fa66340 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
@@ -1,20 +1,20 @@
 package org.apache.s4.comm.serialize;
 
-
 import java.nio.ByteBuffer;
 
 import org.apache.s4.base.SerializerDeserializer;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.ObjectBuffer;
+import com.esotericsoftware.kryo.serialize.ClassSerializer;
 import com.esotericsoftware.kryo.serialize.SimpleSerializer;
 
 public class KryoSerDeser implements SerializerDeserializer {
 
     private Kryo kryo = new Kryo();
-    
+
     private int initialBufferSize = 2048;
-    private int maxBufferSize = 256*1024;
+    private int maxBufferSize = 256 * 1024;
 
     public void setInitialBufferSize(int initialBufferSize) {
         this.initialBufferSize = initialBufferSize;
@@ -27,22 +27,21 @@ public class KryoSerDeser implements SerializerDeserializer {
     public KryoSerDeser() {
         kryo.setRegistrationOptional(true);
 
+        kryo.register(Class.class, new ClassSerializer(kryo));
         // UUIDs don't have a no-arg constructor.
-        kryo.register(java.util.UUID.class,
-                      new SimpleSerializer<java.util.UUID>() {
-                          @Override
-                          public java.util.UUID read(ByteBuffer buf) {
-                              return new java.util.UUID(buf.getLong(),
-                                                        buf.getLong());
-                          }
-
-                          @Override
-                          public void write(ByteBuffer buf, java.util.UUID uuid) {
-                              buf.putLong(uuid.getMostSignificantBits());
-                              buf.putLong(uuid.getLeastSignificantBits());
-                          }
-
-                      });
+        kryo.register(java.util.UUID.class, new SimpleSerializer<java.util.UUID>() {
+            @Override
+            public java.util.UUID read(ByteBuffer buf) {
+                return new java.util.UUID(buf.getLong(), buf.getLong());
+            }
+
+            @Override
+            public void write(ByteBuffer buf, java.util.UUID uuid) {
+                buf.putLong(uuid.getMostSignificantBits());
+                buf.putLong(uuid.getLeastSignificantBits());
+            }
+
+        });
     }
 
     @Override
@@ -56,4 +55,4 @@ public class KryoSerDeser implements SerializerDeserializer {
         ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
         return buffer.writeClassAndObject(message);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index ab14449..193daff 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -25,7 +25,7 @@ public class TaskSetup {
     public void setup(String clusterName, int tasks) {
         zkclient.createPersistent("/" + clusterName + "/tasks", true);
         zkclient.createPersistent("/" + clusterName + "/process", true);
-        zkclient.createPersistent("/" + clusterName, true);
+        zkclient.createPersistent("/" + clusterName + "/apps", true);
         for (int i = 0; i < tasks; i++) {
             String taskId = "Task-" + i;
             ZNRecord record = new ZNRecord(taskId);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6249306..0939d8e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -41,7 +41,7 @@ public abstract class App {
     final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
 
     /* All the internal streams in this app. */
-    final private List<Streamable> streams = new ArrayList<Streamable>();
+    final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
 
     /* All the the event sources exported by this app. */
     final private List<EventSource> eventSources = new ArrayList<EventSource>();
@@ -107,25 +107,27 @@ public abstract class App {
     }
 
     /* Returns list of internal streams. Should only be used within the core package. */
-    List<Streamable> getStreams() {
+    // TODO visibility
+    public List<Streamable<Event>> getStreams() {
         return streams;
     }
 
     /* Returns list of the event sources to be exported. Should only be used within the core package. */
-    List<EventSource> getEventSources() {
+    // TODO visibility
+    public List<EventSource> getEventSources() {
         return eventSources;
     }
 
     protected abstract void onStart();
 
-    protected void start() {
+    public final void start() {
 
         // logger.info("Prepare to start App [{}].", getClass().getName());
         //
-        // /* Start all streams. */
-        // for (Streamable<? extends Event> stream : getStreams()) {
-        // stream.start();
-        // }
+        /* Start all streams. */
+        for (Streamable<? extends Event> stream : getStreams()) {
+            stream.start();
+        }
         //
         // /* Allow abstract PE to initialize. */
         // for (ProcessingElement pe : getPePrototypes()) {
@@ -138,14 +140,14 @@ public abstract class App {
 
     protected abstract void onInit();
 
-    protected void init() {
+    public final void init() {
 
         onInit();
     }
 
     protected abstract void onClose();
 
-    protected void close() {
+    public final void close() {
 
         onClose();
         removeAll();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
index a7fc7d8..17529d3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
@@ -1,8 +1,5 @@
 package org.apache.s4.core;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -37,11 +34,11 @@ import com.google.inject.name.Names;
  */
 public class CustomModule extends AbstractModule {
 
-    File configFile;
+    InputStream configFileInputStream;
     private PropertiesConfiguration config;
 
-    public CustomModule(File customConfigFile) {
-        this.configFile = customConfigFile;
+    public CustomModule(InputStream configFileInputStream) {
+        this.configFileInputStream = configFileInputStream;
     }
 
     @Override
@@ -49,6 +46,12 @@ public class CustomModule extends AbstractModule {
         if (config == null) {
             loadProperties(binder());
         }
+        if (configFileInputStream != null) {
+            try {
+                configFileInputStream.close();
+            } catch (IOException ignored) {
+            }
+        }
 
         int numHosts = config.getList("cluster.hosts").size();
         boolean isCluster = numHosts > 1 ? true : false;
@@ -73,11 +76,9 @@ public class CustomModule extends AbstractModule {
     }
 
     private void loadProperties(Binder binder) {
-        InputStream is = null;
         try {
-            is = new FileInputStream(configFile);
             config = new PropertiesConfiguration();
-            config.load(is);
+            config.load(configFileInputStream);
 
             System.out.println(ConfigurationUtils.toString(config));
             // TODO - validate properties.
@@ -87,16 +88,6 @@ public class CustomModule extends AbstractModule {
         } catch (ConfigurationException e) {
             binder.addError(e);
             e.printStackTrace();
-        } catch (FileNotFoundException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        } finally {
-            if (is != null) {
-                try {
-                    is.close();
-                } catch (IOException ignored) {
-                }
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
index edd64ef..1f89d86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
@@ -53,7 +53,7 @@ public class EventSource implements Streamable {
      */
     @Override
     public void put(Event event) {
-        for (Streamable stream : streamables) {
+        for (Streamable<Event> stream : streamables) {
             stream.put(event);
         }
     }
@@ -91,4 +91,10 @@ public class EventSource implements Streamable {
     public Set<Streamable> getStreamables() {
         return streamables;
     }
+
+    @Override
+    public void start() {
+        // Generated stub, currently a no-op. put() forwards events directly to the subscribed streamables,
+        // so presumably an event source has nothing of its own to start - TODO confirm.
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index ed4c0ce..a55511c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -4,13 +4,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
+import java.net.URL;
 import java.util.Arrays;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.io.Resources;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -42,17 +42,10 @@ public class Main {
         }
     }
 
-    private static void startCustomS4Node(String s4PropertiesFilePath) {
-        try {
-            Thread.sleep(5000);
-        } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
+    private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
         // TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
         // file passed as a parameter...
-        Injector injector = Guice.createInjector(new CustomModule(new File(s4PropertiesFilePath)));
-
+        Injector injector = Guice.createInjector(new CustomModule(new FileInputStream(new File(s4PropertiesFilePath))));
         startServer(logger, injector);
     }
 
@@ -83,7 +76,7 @@ public class Main {
      * 
      */
     private static void startDevelopmentMode(String[] args) {
-        if (args.length != 2) {
+        if (args.length < 2 || args.length > 3) { // only 2 or 3 arguments are valid
             usageForDevelopmentMode(args);
         }
         logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
@@ -93,15 +86,41 @@ public class Main {
                 logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
                 System.exit(-1);
             }
-            injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
-        } catch (InstantiationException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-            System.exit(-1);
-        } catch (IllegalAccessException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
-            System.exit(-1);
+            if (args.length == 3) {
+                if (!(new File(args[2]).exists())) {
+                    logger.error("Cannot find S4 config file {}", args[2]);
+                    System.exit(-1);
+                }
+                try {
+                    injector = Guice.createInjector((AbstractModule) Class.forName(args[0])
+                            .getConstructor(InputStream.class).newInstance(new FileInputStream(new File(args[2]))));
+                } catch (Exception e) {
+                    logger.error("Module loading error", e);
+                    System.exit(-1);
+                }
+            } else {
+                URL defaultS4Config = null;
+                try {
+                    defaultS4Config = Resources.getResource("default.s4.properties"); // no leading '/'
+                } catch (IllegalArgumentException e) {
+                    logger.error(
+                            "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
+                            e);
+                    System.exit(-1);
+                }
+                try {
+                    injector = Guice.createInjector((AbstractModule) Class.forName(args[0])
+                            .getConstructor(InputStream.class) // must match the InputStream passed below
+                            .newInstance(Resources.newInputStreamSupplier(defaultS4Config).getInput()));
+                } catch (Exception e) {
+                    logger.error(
+                            "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
+                            e);
+                    System.exit(-1);
+                }
+            }
         } catch (ClassNotFoundException e) {
-            logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
+            logger.error("Invalid module class [{}]", args[0], e);
             System.exit(-1);
         }
         App app;
@@ -110,13 +129,17 @@ public class Main {
             app.init();
             app.start();
         } catch (ClassNotFoundException e) {
-            logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
+            logger.error("Invalid S4 application class [{}] : {}", args[1], e.getMessage());
         }
     }
 
     static void usageForDevelopmentMode(String[] args) {
-        logger.info("Invalid parameters " + Arrays.toString(args)
-                + " \nUsage: java <classpath+params> org.apache.s4.core.Main <appClassName> <moduleClassName>");
+        logger.info("Invalid parameters "
+                + Arrays.toString(args)
+                + " \nUsage: java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName>"
+                + "\n(this uses default.s4.properties from the classpath)"
+                + "\nor:"
+                + " java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName> <pathToConfigFile>");
         System.exit(-1);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 35328aa..dd80e51 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,19 +2,26 @@ package org.apache.s4.core;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.jar.Attributes.Name;
 import java.util.jar.JarFile;
 
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.s4.base.Event;
 import org.apache.s4.base.util.S4RLoader;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
 import org.apache.s4.deploy.DeploymentManager;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import ch.qos.logback.classic.Level;
 
+import com.google.common.collect.Maps;
 import com.google.common.io.PatternFilenameFilter;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -35,7 +42,9 @@ public class Server {
     public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
     // local applications directory
     private final String appsDir;
-    List<App> apps = new ArrayList<App>();
+    Map<String, App> apps = Maps.newHashMap();
+    Map<String, Streamable> streams = Maps.newHashMap();
+    Map<String, EventSource> eventSources = Maps.newHashMap();
     CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
 
     private Injector injector;
@@ -43,15 +52,60 @@ public class Server {
     @Inject
     private DeploymentManager deploymentManager;
 
+    private String clusterName;
+
+    private ZkClient zkClient;
+
     /**
-     * 
+     *
      */
     @Inject
     public Server(@Named("comm.module") String commModuleName, @Named("s4.logger_level") String logLevel,
-            @Named("appsDir") String appsDir) {
+            @Named("appsDir") String appsDir, @Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) {
         this.commModuleName = commModuleName;
         this.logLevel = logLevel;
         this.appsDir = appsDir;
+        this.clusterName = clusterName;
+
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        initializeZkStreams(clusterName);
+        watchZkStreams();
+    }
+
+    private void watchZkStreams() {
+        // Subscribes to child changes under /<clusterName>/streams/producers.
+        zkClient.subscribeChildChanges("/" + clusterName + "/streams/producers", new IZkChildListener() {
+            @Override
+            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+                // Not implemented yet: the callback ignores the change. Commented-out draft kept below:
+                // synchronized (streams) {
+                // for (String child : currentChilds) {
+                // if (apps.containsKey(child)) {
+                // eventSources.put(paramK, paramV)
+                // }
+                // }
+                // }
+
+            }
+        });
+    }
+
+    private void initializeZkStreams(String clusterName) {
+        // Create each node independently so that an already-existing node does not abort the remaining ones.
+        String streamsPath = "/" + clusterName + "/streams";
+        for (String path : new String[] { streamsPath, streamsPath + "/producers", streamsPath + "/consumers" }) {
+            try {
+                zkClient.createPersistent(path);
+            } catch (ZkException e) {
+                if (!(e.getCause() instanceof KeeperException.NodeExistsException)) {
+                    throw e; // only "node already exists" is expected and ignored
+                }
+            }
+        }
+    }
 
     public void start() throws Exception {
@@ -72,7 +126,6 @@ public class Server {
 
         /* After some indirection we get the injector. */
         injector = Guice.createInjector(module);
-        Thread.sleep(10000);
 
         File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
         for (File s4rFile : s4rFiles) {
@@ -80,39 +133,19 @@ public class Server {
         }
 
         /* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
-        for (App app : apps) {
-            logger.info("Starting app " + app.getClass().getName());
-            startApp(app);
+        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+            logger.info("Initializing app " + appEntry.getValue().getClass().getName());
+            appEntry.getValue().init();
         }
 
-        EventSource savedES = null;
-        App consumerApp = null;
-        for (App app : apps) {
-            logger.info("Resolving dependencies for " + app.getClass().getName());
-            List<EventSource> eventSources = app.getEventSources();
-            if (eventSources.size() > 0) {
-                EventSource es = eventSources.get(0);
-                logger.info("App [{}] exports event source [{}].", app.getClass().getName(), es.getName());
-                savedES = es; // hardcoded
-            } else {
-
-                // hardcoded (one app has event source the other one doesn't.
-                consumerApp = app;
-            }
+        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+            logger.info("Initializing app streams " + appEntry.getValue().getClass().getName());
+            updateStreams(appEntry.getValue(), appEntry.getKey());
         }
-        // hardcoded: make savedApp subscribe to savedES
-        logger.info("The consumer app is [{}].", consumerApp.getClass().getName());
-        // get the list of streams and find the one we are looking for that has name: "I need the time."
-        List<Streamable> streams = consumerApp.getStreams();
-        for (Streamable aStream : streams) {
-
-            String streamName = aStream.getName();
-
-            if (streamName.contentEquals("I need the time.")) {
-                logger.info("Subscribing stream [{}] from app [{}] to event source.", streamName, consumerApp
-                        .getClass().getName());
-                savedES.subscribeStream(aStream);
-            }
+
+        for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+            logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
+            appEntry.getValue().start();
         }
 
         logger.info("Completed local applications startup.");
@@ -131,6 +164,54 @@ public class Server {
     }
 
     public App loadApp(File s4r) {
+        String appName = s4r.getName().substring(0, s4r.getName().indexOf(".s4r")); // name up to the first ".s4r"
+        logger.info("Local app deployment: using s4r file name [{}] as application name", appName);
+        return loadApp(s4r, appName);
+    }
+
+    public void updateStreams(App app, String appName) {
+        // Registers the app's streams, then its event sources, connecting those that share a name.
+        List<Streamable<Event>> appStreams = app.getStreams();
+        for (Streamable<Event> streamable : appStreams) {
+            if (streams.containsKey(streamable.getName())) {
+                logger.error("Application {} defines the stream {} but there is already a stream with that name",
+                        new String[] { appName, streamable.getName() });
+            } else {
+                // zkClient.createEphemeral("/" + clusterName + "/streams/producers/" + appName);
+                logger.debug("Adding stream {} for app {}", streamable.getName(), appName);
+                streams.put(streamable.getName(), streamable);
+                if (eventSources.containsKey(streamable.getName())) {
+                    logger.debug("Connecting matching event source for stream {} for app {}", streamable.getName(),
+                            appName);
+                    eventSources.get(streamable.getName()).subscribeStream(streams.get(streamable.getName()));
+                }
+            }
+        }
+        // Event sources: register names not seen before; in all cases subscribe a same-named stream if known.
+        List<EventSource> appEventSources = app.getEventSources();
+        for (EventSource eventSource : appEventSources) {
+            if (eventSources.containsKey(eventSource.getName())) {
+                // NOTE(review): "from app {}" is filled from the streams map, not an app name - confirm intent.
+                logger.error(
+                        "Application {} defines the event source {} but there is already an event source with that name, from app {}",
+                        new String[] { appName, eventSource.getName(),
+                                String.valueOf(streams.get(eventSource.getName())) });
+            } else {
+                // zkClient.createEphemeral("/" + clusterName + "/streams/consumers/" + appName);
+                logger.debug("adding event source {} from app {}", eventSource.getName(), appName);
+                eventSources.put(eventSource.getName(), eventSource);
+            }
+            if (streams.containsKey(eventSource.getName())) {
+                logger.debug("Connecting matching stream from app {} to event source {}", appName,
+                        eventSource.getName());
+                eventSource.subscribeStream(streams.get(eventSource.getName()));
+            }
+        }
+    }
+
+    public App loadApp(File s4r, String appName) {
+
+        // TODO handle application upgrade
 
         Sender sender = injector.getInstance(Sender.class);
         Receiver receiver = injector.getInstance(Receiver.class);
@@ -160,7 +241,7 @@ public class Server {
             }
 
             app.setCommLayer(sender, receiver);
-            apps.add(app);
+            App previous = apps.put(appName, app);
             logger.info("Loaded application from file {}", s4r.getAbsolutePath());
             signalOneAppLoaded.countDown();
             return app;
@@ -171,8 +252,12 @@ public class Server {
 
     }
 
-    public void startApp(App app) {
+    public void startApp(App app, String appName, String clusterName) {
+        // NOTE(review): the clusterName parameter is not used in this method.
         app.init();
+        // Register and connect the app's streams and event sources before the app is started.
+        updateStreams(app, appName);
+
         app.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index bd1d351..a328941 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -1,17 +1,17 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.core;
 
@@ -43,7 +43,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
     final private Key<T> key;
     final private ProcessingElement[] targetPEs;
     final private BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
-    final private Thread thread;
+    private Thread thread;
     final private Sender sender;
     final private Receiver receiver;
     final private int id;
@@ -78,6 +78,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
         this.sender = app.getSender();
         this.receiver = app.getReceiver();
         this.targetPEs = processingElements;
+    }
+
+    public void start() {
 
         /* Start streaming. */
         thread = new Thread(this, name);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
index ecbae5e..cfe54dd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
@@ -10,6 +10,11 @@ import org.apache.s4.base.Event;
 public interface Streamable<T extends Event> {
 
     /**
+     * Starting the stream starts the associated dequeuing thread.
+     */
+    void start();
+
+    /**
      * Put an event into the streams.
      * 
      * @param event

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index a152672..4f7ab86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -119,10 +119,10 @@ public class DistributedDeploymentManager implements DeploymentManager {
             App loaded = server.loadApp(s4rFile);
             if (loaded != null) {
                 logger.info("Successfully installed application {}", newApp);
-                server.startApp(loaded);
+                server.startApp(loaded, newApp, clusterName);
             } else {
                 throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
-                        + uri.toString() + "] : cannot load application");
+                        + uri.toString() + "] : cannot start application");
             }
             // TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
new file mode 100644
index 0000000..d8b77a4
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
@@ -0,0 +1,151 @@
+package org.apache.s4.deploy.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+/** Command-line tool that builds an S4 application archive (S4R) with gradle and registers its URI in Zookeeper. */
+public class DeployApp {
+
+    private static File tmpAppsDir;
+
+    /**
+     * Builds the application archive (S4R) with the gradle "installS4R" task, copies it to a temporary file and
+     * creates the "/cluster/apps/appName" znode holding the URI of that file. Invalid arguments print the usage and
+     * exit with -1; later failures are logged as "Cannot deploy app".
+     *
+     * @param args command line arguments, see {@link DeployAppArgs}
+     */
+    public static void main(String[] args) {
+        DeployAppArgs appArgs = new DeployAppArgs();
+        JCommander jc = new JCommander(appArgs);
+        // configure log4j for Zookeeper
+        BasicConfigurator.configure();
+        Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+        Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
+            zkClient.setZkSerializer(new ZNRecordSerializer());
+
+            tmpAppsDir = Files.createTempDir();
+
+            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
+                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+
+            File builtS4r = new File(tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r");
+            File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+
+            // throw an Exception rather than a JUnit assertion Error, so that the catch block below logs the failure
+            long copied = ByteStreams.copy(Files.newInputStreamSupplier(builtS4r),
+                    Files.newOutputStreamSupplier(s4rToDeploy));
+            if (copied <= 0) {
+                throw new IOException("No data copied from S4R file [" + builtS4r.getAbsolutePath() + "]");
+            }
+
+            final String uri = s4rToDeploy.toURI().toString();
+            ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+            record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+            zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
+        }
+    }
+
+    @Parameters(separators = "=")
+    static class DeployAppArgs {
+
+        @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
+        String gradleExecPath;
+
+        @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
+        String gradleBuildFilePath;
+
+        @Parameter(names = "-appName", description = "name of S4 application", required = true)
+        String appName;
+
+        @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
+        String clusterName;
+
+        @Parameter(names = "-zk", description = "zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+    }
+
+    static class ExecGradle {
+        /**
+         * Runs a gradle task from the directory of the build file, echoing the process output to stdout.
+         * @param gradlewExecPath path to the gradle/gradlew executable
+         * @param buildFilePath path to the gradle build file, passed with "-b"
+         * @param taskName gradle task to execute
+         * @param params project properties, each one passed as "-P" + param
+         * @throws Exception if the process cannot be started, the wait is interrupted or gradle exits with a non-zero code
+         */
+        public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
+                throws Exception {
+            List<String> cmdList = new ArrayList<String>();
+            cmdList.add(gradlewExecPath);
+            cmdList.add("-b");
+            cmdList.add(buildFilePath);
+            cmdList.add(taskName);
+            for (String param : params) {
+                cmdList.add("-P" + param);
+            }
+
+            System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+            ProcessBuilder pb = new ProcessBuilder(cmdList);
+            pb.directory(new File(buildFilePath).getParentFile());
+            // merge stderr into stdout; the no-arg redirectErrorStream() only reads the flag, leaving stderr undrained
+            pb.redirectErrorStream(true);
+
+            final Process process = pb.start();
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                    try {
+                        String line;
+                        while ((line = br.readLine()) != null) {
+                            System.out.println(line);
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }).start();
+            int exitValue = process.waitFor();
+            if (exitValue != 0) {
+                throw new IOException("gradle task [" + taskName + "] failed with exit code [" + exitValue + "]");
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
new file mode 100644
index 0000000..3a33219
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
@@ -0,0 +1,55 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class ZKServer {
+
+    private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+    /**
+     * Starts a Zookeeper server, then cleans and sets up the configuration of the requested S4 cluster.
+     * @param args "cluster", "nbTasks" and "zk" options, bound to {@link ZKServerArgs}
+     */
+    public static void main(String[] args) {
+        final ZKServerArgs options = new ZKServerArgs();
+        final JCommander commander = new JCommander(options);
+        try {
+            commander.parse(args);
+        } catch (Exception invalidArguments) {
+            // unusable command line: print the expected options and stop
+            commander.usage();
+            System.exit(-1);
+        }
+        try {
+            logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", options.clusterName,
+                    options.nbTasks);
+            CommTestUtils.startZookeeperServer();
+            final TaskSetup clusterSetup = new TaskSetup(options.zkConnectionString);
+            clusterSetup.clean(options.clusterName);
+            clusterSetup.setup(options.clusterName, options.nbTasks);
+            logger.info("Zookeeper started");
+        } catch (Exception failure) {
+            logger.error("Cannot initialize zookeeper with specified configuration", failure);
+        }
+    }
+
+    @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
+    static class ZKServerArgs {
+
+        @Parameter(names = "cluster", description = "S4 cluster name", required = true)
+        String clusterName = "s4-test-cluster";
+
+        @Parameter(names = "nbTasks", description = "number of tasks for the cluster", required = true)
+        int nbTasks = 1;
+
+        @Parameter(names = "zk", description = "Zookeeper connection string")
+        String zkConnectionString = "localhost:2181";
+    }
+
+}