You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[21/22] git commit: inter-app communications
inter-app communications
- + deployment facilities
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/17c5ab80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/17c5ab80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/17c5ab80
Branch: refs/heads/piper
Commit: 17c5ab80eb1d510defe795021479d1ca0cd4c190
Parents: 403162e
Author: Matthieu Morel <mm...@apache.org>
Authored: Sat Mar 24 14:25:51 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 29 13:22:42 2012 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Event.java | 11 +-
.../org/apache/s4/comm/serialize/KryoSerDeser.java | 37 ++--
.../java/org/apache/s4/comm/tools/TaskSetup.java | 2 +-
.../src/main/java/org/apache/s4/core/App.java | 22 +-
.../main/java/org/apache/s4/core/CustomModule.java | 29 +--
.../main/java/org/apache/s4/core/EventSource.java | 8 +-
.../src/main/java/org/apache/s4/core/Main.java | 69 ++++--
.../src/main/java/org/apache/s4/core/Server.java | 159 +++++++++++----
.../src/main/java/org/apache/s4/core/Stream.java | 11 +-
.../main/java/org/apache/s4/core/Streamable.java | 5 +
.../s4/deploy/DistributedDeploymentManager.java | 4 +-
.../java/org/apache/s4/deploy/util/DeployApp.java | 151 ++++++++++++++
.../test/java/org/apache/s4/fixtures/ZKServer.java | 55 +++++
13 files changed, 443 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 0a71e29..2e023cb 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -1,17 +1,17 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.base;
@@ -167,7 +167,10 @@ public class Event {
}
/* Helper data object. */
- private class Data<T> {
+ private static class Data<T> {
+
+ Data() {
+ }
private T value;
private Class<T> type;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
index 96d312e..fa66340 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
@@ -1,20 +1,20 @@
package org.apache.s4.comm.serialize;
-
import java.nio.ByteBuffer;
import org.apache.s4.base.SerializerDeserializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.ObjectBuffer;
+import com.esotericsoftware.kryo.serialize.ClassSerializer;
import com.esotericsoftware.kryo.serialize.SimpleSerializer;
public class KryoSerDeser implements SerializerDeserializer {
private Kryo kryo = new Kryo();
-
+
private int initialBufferSize = 2048;
- private int maxBufferSize = 256*1024;
+ private int maxBufferSize = 256 * 1024;
public void setInitialBufferSize(int initialBufferSize) {
this.initialBufferSize = initialBufferSize;
@@ -27,22 +27,21 @@ public class KryoSerDeser implements SerializerDeserializer {
public KryoSerDeser() {
kryo.setRegistrationOptional(true);
+ kryo.register(Class.class, new ClassSerializer(kryo));
// UUIDs don't have a no-arg constructor.
- kryo.register(java.util.UUID.class,
- new SimpleSerializer<java.util.UUID>() {
- @Override
- public java.util.UUID read(ByteBuffer buf) {
- return new java.util.UUID(buf.getLong(),
- buf.getLong());
- }
-
- @Override
- public void write(ByteBuffer buf, java.util.UUID uuid) {
- buf.putLong(uuid.getMostSignificantBits());
- buf.putLong(uuid.getLeastSignificantBits());
- }
-
- });
+ kryo.register(java.util.UUID.class, new SimpleSerializer<java.util.UUID>() {
+ @Override
+ public java.util.UUID read(ByteBuffer buf) {
+ return new java.util.UUID(buf.getLong(), buf.getLong());
+ }
+
+ @Override
+ public void write(ByteBuffer buf, java.util.UUID uuid) {
+ buf.putLong(uuid.getMostSignificantBits());
+ buf.putLong(uuid.getLeastSignificantBits());
+ }
+
+ });
}
@Override
@@ -56,4 +55,4 @@ public class KryoSerDeser implements SerializerDeserializer {
ObjectBuffer buffer = new ObjectBuffer(kryo, initialBufferSize, maxBufferSize);
return buffer.writeClassAndObject(message);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index ab14449..193daff 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -25,7 +25,7 @@ public class TaskSetup {
public void setup(String clusterName, int tasks) {
zkclient.createPersistent("/" + clusterName + "/tasks", true);
zkclient.createPersistent("/" + clusterName + "/process", true);
- zkclient.createPersistent("/" + clusterName, true);
+ zkclient.createPersistent("/" + clusterName + "/apps", true);
for (int i = 0; i < tasks; i++) {
String taskId = "Task-" + i;
ZNRecord record = new ZNRecord(taskId);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6249306..0939d8e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -41,7 +41,7 @@ public abstract class App {
final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
/* All the internal streams in this app. */
- final private List<Streamable> streams = new ArrayList<Streamable>();
+ final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
/* All the the event sources exported by this app. */
final private List<EventSource> eventSources = new ArrayList<EventSource>();
@@ -107,25 +107,27 @@ public abstract class App {
}
/* Returns list of internal streams. Should only be used within the core package. */
- List<Streamable> getStreams() {
+ // TODO visibility
+ public List<Streamable<Event>> getStreams() {
return streams;
}
/* Returns list of the event sources to be exported. Should only be used within the core package. */
- List<EventSource> getEventSources() {
+ // TODO visibility
+ public List<EventSource> getEventSources() {
return eventSources;
}
protected abstract void onStart();
- protected void start() {
+ public final void start() {
// logger.info("Prepare to start App [{}].", getClass().getName());
//
- // /* Start all streams. */
- // for (Streamable<? extends Event> stream : getStreams()) {
- // stream.start();
- // }
+ /* Start all streams. */
+ for (Streamable<? extends Event> stream : getStreams()) {
+ stream.start();
+ }
//
// /* Allow abstract PE to initialize. */
// for (ProcessingElement pe : getPePrototypes()) {
@@ -138,14 +140,14 @@ public abstract class App {
protected abstract void onInit();
- protected void init() {
+ public final void init() {
onInit();
}
protected abstract void onClose();
- protected void close() {
+ public final void close() {
onClose();
removeAll();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
index a7fc7d8..17529d3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/CustomModule.java
@@ -1,8 +1,5 @@
package org.apache.s4.core;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -37,11 +34,11 @@ import com.google.inject.name.Names;
*/
public class CustomModule extends AbstractModule {
- File configFile;
+ InputStream configFileInputStream;
private PropertiesConfiguration config;
- public CustomModule(File customConfigFile) {
- this.configFile = customConfigFile;
+ public CustomModule(InputStream configFileInputStream) {
+ this.configFileInputStream = configFileInputStream;
}
@Override
@@ -49,6 +46,12 @@ public class CustomModule extends AbstractModule {
if (config == null) {
loadProperties(binder());
}
+ if (configFileInputStream != null) {
+ try {
+ configFileInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
int numHosts = config.getList("cluster.hosts").size();
boolean isCluster = numHosts > 1 ? true : false;
@@ -73,11 +76,9 @@ public class CustomModule extends AbstractModule {
}
private void loadProperties(Binder binder) {
- InputStream is = null;
try {
- is = new FileInputStream(configFile);
config = new PropertiesConfiguration();
- config.load(is);
+ config.load(configFileInputStream);
System.out.println(ConfigurationUtils.toString(config));
// TODO - validate properties.
@@ -87,16 +88,6 @@ public class CustomModule extends AbstractModule {
} catch (ConfigurationException e) {
binder.addError(e);
e.printStackTrace();
- } catch (FileNotFoundException e) {
- binder.addError(e);
- e.printStackTrace();
- } finally {
- if (is != null) {
- try {
- is.close();
- } catch (IOException ignored) {
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
index edd64ef..1f89d86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/EventSource.java
@@ -53,7 +53,7 @@ public class EventSource implements Streamable {
*/
@Override
public void put(Event event) {
- for (Streamable stream : streamables) {
+ for (Streamable<Event> stream : streamables) {
stream.put(event);
}
}
@@ -91,4 +91,10 @@ public class EventSource implements Streamable {
public Set<Streamable> getStreamables() {
return streamables;
}
+
+ @Override
+ public void start() {
+ // Intentionally a no-op for now: put() hands each event straight to the subscribed streamables.
+ // NOTE(review): presumably an event source owns no dequeuing thread that would need starting - confirm.
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index ed4c0ce..a55511c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -4,13 +4,13 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.net.URL;
import java.util.Arrays;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.io.Resources;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -42,17 +42,10 @@ public class Main {
}
}
- private static void startCustomS4Node(String s4PropertiesFilePath) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ private static void startCustomS4Node(String s4PropertiesFilePath) throws FileNotFoundException {
// TODO that's quite inconvenient anyway: we still need to specify the comm module in the config
// file passed as a parameter...
- Injector injector = Guice.createInjector(new CustomModule(new File(s4PropertiesFilePath)));
-
+ Injector injector = Guice.createInjector(new CustomModule(new FileInputStream(new File(s4PropertiesFilePath))));
startServer(logger, injector);
}
@@ -83,7 +76,7 @@ public class Main {
*
*/
private static void startDevelopmentMode(String[] args) {
- if (args.length != 2) {
+ if (args.length < 2 || args.length > 3) { // accepted: <moduleClassName> <appClassName> [<pathToConfigFile>]
usageForDevelopmentMode(args);
}
logger.info("Starting S4 app with module [{}] and app [{}]", args[0], args[1]);
@@ -93,15 +86,41 @@ public class Main {
logger.error("Module class [{}] is not an instance of [{}]", args[0], AbstractModule.class.getName());
System.exit(-1);
}
- injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).newInstance());
- } catch (InstantiationException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
- System.exit(-1);
- } catch (IllegalAccessException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
- System.exit(-1);
+ if (args.length == 3) {
+ if (!(new File(args[2]).exists())) {
+ logger.error("Cannot find S4 config file {}", args[2]);
+ System.exit(-1);
+ }
+ try {
+ injector = Guice.createInjector((AbstractModule) Class.forName(args[0])
+ .getConstructor(InputStream.class).newInstance(new FileInputStream(new File(args[2]))));
+ } catch (Exception e) {
+ logger.error("Module loading error", e);
+ System.exit(-1);
+ }
+ } else {
+ URL defaultS4Config = null;
+ try {
+ defaultS4Config = Resources.getResource("default.s4.properties"); // class loader lookup: no leading '/'
+ } catch (IllegalArgumentException e) {
+ logger.error(
+ "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
+ e);
+ System.exit(-1);
+ }
+ // The module is built from an InputStream here too, so the same InputStream constructor must be looked up.
+ try {
+ injector = Guice.createInjector((AbstractModule) Class.forName(args[0]).getConstructor(InputStream.class)
+ .newInstance(Resources.newInputStreamSupplier(defaultS4Config).getInput()));
+ } catch (Exception e) {
+ logger.error(
+ "Module loading error: cannot load default s4 configuration file default.s4.properties from classpath",
+ e);
+ System.exit(-1);
+ }
+ }
} catch (ClassNotFoundException e) {
- logger.error("Invalid app class [{}] : {}", args[0], e.getMessage());
+ logger.error("Invalid module class [{}]", args[0], e);
System.exit(-1);
}
App app;
@@ -110,13 +129,17 @@ public class Main {
app.init();
app.start();
} catch (ClassNotFoundException e) {
- logger.error("Invalid S4 application class [{}] : {}", args[0], e.getMessage());
+ logger.error("Invalid S4 application class [{}] : {}", args[1], e.getMessage());
}
}
static void usageForDevelopmentMode(String[] args) {
- logger.info("Invalid parameters " + Arrays.toString(args)
- + " \nUsage: java <classpath+params> org.apache.s4.core.Main <appClassName> <moduleClassName>");
+ logger.info("Invalid parameters "
+ + Arrays.toString(args)
+ + " \nUsage: java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName>"
+ + "\n(this uses default.s4.properties from the classpath)"
+ + "\nor:"
+ + " java <classpath+params> org.apache.s4.core.Main <moduleClassName> <appClassName> <pathToConfigFile>");
System.exit(-1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 35328aa..dd80e51 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -2,19 +2,26 @@ package org.apache.s4.core;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.jar.Attributes.Name;
import java.util.jar.JarFile;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.s4.base.Event;
import org.apache.s4.base.util.S4RLoader;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.DeploymentManager;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level;
+import com.google.common.collect.Maps;
import com.google.common.io.PatternFilenameFilter;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -35,7 +42,9 @@ public class Server {
public static final String MANIFEST_S4_APP_CLASS = "S4-App-Class";
// local applications directory
private final String appsDir;
- List<App> apps = new ArrayList<App>();
+ Map<String, App> apps = Maps.newHashMap();
+ Map<String, Streamable> streams = Maps.newHashMap();
+ Map<String, EventSource> eventSources = Maps.newHashMap();
CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
private Injector injector;
@@ -43,15 +52,60 @@ public class Server {
@Inject
private DeploymentManager deploymentManager;
+ private String clusterName;
+
+ private ZkClient zkClient;
+
/**
- *
+ *
*/
@Inject
public Server(@Named("comm.module") String commModuleName, @Named("s4.logger_level") String logLevel,
- @Named("appsDir") String appsDir) {
+ @Named("appsDir") String appsDir, @Named("cluster.name") String clusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) {
this.commModuleName = commModuleName;
this.logLevel = logLevel;
this.appsDir = appsDir;
+ this.clusterName = clusterName;
+
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ initializeZkStreams(clusterName);
+ watchZkStreams();
+ }
+ /** Subscribes a child-change listener on /<clusterName>/streams/producers; the listener does nothing yet. */
+ private void watchZkStreams() {
+ zkClient.subscribeChildChanges("/" + clusterName + "/streams/producers", new IZkChildListener() {
+
+ @Override
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+ // synchronized (streams) {
+ // for (String child : currentChilds) {
+ // if (apps.containsKey(child)) {
+ // eventSources.put(paramK, paramV)
+ // }
+ // }
+ // }
+ // NOTE: the draft above is disabled, so changes among the producers are currently ignored.
+ }
+ });
+
+ }
+
+ /**
+ * Ensures that the znodes under which stream producers and consumers get registered
+ * exist for the given cluster.
+ *
+ * @param clusterName name of the cluster, used as the root znode
+ */
+ private void initializeZkStreams(String clusterName) {
+ String streamsPath = "/" + clusterName + "/streams";
+ // createParents = true: missing parents (the "streams" root included) are created and already existing
+ // nodes are tolerated, so a pre-existing root no longer prevents the two children from being created.
+ zkClient.createPersistent(streamsPath + "/producers", true);
+ zkClient.createPersistent(streamsPath + "/consumers", true);
+ }
public void start() throws Exception {
@@ -72,7 +126,6 @@ public class Server {
/* After some indirection we get the injector. */
injector = Guice.createInjector(module);
- Thread.sleep(10000);
File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
for (File s4rFile : s4rFiles) {
@@ -80,39 +133,19 @@ public class Server {
}
/* Now init + start apps. TODO: implement dynamic loading/unloading using ZK. */
- for (App app : apps) {
- logger.info("Starting app " + app.getClass().getName());
- startApp(app);
+ for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+ logger.info("Initializing app " + appEntry.getValue().getClass().getName());
+ appEntry.getValue().init();
}
- EventSource savedES = null;
- App consumerApp = null;
- for (App app : apps) {
- logger.info("Resolving dependencies for " + app.getClass().getName());
- List<EventSource> eventSources = app.getEventSources();
- if (eventSources.size() > 0) {
- EventSource es = eventSources.get(0);
- logger.info("App [{}] exports event source [{}].", app.getClass().getName(), es.getName());
- savedES = es; // hardcoded
- } else {
-
- // hardcoded (one app has event source the other one doesn't.
- consumerApp = app;
- }
+ for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+ logger.info("Initializing app streams " + appEntry.getValue().getClass().getName());
+ updateStreams(appEntry.getValue(), appEntry.getKey());
}
- // hardcoded: make savedApp subscribe to savedES
- logger.info("The consumer app is [{}].", consumerApp.getClass().getName());
- // get the list of streams and find the one we are looking for that has name: "I need the time."
- List<Streamable> streams = consumerApp.getStreams();
- for (Streamable aStream : streams) {
-
- String streamName = aStream.getName();
-
- if (streamName.contentEquals("I need the time.")) {
- logger.info("Subscribing stream [{}] from app [{}] to event source.", streamName, consumerApp
- .getClass().getName());
- savedES.subscribeStream(aStream);
- }
+
+ for (Map.Entry<String, App> appEntry : apps.entrySet()) {
+ logger.info("Starting app " + appEntry.getKey() + "/" + appEntry.getValue().getClass().getName());
+ appEntry.getValue().start();
}
logger.info("Completed local applications startup.");
@@ -131,6 +164,54 @@ public class Server {
}
public App loadApp(File s4r) {
+ logger.info("Local app deployment: using s4r file name [{}] as application name",
+ s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
+ return loadApp(s4r, s4r.getName().substring(0, s4r.getName().indexOf(".s4r")));
+ }
+ /** Registers the streams and event sources of an app and connects each event source to the same-named stream. */
+ public void updateStreams(App app, String appName) {
+ // register the app's streams; stream names must be unique on this node
+ List<Streamable<Event>> appStreams = app.getStreams();
+ for (Streamable<Event> streamable : appStreams) {
+ if (streams.containsKey(streamable.getName())) {
+ logger.error("Application {} defines the stream {} but there is already a stream with that name",
+ new String[] { appName, streamable.getName() });
+ } else {
+ // zkClient.createEphemeral("/" + clusterName + "/streams/producers/" + appName);
+ logger.debug("Adding stream {} for app {}", streamable.getName(), appName);
+ streams.put(streamable.getName(), streamable);
+ if (eventSources.containsKey(streamable.getName())) {
+ logger.debug("Connecting matching event source for stream {} for app {}", streamable.getName(),
+ appName);
+ eventSources.get(streamable.getName()).subscribeStream(streams.get(streamable.getName()));
+ }
+ }
+ }
+
+ // register the app's event sources and subscribe the same-named stream, when one is already known
+ List<EventSource> appEventSources = app.getEventSources();
+ for (EventSource eventSource : appEventSources) {
+ if (eventSources.containsKey(eventSource.getName())) {
+ logger.error(
+ "Application {} defines the event source {} but there is already an event source with that name, from app {}",
+ new String[] { appName, eventSource.getName(),
+ String.valueOf(eventSources.get(eventSource.getName())) });
+ } else {
+ // zkClient.createEphemeral("/" + clusterName + "/streams/consumers/" + appName);
+ logger.debug("adding event source {} from app {}", eventSource.getName(), appName);
+ eventSources.put(eventSource.getName(), eventSource);
+ }
+ if (streams.containsKey(eventSource.getName())) {
+ logger.debug("Connecting matching stream from app {} to event source {}", appName,
+ eventSource.getName());
+ eventSource.subscribeStream(streams.get(eventSource.getName()));
+ }
+ }
+ }
+
+ public App loadApp(File s4r, String appName) {
+
+ // TODO handle application upgrade
Sender sender = injector.getInstance(Sender.class);
Receiver receiver = injector.getInstance(Receiver.class);
@@ -160,7 +241,7 @@ public class Server {
}
app.setCommLayer(sender, receiver);
- apps.add(app);
+ App previous = apps.put(appName, app);
logger.info("Loaded application from file {}", s4r.getAbsolutePath());
signalOneAppLoaded.countDown();
return app;
@@ -171,8 +252,12 @@ public class Server {
}
- public void startApp(App app) {
+ public void startApp(App app, String appName, String clusterName) {
+
app.init();
+
+ updateStreams(app, appName);
+
app.start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index bd1d351..a328941 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -1,17 +1,17 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.core;
@@ -43,7 +43,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
final private Key<T> key;
final private ProcessingElement[] targetPEs;
final private BlockingQueue<Event> queue = new ArrayBlockingQueue<Event>(CAPACITY);
- final private Thread thread;
+ private Thread thread;
final private Sender sender;
final private Receiver receiver;
final private int id;
@@ -78,6 +78,9 @@ public class Stream<T extends Event> implements Runnable, Streamable {
this.sender = app.getSender();
this.receiver = app.getReceiver();
this.targetPEs = processingElements;
+ }
+
+ public void start() {
/* Start streaming. */
thread = new Thread(this, name);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
index ecbae5e..cfe54dd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Streamable.java
@@ -10,6 +10,11 @@ import org.apache.s4.base.Event;
public interface Streamable<T extends Event> {
/**
+ * Starting the stream starts the associated dequeuing thread.
+ */
+ void start();
+
+ /**
* Put an event into the streams.
*
* @param event
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index a152672..4f7ab86 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -119,10 +119,10 @@ public class DistributedDeploymentManager implements DeploymentManager {
App loaded = server.loadApp(s4rFile);
if (loaded != null) {
logger.info("Successfully installed application {}", newApp);
- server.startApp(loaded);
+ server.startApp(loaded, newApp, clusterName);
} else {
throw new DeploymentFailedException("Cannot deploy application [" + newApp + "] from URI ["
- + uri.toString() + "] : cannot load application");
+ + uri.toString() + "] : cannot start application");
}
// TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
new file mode 100644
index 0000000..d8b77a4
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
@@ -0,0 +1,151 @@
+package org.apache.s4.deploy.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.deploy.DistributedDeploymentManager;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+
+public class DeployApp {
+
+ private static File tmpAppsDir;
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ DeployAppArgs appArgs = new DeployAppArgs();
+ JCommander jc = new JCommander(appArgs);
+ // configure log4j for Zookeeper
+ BasicConfigurator.configure();
+ Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+ Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ jc.usage();
+ System.exit(-1);
+ }
+ try {
+ ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+
+ tmpAppsDir = Files.createTempDir();
+
+ // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
+
+ // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
+ // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+ ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
+ new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+
+ File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
+
+ Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
+ + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+
+ final String uri = s4rToDeploy.toURI().toString();
+ ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
+ record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
+ zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
+
+ } catch (Exception e) {
+ LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
+ }
+
+ }
+
+ @Parameters(separators = "=")
+ static class DeployAppArgs {
+
+ @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
+ String gradleExecPath;
+
+ @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
+ String gradleBuildFilePath;
+
+ @Parameter(names = "-appName", description = "name of S4 application", required = true)
+ String appName;
+
+ @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
+ String clusterName;
+
+ @Parameter(names = "-zk", description = "zookeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ }
+
+ static class ExecGradle {
+
+ public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
+ throws Exception {
+ List<String> cmdList = new ArrayList<String>();
+ cmdList.add(gradlewExecPath);
+ // cmdList.add("-c");
+ // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
+ cmdList.add("-b");
+ cmdList.add(buildFilePath);
+ cmdList.add(taskName);
+ if (params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ cmdList.add("-P" + params[i]);
+ }
+ }
+
+ System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+
+ pb.directory(new File(buildFilePath).getParentFile());
+ pb.redirectErrorStream();
+
+ final Process process = pb.start();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String line;
+ try {
+ line = br.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+ process.waitFor();
+
+ // try {
+ // int exitValue = process.exitValue();
+ // Assert.fail("forked process failed to start correctly. Exit code is [" + exitValue + "]");
+ // } catch (IllegalThreadStateException ignored) {
+ // }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/17c5ab80/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
new file mode 100644
index 0000000..3a33219
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
@@ -0,0 +1,55 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class ZKServer {
+
+ private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ ZKServerArgs clusterArgs = new ZKServerArgs();
+ JCommander jc = new JCommander(clusterArgs);
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ jc.usage();
+ System.exit(-1);
+ }
+ try {
+
+ logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
+ clusterArgs.nbTasks);
+ CommTestUtils.startZookeeperServer();
+ TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
+ taskSetup.clean(clusterArgs.clusterName);
+ taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks);
+ logger.info("Zookeeper started");
+ } catch (Exception e) {
+ logger.error("Cannot initialize zookeeper with specified configuration", e);
+ }
+
+ }
+
+ @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
+ static class ZKServerArgs {
+
+ @Parameter(names = "cluster", description = "S4 cluster name", required = true)
+ String clusterName = "s4-test-cluster";
+
+ @Parameter(names = "nbTasks", description = "number of tasks for the cluster", required = true)
+ int nbTasks = 1;
+
+ @Parameter(names = "zk", description = "Zookeeper connection string")
+ String zkConnectionString = "localhost:2181";
+ }
+
+}