You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/01/17 15:49:23 UTC
[2/2] git commit: Improve application and platform configuration
Updated Branches:
refs/heads/S4-59 [created] d3b7c30ab
Improve application and platform configuration
- we now have 3 layers:
1/ base layer. It simply registers the node in the logical cluster,
and monitors application configuration for the cluster
2/ platform layer. Based on the application configuration, it loads custom defined
modules, fetching them if necessary, then starts a Server with distributed app loader,
or starts an app directly
3/ app layer. Loads app code, fetching it remotely if necessary, and starts the app
- these layers use a hierarchy of classloaders (loading from modules jars and app S4R)
and of Guice injectors
- also fixed -clean parameter in zkServer tool (with arity of 0, specifying -clean means
cleaning, not specifying it means not cleaning)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/d3b7c30a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/d3b7c30a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/d3b7c30a
Branch: refs/heads/S4-59
Commit: d3b7c30ab5999fc55f3d4c1b4cd92e36faa09f14
Parents: d9e1599
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jan 15 20:18:05 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jan 17 16:43:49 2013 +0100
----------------------------------------------------------------------
subprojects/s4-base/s4-base.gradle | 2 +-
.../org/apache/s4/base/util/ModulesLoader.java | 16 +
.../org/apache/s4/base/util/S4RLoaderFactory.java | 4 +-
.../java/org/apache/s4/comm/DefaultCommModule.java | 28 +--
.../org/apache/s4/comm/ModulesLoaderFactory.java | 112 +++++++
.../apache/s4/comm/util/ArchiveFetchException.java | 14 +
.../org/apache/s4/comm/util/ArchiveFetcher.java | 42 +++
.../s4/comm/util/FileSystemArchiveFetcher.java | 41 +++
.../apache/s4/comm/util/HttpArchiveFetcher.java | 187 ++++++++++++
.../org/apache/s4/comm/util/RemoteFileFetcher.java | 23 ++
.../src/main/resources/default.s4.comm.properties | 3 -
.../java/org/apache/s4/comm/tcp/TCPCommTest.java | 7 +-
.../java/org/apache/s4/comm/udp/UDPCommTest.java | 6 +-
.../org/apache/s4/fixtures/TestCommModule.java | 38 +++
.../main/java/org/apache/s4/core/BaseModule.java | 80 +++++
.../java/org/apache/s4/core/DefaultCoreModule.java | 15 +
.../src/main/java/org/apache/s4/core/Main.java | 224 --------------
.../main/java/org/apache/s4/core/S4Bootstrap.java | 228 +++++++++++++++
.../src/main/java/org/apache/s4/core/S4Node.java | 79 +++++
.../src/main/java/org/apache/s4/core/Server.java | 2 +-
.../s4/core/ft/DefaultFileSystemStateStorage.java | 22 +--
.../ft/FileSystemBackendCheckpointingModule.java | 30 ++-
.../java/org/apache/s4/core/util/AppConfig.java | 159 ++++++++++
.../s4/core/util/ParametersInjectionModule.java | 3 +-
.../java/org/apache/s4/deploy/DeploymentUtils.java | 38 +++
.../s4/deploy/DistributedDeploymentManager.java | 63 +++--
.../org/apache/s4/deploy/FileSystemS4RFetcher.java | 41 ---
.../java/org/apache/s4/deploy/HttpS4RFetcher.java | 186 ------------
.../main/java/org/apache/s4/deploy/S4RFetcher.java | 42 ---
.../org/apache/s4/core/ft/CheckpointingTest.java | 4 +
.../org/apache/s4/core/ft/FTWordCountTest.java | 23 +-
...ndWithZKStorageCallbackCheckpointingModule.java | 8 +-
.../java/org/apache/s4/core/ft/RecoveryTest.java | 31 ++-
.../s4/core/moduleloader/TestModuleLoader.java | 131 +++++++++
.../core/moduleloader/TestModuleLoaderRemote.java | 12 +
.../apache/s4/deploy/TestAutomaticDeployment.java | 16 +-
.../s4/deploy/prodcon/TestProducerConsumer.java | 17 +-
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 9 +-
.../org/apache/s4/wordcount/WordClassifierPE.java | 2 +-
.../org/apache/s4/wordcount/WordCountTest.java | 36 +--
.../src/test/java/org/apache/s4/edsl/TestEDSL.java | 9 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 74 ++++--
.../src/main/java/org/apache/s4/tools/Tools.java | 6 +-
.../main/java/org/apache/s4/tools/ZKServer.java | 2 +-
44 files changed, 1452 insertions(+), 663 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/s4-base.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/s4-base.gradle b/subprojects/s4-base/s4-base.gradle
index fffe198..e38ac28 100644
--- a/subprojects/s4-base/s4-base.gradle
+++ b/subprojects/s4-base/s4-base.gradle
@@ -19,5 +19,5 @@
description = 'Interfaces and most basic classes required by nultiple modules.'
-dependencies {
+dependencies {
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
new file mode 100644
index 0000000..6dab438
--- /dev/null
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/ModulesLoader.java
@@ -0,0 +1,16 @@
+package org.apache.s4.base.util;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * A classloader for loading module classes from a list of URLs, typically locally copied/extracted files.
+ *
+ * It adds no behavior of its own on top of {@link URLClassLoader}; it only gives module loading a dedicated type.
+ *
+ */
+public class ModulesLoader extends URLClassLoader {
+
+ /**
+ * Creates a loader that resolves classes and resources from the given locations. No parent classloader is passed
+ * explicitly, so the {@link URLClassLoader} default applies.
+ *
+ * @param urls
+ *            locations of the module classes and resources
+ */
+ public ModulesLoader(URL[] urls) {
+ super(urls);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
index d33c3bf..527c57f 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoaderFactory.java
@@ -48,7 +48,7 @@ public class S4RLoaderFactory {
private static Logger logger = LoggerFactory.getLogger(S4RLoaderFactory.class);
- @Inject(optional = true)
+ @Inject
@Named("s4.tmp.dir")
File tmpDir;
@@ -69,7 +69,7 @@ public class S4RLoaderFactory {
File s4rDir = null;
if (tmpDir == null) {
s4rDir = Files.createTempDir();
- s4rDir.deleteOnExit();
+ // s4rDir.deleteOnExit();
logger.warn(
"s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
s4rDir.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..2fa3e0e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -20,7 +20,6 @@ package org.apache.s4.comm;
import java.io.IOException;
import java.io.InputStream;
-import java.util.HashMap;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
@@ -32,8 +31,6 @@ import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromZK;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterFromZK;
import org.apache.s4.comm.topology.Clusters;
@@ -55,7 +52,6 @@ public class DefaultCommModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
InputStream commConfigInputStream;
private PropertiesConfiguration config;
- String clusterName;
/**
*
@@ -65,10 +61,9 @@ public class DefaultCommModule extends AbstractModule {
* the name of the cluster to which the current node belongs. If specified in the configuration file,
* this parameter will be ignored.
*/
- public DefaultCommModule(InputStream commConfigInputStream, String clusterName) {
+ public DefaultCommModule(InputStream commConfigInputStream) {
super();
this.commConfigInputStream = commConfigInputStream;
- this.clusterName = clusterName;
}
@SuppressWarnings("unchecked")
@@ -89,10 +84,11 @@ public class DefaultCommModule extends AbstractModule {
/* Use Kryo to serialize events. */
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- // a node holds a single partition assignment
- // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- bind(Assignment.class).to(AssignmentFromZK.class);
+ // // a node holds a single partition assignment
+ // // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
+ // bind(Assignment.class).to(AssignmentFromZK.class);
bind(Cluster.class).to(ClusterFromZK.class);
+ // bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(Clusters.class).to(ClustersFromZK.class);
@@ -128,20 +124,6 @@ public class DefaultCommModule extends AbstractModule {
/* Make all properties injectable. Do we need this? */
Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- if (clusterName != null) {
- if (config.containsKey("s4.cluster.name")) {
- logger.warn(
- "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
- clusterName, config.getProperty("s4.cluster.name"));
- } else {
- Names.bindProperties(binder, new HashMap<String, String>() {
- {
- put("s4.cluster.name", clusterName);
- }
- });
- }
- }
-
} catch (ConfigurationException e) {
binder.addError(e);
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
new file mode 100644
index 0000000..2b89039
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/ModulesLoaderFactory.java
@@ -0,0 +1,112 @@
+package org.apache.s4.comm;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import org.apache.s4.base.util.ModulesLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Builds {@link ModulesLoader} classloaders from module jar files, by exploding each jar on the local file system and
+ * exposing its content (plus the jars found in its "lib" directory) as a classpath.
+ */
+public class ModulesLoaderFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(ModulesLoaderFactory.class);
+
+    /** Parent directory under which module jars are exploded, injected from the "s4.tmp.dir" parameter. */
+    @Inject
+    @Named("s4.tmp.dir")
+    File tmpDir;
+
+    /**
+     * Explodes every module jar in its own subdirectory of the directory specified through the "s4.tmp.dir" parameter,
+     * and prepares a classloader that loads classes and resources from, for each module, first the exploded module
+     * directory, then the files located in its "lib" subdirectory.
+     *
+     * Inspired from Hadoop's application classloading implementation (RunJar class).
+     *
+     * @param modulesFiles
+     *            module jar files to explode and add to the classpath
+     * @return classloader that loads resources from the exploded modules, in iteration order
+     * @throws RuntimeException
+     *             when a target directory cannot be created or a module jar cannot be read or extracted
+     */
+    public ModulesLoader createModulesLoader(Iterable<File> modulesFiles) {
+        List<URL> classpath = new ArrayList<URL>();
+        for (File moduleFile : modulesFiles) {
+            addModuleToClasspath(moduleFile, classpath);
+        }
+        return new ModulesLoader(classpath.toArray(new URL[] {}));
+
+    }
+
+    /**
+     * Explodes a single module jar and appends the resulting locations to the classpath.
+     *
+     * @param moduleFile
+     *            module jar to explode
+     * @param classpath
+     *            classpath under construction, updated in place
+     */
+    private void addModuleToClasspath(File moduleFile, List<URL> classpath) {
+
+        File moduleDir = null;
+        if (tmpDir == null) {
+            moduleDir = Files.createTempDir();
+            moduleDir.deleteOnExit();
+            logger.warn(
+                    "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+                    moduleDir.getAbsolutePath());
+        } else {
+            // timestamp suffix keeps successive extractions of the same jar apart
+            moduleDir = new File(tmpDir, moduleFile.getName() + "-" + System.currentTimeMillis());
+            if (!moduleDir.mkdir()) {
+                throw new RuntimeException("Cannot create directory for unzipping S4R file in ["
+                        + moduleDir.getAbsolutePath() + "]. Aborting deployment.");
+            }
+        }
+        logger.info("Unzipping S4R archive in [{}]", moduleDir.getAbsolutePath());
+
+        JarFile jar = null;
+        try {
+            // canonical extraction root, used to reject entries whose name would resolve outside of it
+            String moduleDirPrefix = moduleDir.getCanonicalPath() + File.separator;
+            jar = new JarFile(moduleFile);
+            Enumeration<JarEntry> entries = jar.entries();
+            while (entries.hasMoreElements()) {
+                JarEntry entry = entries.nextElement();
+                if (!entry.isDirectory()) {
+                    File to = new File(moduleDir, entry.getName());
+                    if (!to.getCanonicalPath().startsWith(moduleDirPrefix)) {
+                        throw new IOException("Archive entry [" + entry.getName()
+                                + "] would be extracted outside of [" + moduleDir.getAbsolutePath() + "]");
+                    }
+                    Files.createParentDirs(to);
+                    InputStream is = null;
+                    OutputStream os = null;
+                    try {
+                        // both streams are opened inside the try so that neither leaks if the other fails to open
+                        is = jar.getInputStream(entry);
+                        os = new FileOutputStream(to);
+                        ByteStreams.copy(is, os);
+                    } finally {
+                        Closeables.closeQuietly(is);
+                        Closeables.closeQuietly(os);
+                    }
+                }
+            }
+
+            classpath.add(moduleDir.toURI().toURL());
+            addDirLibsToClassPath(classpath, moduleDir, "/lib");
+
+        } catch (IOException e) {
+            logger.error("Cannot process S4R [{}]: {}", moduleFile.getAbsolutePath(),
+                    e.getClass().getName() + "/" + e.getMessage());
+            throw new RuntimeException("Cannot create S4R classloader", e);
+        } finally {
+            // classes are loaded from the exploded directory, so the jar handle is not needed past extraction
+            if (jar != null) {
+                try {
+                    jar.close();
+                } catch (IOException closeFailure) {
+                    logger.warn("Cannot close module archive [" + moduleFile.getAbsolutePath() + "]", closeFailure);
+                }
+            }
+        }
+    }
+
+    /**
+     * Appends every regular file found directly in a subdirectory of an exploded archive to the classpath. Does
+     * nothing when the subdirectory does not exist.
+     *
+     * @param classpath
+     *            classpath under construction, updated in place
+     * @param s4rDir
+     *            root directory of the exploded archive
+     * @param dir
+     *            name of the subdirectory to scan, relative to the root directory
+     * @throws MalformedURLException
+     *             when a file location cannot be converted to a URL
+     */
+    private void addDirLibsToClassPath(List<URL> classpath, File s4rDir, String dir) throws MalformedURLException {
+        File[] libs = new File(s4rDir, dir).listFiles();
+        if (libs != null) {
+            for (File lib : libs) {
+                if (!lib.isDirectory()) {
+                    classpath.add(lib.toURI().toURL());
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
new file mode 100644
index 0000000..8567cce
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetchException.java
@@ -0,0 +1,14 @@
+package org.apache.s4.comm.util;
+
+
+/**
+ * Checked exception reporting that an archive could not be fetched.
+ */
+public class ArchiveFetchException extends Exception {
+
+    /**
+     * @param message
+     *            description of the failure
+     */
+    public ArchiveFetchException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message
+     *            description of the failure
+     * @param cause
+     *            underlying error that made fetching fail
+     */
+    public ArchiveFetchException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
new file mode 100644
index 0000000..dbd35ba
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/ArchiveFetcher.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * This interface defines methods to fetch archive files from a URI (S4R or modules jars). Various protocols can be
+ * supported in the implementation classes (e.g. file system, HTTP etc...)
+ *
+ */
+public interface ArchiveFetcher {
+
+ /**
+ * Returns a stream to an archive file.
+ *
+ * NOTE(review): the interface does not say who closes the returned stream; presumably the caller does - confirm
+ * against call sites.
+ *
+ * @param uri
+ * archive identifier
+ * @return an input stream for accessing the content of the archive file
+ * @throws ArchiveFetchException
+ * when fetching fails
+ */
+ InputStream fetch(URI uri) throws ArchiveFetchException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
new file mode 100644
index 0000000..294877f
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Fetches modules jar files and application S4R files from a file system, possibly distributed.
+ *
+ */
+public class FileSystemArchiveFetcher implements ArchiveFetcher {
+
+    /**
+     * Opens the file designated by the given URI.
+     *
+     * @param uri
+     *            location of the archive; must be a URI accepted by {@link File#File(URI)}
+     * @return an input stream on the content of the file
+     * @throws ArchiveFetchException
+     *             when the URI does not designate a file that can be opened for reading; the original error is
+     *             attached as the cause
+     */
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        try {
+            return new FileInputStream(new File(uri));
+        } catch (FileNotFoundException e) {
+            // keep the cause: its message tells a missing file apart from a directory or an unreadable file
+            throw new ArchiveFetchException("Cannot retrieve file from uri [" + uri.toString() + "]", e);
+        } catch (IllegalArgumentException e) {
+            // File(URI) rejects URIs that are not plain absolute "file" URIs; report that through the declared
+            // exception instead of letting an unchecked exception escape
+            throw new ArchiveFetchException("Cannot retrieve file from invalid file uri [" + uri.toString() + "]",
+                    e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
new file mode 100644
index 0000000..4ee28d3
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/HttpArchiveFetcher.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpClientCodec;
+import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
+
+/**
+ * <p>
+ * Fetches modules and app archives through HTTP.
+ * </p>
+ * <p>
+ * The underlying implementation uses Netty, and borrows code from the Netty snoop example. The response body is
+ * written to a temporary file, and a stream on that file is returned once the connection is closed.
+ * </p>
+ *
+ * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty
+ *      snoop example</a>
+ */
+public class HttpArchiveFetcher implements ArchiveFetcher {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpArchiveFetcher.class);
+
+    /**
+     * Downloads the resource designated by the given URI into a temporary file and opens that file.
+     *
+     * @param uri
+     *            http location of the archive; when no port is specified, 80 is used for http and 443 for https
+     * @return an input stream on the temporary file holding the downloaded content
+     * @throws ArchiveFetchException
+     *             when the temporary file cannot be created or read, or when the connection cannot be established
+     */
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        logger.debug("Fetching file through http: {}", uri.toString());
+
+        String host = uri.getHost();
+        int port = uri.getPort();
+        if (port == -1) {
+            if (uri.getScheme().equalsIgnoreCase("http")) {
+                port = 80;
+            } else if (uri.getScheme().equalsIgnoreCase("https")) {
+                // NOTE(review): no SSL handler is installed in the pipeline below, so https presumably does not
+                // work as-is - confirm
+                port = 443;
+            }
+        }
+
+        ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+        File tmpFile;
+        try {
+            tmpFile = File.createTempFile("http", "download");
+        } catch (IOException e) {
+            throw new ArchiveFetchException("Cannot create temporary file for fetching archive data from http server",
+                    e);
+        }
+        clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile));
+        ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port));
+        // TODO timeout?
+        Channel channel = channelFuture.awaitUninterruptibly().getChannel();
+        if (!channelFuture.isSuccess()) {
+            clientBootstrap.releaseExternalResources();
+            throw new ArchiveFetchException("Cannot connect to http uri [" + uri.toString() + "]",
+                    channelFuture.getCause());
+        }
+
+        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
+        request.setHeader(HttpHeaders.Names.HOST, host);
+        // ask the server to close the connection after the response: the close is what signals completion below
+        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+
+        channel.write(request);
+
+        channel.getCloseFuture().awaitUninterruptibly();
+
+        clientBootstrap.releaseExternalResources();
+
+        logger.debug("Finished downloading archive file through http {}, as file: {}", uri.toString(),
+                tmpFile.getAbsolutePath());
+        try {
+            return new FileInputStream(tmpFile);
+        } catch (FileNotFoundException e) {
+            throw new ArchiveFetchException("Cannot get input stream from temporary file with s4r data ["
+                    + tmpFile.getAbsolutePath() + "]", e);
+        }
+    }
+
+    /**
+     * Builds the client pipeline: HTTP codec, content decompression, then the handler that stores the response body in
+     * the temporary file.
+     */
+    private static class HttpClientPipelineFactory implements ChannelPipelineFactory {
+
+        File tmpFile;
+
+        public HttpClientPipelineFactory(File tmpFile) {
+            this.tmpFile = tmpFile;
+        }
+
+        @Override
+        public ChannelPipeline getPipeline() throws Exception {
+            // Create a default pipeline implementation.
+            ChannelPipeline pipeline = Channels.pipeline();
+
+            pipeline.addLast("codec", new HttpClientCodec());
+
+            // Remove the following line if you don't want automatic content decompression.
+            pipeline.addLast("inflater", new HttpContentDecompressor());
+
+            pipeline.addLast("handler", new HttpResponseHandler(tmpFile));
+            return pipeline;
+        }
+    }
+
+    /**
+     * Writes the body of the HTTP response, chunked or not, to the temporary file.
+     *
+     * see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html
+     */
+    private static class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+
+        private boolean readingChunks;
+        FileOutputStream fos;
+
+        public HttpResponseHandler(File tmpFile) throws FileNotFoundException {
+            this.fos = new FileOutputStream(tmpFile);
+        }
+
+        @Override
+        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+            if (!readingChunks) {
+                HttpResponse response = (HttpResponse) e.getMessage();
+
+                if (response.isChunked()) {
+                    readingChunks = true;
+                } else {
+                    copyContentToTmpFile(response.getContent());
+                    // the whole body came with the response: nothing more will be written
+                    fos.close();
+                }
+            } else {
+                HttpChunk chunk = (HttpChunk) e.getMessage();
+                if (chunk.isLast()) {
+                    readingChunks = false;
+                    fos.close();
+                } else {
+                    copyContentToTmpFile(chunk.getContent());
+                }
+            }
+
+        }
+
+        /**
+         * Releases the file handle when the connection goes away, which covers connections that fail or that are
+         * dropped before the last chunk is received. Closing an already closed stream is harmless.
+         */
+        @Override
+        public void channelClosed(ChannelHandlerContext ctx, org.jboss.netty.channel.ChannelStateEvent e)
+                throws Exception {
+            try {
+                fos.close();
+            } finally {
+                super.channelClosed(ctx, e);
+            }
+        }
+
+        private void copyContentToTmpFile(ChannelBuffer content) throws IOException {
+            ChannelBufferInputStream cbis = new ChannelBufferInputStream(content);
+            ByteStreams.copy(cbis, fos);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
new file mode 100644
index 0000000..f81b81b
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/RemoteFileFetcher.java
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.util;
+
+import java.io.InputStream;
+import java.net.URI;
+
+/**
+ * Archive fetcher that picks the actual fetching implementation from the scheme of the requested URI: "file" URIs
+ * are served by {@link FileSystemArchiveFetcher}, "http" and "https" URIs by {@link HttpArchiveFetcher}.
+ *
+ */
+public class RemoteFileFetcher implements ArchiveFetcher {
+
+    /**
+     * Fetches the archive with the implementation matching the URI scheme (compared case-insensitively).
+     *
+     * @throws ArchiveFetchException
+     *             when the scheme is not supported, or when the selected implementation fails
+     */
+    @Override
+    public InputStream fetch(URI uri) throws ArchiveFetchException {
+        final String protocol = uri.getScheme();
+        final ArchiveFetcher delegate;
+        if ("file".equalsIgnoreCase(protocol)) {
+            delegate = new FileSystemArchiveFetcher();
+        } else if ("http".equalsIgnoreCase(protocol) || "https".equalsIgnoreCase(protocol)) {
+            delegate = new HttpArchiveFetcher();
+        } else {
+            throw new ArchiveFetchException("Unsupported protocol " + protocol);
+        }
+        return delegate.fetch(uri);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
index 36417b4..bef0894 100644
--- a/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
+++ b/subprojects/s4-comm/src/main/resources/default.s4.comm.properties
@@ -3,6 +3,3 @@ s4.comm.emitter.remote.class=org.apache.s4.comm.tcp.TCPRemoteEmitter
s4.comm.listener.class=org.apache.s4.comm.tcp.TCPListener
# I/O channel connection timeout, when applicable (e.g. used by netty)
s4.comm.timeout=1000
-s4.cluster.zk_address = localhost:2181
-s4.cluster.zk_session_timeout = 10000
-s4.cluster.zk_connection_timeout = 10000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
index c9153ce..1ffae1d 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java
@@ -20,9 +20,9 @@ package org.apache.s4.comm.tcp;
import java.io.IOException;
-import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.DeliveryTestUtil;
import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.TestCommModule;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,10 +45,11 @@ public abstract class TCPCommTest extends ProtocolTestUtil {
super(numTasks);
}
+ @Override
public Injector newInjector() {
try {
- return Guice.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), CLUSTER_NAME));
+ return Guice.createInjector(new TestCommModule(Resources.getResource("default.s4.comm.properties")
+ .openStream()));
} catch (IOException e) {
Assert.fail();
return null;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
index 0f6004f..543efec 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java
@@ -20,9 +20,9 @@ package org.apache.s4.comm.udp;
import java.io.IOException;
-import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.DeliveryTestUtil;
import org.apache.s4.comm.util.ProtocolTestUtil;
+import org.apache.s4.fixtures.TestCommModule;
import org.junit.Assert;
import com.google.common.io.Resources;
@@ -44,8 +44,8 @@ public abstract class UDPCommTest extends ProtocolTestUtil {
@Override
protected Injector newInjector() throws IOException {
- return Guice.createInjector(new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(),
- "cluster1"), new UDPCommTestModule());
+ return Guice.createInjector(new TestCommModule(Resources.getResource("udp.s4.comm.properties").openStream()),
+ new UDPCommTestModule());
}
class UDPCommTestModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
new file mode 100644
index 0000000..c1fb253
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -0,0 +1,38 @@
+package org.apache.s4.fixtures;
+
+import java.io.InputStream;
+
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.comm.util.RemoteFileFetcher;
+
+import com.google.inject.name.Names;
+
+/**
+ * Binds dependencies that come from the base layer and are defined in {@link BaseModule} in s4-core.
+ *
+ * We need them injected for the tests to work, in particular for getting an assignment
+ *
+ *
+ */
+public class TestCommModule extends DefaultCommModule {
+
+ public TestCommModule(InputStream commConfigInputStream) {
+ super(commConfigInputStream);
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ bind(String.class).annotatedWith(Names.named("s4.cluster.name")).toInstance("cluster1");
+ bind(String.class).annotatedWith(Names.named("s4.cluster.zk_address")).toInstance("localhost:2181");
+ bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
+ bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
+ bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+ // bind(Cluster.class).to(ClusterFromZK.class);
+
+ bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
new file mode 100644
index 0000000..df2d8f1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -0,0 +1,80 @@
+package org.apache.s4.core;
+
+import java.io.InputStream;
+import java.util.HashMap;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.comm.util.RemoteFileFetcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+public class BaseModule extends AbstractModule {
+
+ private static Logger logger = LoggerFactory.getLogger(BaseModule.class);
+
+ private PropertiesConfiguration config;
+ InputStream baseConfigInputStream;
+ String clusterName;
+
+ public BaseModule(InputStream baseConfigInputStream, String clusterName) {
+ super();
+ this.baseConfigInputStream = baseConfigInputStream;
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ // a node holds a single partition assignment
+ // ==> Assignment is a singleton so it is shared between base, comm and app layers.
+ // it is eager so that the node is able to join a cluster immediately
+ bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
+ // bind(Cluster.class).to(ClusterFromZK.class);
+
+ bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+ bind(S4Bootstrap.class);
+
+ }
+
+ @SuppressWarnings("serial")
+ private void loadProperties(Binder binder) {
+ try {
+ config = new PropertiesConfiguration();
+ config.load(baseConfigInputStream);
+
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+
+ if (clusterName != null) {
+ if (config.containsKey("s4.cluster.name")) {
+ logger.warn(
+ "cluster [{}] passed as a parameter will not be used because an existing cluster.name parameter of value [{}] was found in the configuration file and will be used",
+ clusterName, config.getProperty("s4.cluster.name"));
+ } else {
+ Names.bindProperties(binder, new HashMap<String, String>() {
+ {
+ put("s4.cluster.name", clusterName);
+ }
+ });
+ }
+ }
+
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..1f07bf6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -18,6 +18,7 @@
package org.apache.s4.core;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -36,8 +37,11 @@ import org.apache.s4.deploy.DistributedDeploymentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.io.Files;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
import com.google.inject.name.Names;
/**
@@ -85,6 +89,17 @@ public class DefaultCoreModule extends AbstractModule {
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
}
+ @Provides
+ @Named("s4.tmp.dir")
+ public File provideTmpDir() {
+ File tmpS4Dir = Files.createTempDir();
+ tmpS4Dir.deleteOnExit();
+ logger.warn(
+ "s4.tmp.dir not specified, using temporary directory [{}] for unpacking S4R. You may want to specify a parent non-temporary directory.",
+ tmpS4Dir.getAbsolutePath());
+ return tmpS4Dir;
+ }
+
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
deleted file mode 100644
index fc85219..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.core;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.s4.comm.DefaultCommModule;
-import org.apache.s4.core.util.ParametersInjectionModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.google.common.io.Resources;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.util.Modules;
-import com.google.inject.util.Modules.OverriddenModuleBuilder;
-
-/**
- * Bootstrap class for S4. It creates an S4 node.
- *
- */
-public class Main {
-
- private static final Logger logger = LoggerFactory.getLogger(Main.class);
-
- /**
- * Starts an S4 server.
- *
- * @param args
- */
- public static void main(String[] args) {
-
- MainArgs mainArgs = new MainArgs();
- JCommander jc = new JCommander(mainArgs);
-
- try {
- jc.parse(args);
- } catch (Exception e) {
- JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
- jc.usage();
- System.exit(1);
- }
-
- startNode(mainArgs);
- }
-
- private static void startNode(MainArgs mainArgs) {
- try {
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- logger.error("Uncaught exception in thread {}", t.getName(), e);
-
- }
- });
- Injector injector;
- InputStream commConfigFileInputStream;
- InputStream coreConfigFileInputStream;
- String commConfigString;
- if (mainArgs.commConfigFilePath == null) {
- commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
- commConfigString = "default.s4.comm.properties from classpath";
- } else {
- commConfigFileInputStream = new FileInputStream(new File(mainArgs.commConfigFilePath));
- commConfigString = mainArgs.commConfigFilePath;
- }
-
- String coreConfigString;
- if (mainArgs.coreConfigFilePath == null) {
- coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
- coreConfigString = "default.s4.core.properties from classpath";
- } else {
- coreConfigFileInputStream = new FileInputStream(new File(mainArgs.coreConfigFilePath));
- coreConfigString = mainArgs.coreConfigFilePath;
- }
-
- logger.info(
- "Initializing S4 node with : \n- comm module class [{}]\n- comm configuration file [{}]\n- core module class [{}]\n- core configuration file[{}]\n- extra modules: {}\n- inline parameters: {}",
- new String[] { mainArgs.commModuleClass, commConfigString, mainArgs.coreModuleClass,
- coreConfigString, Arrays.toString(mainArgs.extraModulesClasses.toArray(new String[] {})),
- Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})) });
-
- AbstractModule commModule = (AbstractModule) Class.forName(mainArgs.commModuleClass)
- .getConstructor(InputStream.class, String.class)
- .newInstance(commConfigFileInputStream, mainArgs.clusterName);
- AbstractModule coreModule = (AbstractModule) Class.forName(mainArgs.coreModuleClass)
- .getConstructor(InputStream.class).newInstance(coreConfigFileInputStream);
-
- List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
- for (String moduleClass : mainArgs.extraModulesClasses) {
- extraModules.add((Module) Class.forName(moduleClass).newInstance());
- }
- Module combinedModule = Modules.combine(commModule, coreModule);
- if (extraModules.size() > 0) {
- OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
- combinedModule = overridenModuleBuilder.with(extraModules);
- }
-
- if (mainArgs.zkConnectionString != null) {
- mainArgs.extraNamedParameters.add("s4.cluster.zk_address=" + mainArgs.zkConnectionString);
- }
-
- if (!mainArgs.extraNamedParameters.isEmpty()) {
- logger.debug("Adding named parameters for injection : {}",
- Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})));
- Map<String, String> namedParameters = new HashMap<String, String>();
-
- for (String namedParam : mainArgs.extraNamedParameters) {
- namedParameters.put(namedParam.split("[=]")[0].trim(),
- namedParam.substring(namedParam.indexOf('=') + 1).trim());
- }
- combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
- }
-
- injector = Guice.createInjector(combinedModule);
-
- if (mainArgs.appClass != null) {
- logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
- App app = (App) injector.getInstance(Class.forName(mainArgs.appClass));
- app.init();
- app.start();
- } else {
- logger.info("Starting S4 node. This node will automatically download applications published for the cluster it belongs to");
- Server server = injector.getInstance(Server.class);
- try {
- server.start(injector);
- } catch (Exception e) {
- logger.error("Failed to start the controller.", e);
- }
- }
- } catch (Exception e) {
- logger.error("Cannot start S4 node", e);
- System.exit(1);
- }
- }
-
- /**
- * Defines command parameters.
- *
- */
- @Parameters(separators = "=")
- public static class MainArgs {
-
- @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
- String clusterName = null;
-
- @Parameter(names = "-commModuleClass", description = "configuration module class for the communication layer", required = false)
- String commModuleClass = DefaultCommModule.class.getName();
-
- @Parameter(names = "-commConfig", description = "s4 communication layer configuration file", required = false)
- String commConfigFilePath;
-
- @Parameter(names = "-coreModuleClass", description = "s4-core configuration module class", required = false)
- String coreModuleClass = DefaultCoreModule.class.getName();
-
- @Parameter(names = "-coreConfig", description = "s4 core configuration file", required = false)
- String coreConfigFilePath = null;
-
- @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
- String appClass = null;
-
- @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
- List<String> extraModulesClasses = new ArrayList<String>();
-
- @Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
- List<String> extraNamedParameters = new ArrayList<String>();
-
- @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
- String zkConnectionString;
-
- }
-
- /**
- * Parameters parsing utility.
- *
- */
- public static class InlineConfigParameterConverter implements IStringConverter<String> {
-
- @Override
- public String convert(String arg) {
- Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
- logger.info("processing inline configuration parameter {}", arg);
- Matcher parameterMatcher = parameterPattern.matcher(arg);
- if (!parameterMatcher.find()) {
- throw new IllegalArgumentException("Cannot understand parameter " + arg);
- }
- return parameterMatcher.group(1);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
new file mode 100644
index 0000000..deb14ad
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Bootstrap.java
@@ -0,0 +1,228 @@
+package org.apache.s4.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.base.util.ModulesLoader;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.ModulesLoaderFactory;
+import org.apache.s4.comm.topology.ZNRecord;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.apache.s4.comm.util.ArchiveFetcher;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParametersInjectionModule;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.name.Named;
+import com.google.inject.util.Modules;
+import com.google.inject.util.Modules.OverriddenModuleBuilder;
+
+/**
+ * This is the bootstrap for S4 nodes.
+ * <p>
+ * Its roles are to:
+ * <ul>
+ * <li>register within the S4 cluster (and acquire a partition).
+ * <li>look for applications deployed on the S4 cluster.
+ * </ul>
+ * <p>
+ * When an application is available, custom modules are fetched if necessary and a full-featured S4 node is started. The
+ * application code is then downloaded and the app started.
+ * <p>
+ * For testing purposes, it is also possible to directly start an application without fetching remote code, provided the
+ * application classes are available in the classpath.
+ *
+ *
+ *
+ */
+public class S4Bootstrap {
+ private static Logger logger = LoggerFactory.getLogger(S4Bootstrap.class);
+
+ private final ZkClient zkClient;
+ private final String appPath;
+ private final AtomicBoolean deployed = new AtomicBoolean(false);
+
+ private final ArchiveFetcher fetcher;
+
+ private Injector parentInjector;
+
+ CountDownLatch signalOneAppLoaded = new CountDownLatch(1);
+
+ @Inject
+ public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
+
+ this.fetcher = fetcher;
+ zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+ ZkSerializer serializer = new ZNRecordSerializer();
+ zkClient.setZkSerializer(serializer);
+
+ String appDir = "/s4/clusters/" + clusterName + "/app";
+ if (!zkClient.exists(appDir)) {
+ zkClient.create(appDir, null, CreateMode.PERSISTENT);
+ }
+ appPath = appDir + "/s4App";
+ zkClient.subscribeDataChanges(appPath, new AppChangeListener());
+ }
+
+ public void start(Injector parentInjector) throws InterruptedException, ArchiveFetchException {
+ this.parentInjector = parentInjector;
+ if (zkClient.exists(appPath)) {
+ if (!deployed.get()) {
+ loadModulesAndStartNode(parentInjector);
+ }
+ }
+
+ signalOneAppLoaded.await();
+ }
+
+ private void loadModulesAndStartNode(final Injector parentInjector) throws ArchiveFetchException {
+
+ final ZNRecord appData = zkClient.readData(appPath);
+ // NOTE(review): appData can be null, yet it is dereferenced below without a null check -- confirm and guard
+ final AppConfig appConfig = new AppConfig(appData);
+
+ String appName = appData.getSimpleField("name");
+
+ List<File> modulesLocalCopies = new ArrayList<File>();
+
+ for (String uriString : appConfig.getCustomModulesURIs()) {
+ modulesLocalCopies.add(fetchModuleAndCopyToLocalFile(appName, uriString));
+ }
+ final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+
+ Thread t = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ // start the S4 platform/app via startS4App, resolving custom module classes through the modules classloader
+ S4Bootstrap.startS4App(appConfig, parentInjector, modulesLoader);
+ signalOneAppLoaded.countDown();
+ }
+ }, "S4 platform loader");
+ t.start();
+
+ }
+
+ private File fetchModuleAndCopyToLocalFile(String appName, String uriString) throws ArchiveFetchException {
+
+ URI uri;
+ try {
+ uri = new URI(uriString);
+ } catch (URISyntaxException e2) {
+ throw new ArchiveFetchException("Invalid module URI : [" + uriString + "]", e2);
+ }
+ File localModuleFileCopy;
+ try {
+ localModuleFileCopy = File.createTempFile("tmp", "module");
+ } catch (IOException e1) {
+ logger.error(
+ "Cannot deploy app [{}] because a local copy of the module file could not be initialized due to [{}]",
+ appName, e1.getClass().getName() + "->" + e1.getMessage());
+ throw new ArchiveFetchException("Cannot deploy application [" + appName + "]", e1);
+ }
+ localModuleFileCopy.deleteOnExit();
+ try {
+ if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localModuleFileCopy)) == 0) {
+ throw new ArchiveFetchException("Cannot copy archive from [" + uri.toString() + "] to ["
+ + localModuleFileCopy.getAbsolutePath() + "] (nothing was copied)");
+ }
+ } catch (Exception e) {
+ throw new ArchiveFetchException("Cannot deploy application [" + appName + "] from URI [" + uri.toString()
+ + "] ", e);
+ }
+ return localModuleFileCopy;
+ }
+
+ public static void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoader modulesLoader) {
+ try {
+ Injector injector;
+ InputStream commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
+ InputStream coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
+
+ logger.info("Initializing S4 app with : {}", appConfig.toString());
+
+ AbstractModule commModule = new DefaultCommModule(commConfigFileInputStream);
+ AbstractModule coreModule = new DefaultCoreModule(coreConfigFileInputStream);
+
+ List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
+ for (String moduleClass : appConfig.getCustomModulesNames()) {
+ extraModules.add((Module) Class.forName(moduleClass, true, modulesLoader).newInstance());
+ }
+ Module combinedModule = Modules.combine(commModule, coreModule);
+ if (extraModules.size() > 0) {
+ OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
+ combinedModule = overridenModuleBuilder.with(extraModules);
+ }
+
+ if (appConfig.getNamedParameters() != null && !appConfig.getNamedParameters().isEmpty()) {
+
+ logger.debug("Adding named parameters for injection : {}", appConfig.getNamedParametersAsString());
+ Map<String, String> namedParameters = new HashMap<String, String>();
+
+ namedParameters.putAll(appConfig.getNamedParameters());
+ combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
+ }
+
+ injector = parentInjector.createChildInjector(combinedModule);
+
+ if (appConfig.getAppClassName() != null && Strings.isNullOrEmpty(appConfig.getAppURI())) {
+ logger.info("Starting S4 app with application class [{}]", appConfig.getAppClassName());
+ App app = (App) injector.getInstance(Class.forName(appConfig.getAppClassName(), true, modulesLoader));
+ app.init();
+ app.start();
+ } else {
+ if (Strings.isNullOrEmpty(appConfig.getAppURI())) {
+ logger.info("S4 node in standby until app class or app URI is specified");
+ }
+ Server server = injector.getInstance(Server.class);
+ server.start(injector);
+ }
+ } catch (Exception e) {
+ logger.error("Cannot start S4 node", e);
+ System.exit(1);
+ }
+ }
+
+ class AppChangeListener implements IZkDataListener {
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ if (!deployed.get()) {
+ loadModulesAndStartNode(parentInjector);
+ deployed.set(true);
+ }
+
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ logger.error("Application undeployment is not supported yet");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
new file mode 100644
index 0000000..4bd929b
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -0,0 +1,79 @@
+package org.apache.s4.core;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.s4.comm.util.ArchiveFetchException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+/**
+ * Entry point for starting an S4 node. It parses arguments and injects an {@link S4Bootstrap} based on the
+ * {@link BaseModule} minimal configuration.
+ *
+ */
+public class S4Node {
+
+ private static Logger logger = LoggerFactory.getLogger(S4Node.class);
+
+ public static void main(String[] args) throws InterruptedException, IOException {
+ S4NodeArgs s4Args = new S4NodeArgs();
+ JCommander jc = new JCommander(s4Args);
+
+ try {
+ jc.parse(args);
+ } catch (Exception e) {
+ JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+ jc.usage();
+ System.exit(1);
+ }
+ startNode(s4Args);
+
+ }
+
+ private static void startNode(S4NodeArgs mainArgs) throws InterruptedException, IOException {
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("Uncaught exception in thread {}", t.getName(), e);
+
+ }
+ });
+
+ Injector injector = Guice.createInjector(new Module[] { new BaseModule(Resources.getResource(
+ "default.s4.base.properties").openStream(), mainArgs.clusterName) });
+ S4Bootstrap bootstrap = injector.getInstance(S4Bootstrap.class);
+ try {
+ bootstrap.start(injector);
+ } catch (ArchiveFetchException e1) {
+ logger.error("Cannot fetch module dependencies.", e1);
+ }
+ }
+
+ /**
+ * Defines command parameters.
+ *
+ */
+ @Parameters(separators = "=")
+ public static class S4NodeArgs {
+
+ @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true)
+ String clusterName = null;
+
+ @Parameter(names = "-baseConfig", description = "S4 base configuration file", required = false)
+ String baseConfigFilePath = null;
+
+ @Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
+ String zkConnectionString;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index 02e521a..d94d714 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -59,7 +59,7 @@ public class Server {
@Inject
private AssignmentFromZK assignment;
- private ZkClient zkClient;
+ private final ZkClient zkClient;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
index e5d4f9f..70a38cf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -45,12 +45,13 @@ import com.google.inject.name.Named;
* Checkpoints are stored in individual files (1 file = 1 checkpointId) in directories according to the following
* structure: <code>(storageRootpath)/prototypeId/checkpointId</code>
* </p>
- *
+ *
*/
public class DefaultFileSystemStateStorage implements StateStorage {
private static Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
- @Inject(optional = true)
+
+ @Inject
@Named("s4.checkpointing.filesystem.storageRootPath")
String storageRootPath;
@@ -64,7 +65,6 @@ public class DefaultFileSystemStateStorage implements StateStorage {
*/
@Inject
public void init() {
- checkStorageDir();
}
@Override
@@ -122,22 +122,6 @@ public class DefaultFileSystemStateStorage implements StateStorage {
return id;
}
- public void checkStorageDir() {
- if (storageRootPath == null) {
-
- File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
- + "storage");
- storageRootPath = defaultStorageDir.getAbsolutePath();
- logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
- if (!defaultStorageDir.exists()) {
- if (!(defaultStorageDir.mkdirs())) {
- logger.error("Storage directory not specified, and cannot create default storage directory : "
- + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
- }
- }
- }
- }
-
@Override
public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
File f = checkpointID2File(key, storageRootPath);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
index ca23c79..5178c97 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -18,16 +18,44 @@
package org.apache.s4.core.ft;
+import java.io.File;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
/**
* Checkpointing module that uses the {@link DefaultFileSystemStateStorage} as a checkpointing backend.
- *
+ *
*/
public class FileSystemBackendCheckpointingModule extends AbstractModule {
+
+ private static Logger logger = LoggerFactory.getLogger(FileSystemBackendCheckpointingModule.class);
+
@Override
protected void configure() {
bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
bind(CheckpointingFramework.class).to(SafeKeeper.class);
}
+
+ @Provides
+ @Named("s4.checkpointing.filesystem.storageRootPath")
+ public String provideStorageRootPath() {
+ File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+ + "storage");
+ String storageRootPath = defaultStorageDir.getAbsolutePath();
+ logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+ if (!defaultStorageDir.exists()) {
+ if (!(defaultStorageDir.mkdirs())) {
+ logger.error("Storage directory not specified, and cannot create default storage directory : "
+ + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+ }
+ }
+ return storageRootPath;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
new file mode 100644
index 0000000..9bd09c7
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
@@ -0,0 +1,159 @@
+package org.apache.s4.core.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.s4.comm.topology.ZNRecord;
+
+/**
+ * Holds the configuration of an application: its name, class name and URI, the custom modules (class names
+ * and URIs) and a map of named parameters. A configuration can be read from, and written to, a
+ * {@link ZNRecord}, or assembled with the nested {@link Builder}.
+ */
+public class AppConfig {
+
+    // Field names used in the ZNRecord representation.
+    public static final String NAMED_PARAMETERS = "namedParams";
+    public static final String APP_CLASS = "appClass";
+    public static final String APP_NAME = "appName";
+    public static final String APP_URI = "S4R_URI";
+    public static final String MODULES_CLASSES = "modulesClasses";
+    public static final String MODULES_URIS = "modulesURIs";
+
+    String appName;
+    String appClassName;
+    List<String> customModulesNames = Collections.emptyList();
+    List<String> customModulesURIs = Collections.emptyList();
+    Map<String, String> namedParameters = Collections.emptyMap();
+    String appURI;
+
+    /** Used by {@link Builder} only; leaves every field at its default. */
+    private AppConfig() {
+    }
+
+    /**
+     * Reads the configuration from the simple, list and map fields of the given record.
+     * NOTE(review): a field absent from the record presumably yields null here, replacing the empty
+     * defaults; confirm against ZNRecord.
+     *
+     * @param znRecord
+     *            record holding the application configuration
+     */
+    public AppConfig(ZNRecord znRecord) {
+        appName = znRecord.getSimpleField(APP_NAME);
+        appClassName = znRecord.getSimpleField(APP_CLASS);
+        appURI = znRecord.getSimpleField(APP_URI);
+        customModulesNames = znRecord.getListField(MODULES_CLASSES);
+        customModulesURIs = znRecord.getListField(MODULES_URIS);
+        namedParameters = znRecord.getMapField(NAMED_PARAMETERS);
+    }
+
+    /**
+     * Creates a configuration from explicit values. Arguments are stored as passed, without copying.
+     */
+    public AppConfig(String appName, String appClassName, String appURI, List<String> customModulesNames,
+            List<String> customModulesURIs, Map<String, String> namedParameters) {
+        this.appName = appName;
+        this.appClassName = appClassName;
+        this.appURI = appURI;
+        this.customModulesNames = customModulesNames;
+        this.customModulesURIs = customModulesURIs;
+        this.namedParameters = namedParameters;
+    }
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public String getAppClassName() {
+        return appClassName;
+    }
+
+    public String getAppURI() {
+        return appURI;
+    }
+
+    public List<String> getCustomModulesNames() {
+        return customModulesNames;
+    }
+
+    public List<String> getCustomModulesURIs() {
+        return customModulesURIs;
+    }
+
+    public Map<String, String> getNamedParameters() {
+        return namedParameters;
+    }
+
+    /**
+     * Formats the named parameters as a sequence of "key=value," entries. Every entry, including the last
+     * one, is followed by a comma.
+     *
+     * @return the formatted parameters, or an empty string when there are none
+     */
+    public String getNamedParametersAsString() {
+        if (namedParameters == null || namedParameters.isEmpty()) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, String> param : namedParameters.entrySet()) {
+            sb.append(param.getKey()).append('=').append(param.getValue()).append(',');
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Converts this configuration to a record. Only non-null fields are written.
+     *
+     * @param id
+     *            identifier passed to the created record
+     * @return a new record holding this configuration
+     */
+    public ZNRecord asZNRecord(String id) {
+        ZNRecord record = new ZNRecord(id);
+        if (appClassName != null) {
+            record.putSimpleField(APP_CLASS, appClassName);
+        }
+        if (appName != null) {
+            record.putSimpleField(APP_NAME, appName);
+        }
+        if (appURI != null) {
+            record.putSimpleField(APP_URI, appURI);
+        }
+        if (customModulesNames != null) {
+            record.putListField(MODULES_CLASSES, customModulesNames);
+        }
+        if (customModulesURIs != null) {
+            record.putListField(MODULES_URIS, customModulesURIs);
+        }
+        if (namedParameters != null) {
+            record.putMapField(NAMED_PARAMETERS, namedParameters);
+        }
+        return record;
+    }
+
+    /**
+     * Returns a multi-line description with the app name, class, URI, module classes and module URIs.
+     */
+    @Override
+    public String toString() {
+        // The null checks live in describe(): '+' binds tighter than '==' and '?:', so an inline
+        // unparenthesized "prefix + list == null ? a : b" compares the whole concatenated prefix to null,
+        // drops everything before the last operand and dereferences a null list.
+        return "app name: [" + appName + "] \n " + "app class: [" + appClassName + "] \n" + "app URI : [" + appURI
+                + "] \n" + "modules classes : " + describe(customModulesNames) + " \n" + "modules URIs "
+                + describe(customModulesURIs);
+    }
+
+    /** Formats a possibly-null list as "[a, b]"; null is rendered as "[]". */
+    private static String describe(List<String> values) {
+        return values == null ? "[]" : Arrays.toString(values.toArray(new String[] {}));
+    }
+
+    /**
+     * Fluent builder for {@link AppConfig}. Each setter writes directly into a single underlying instance,
+     * which {@link #build()} returns as is.
+     */
+    public static class Builder {
+
+        AppConfig config;
+
+        public Builder() {
+            this.config = new AppConfig();
+        }
+
+        public Builder appName(String appName) {
+            config.appName = appName;
+            return this;
+        }
+
+        public Builder appClassName(String appClassName) {
+            config.appClassName = appClassName;
+            return this;
+        }
+
+        public Builder appURI(String appURI) {
+            config.appURI = appURI;
+            return this;
+        }
+
+        public Builder customModulesNames(List<String> customModulesNames) {
+            config.customModulesNames = customModulesNames;
+            return this;
+        }
+
+        public Builder customModulesURIs(List<String> customModulesURIs) {
+            config.customModulesURIs = customModulesURIs;
+            return this;
+        }
+
+        public Builder namedParameters(Map<String, String> namedParameters) {
+            config.namedParameters = namedParameters;
+            return this;
+        }
+
+        /**
+         * @return the configuration populated so far (the same instance on every call)
+         */
+        public AppConfig build() {
+            return config;
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
index a20d3e6..bba20bc 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/ParametersInjectionModule.java
@@ -24,7 +24,8 @@ import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
/**
- * Injects String parameters from a map. Used for loading parameters outside of config files.
+ * Injects String parameters from a map. Used for loading parameters outside of config files, typically parameters
+ * passed through the application configuration.
*
*/
public class ParametersInjectionModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
new file mode 100644
index 0000000..cc404af
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DeploymentUtils.java
@@ -0,0 +1,38 @@
+package org.apache.s4.deploy;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.util.AppConfig;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper for writing an application configuration to ZooKeeper.
+ */
+public class DeploymentUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeploymentUtils.class);
+
+    /**
+     * Writes the application configuration to the "/s4/clusters/&lt;clusterName&gt;/app/s4App" node as a
+     * persistent node, using a dedicated ZooKeeper client that is closed before returning.
+     *
+     * @param appConfig
+     *            configuration to write
+     * @param clusterName
+     *            name of the cluster, used to build the node path
+     * @param deleteIfExists
+     *            when true, an existing node is deleted first; when false, an existing node is kept and a
+     *            warning is logged
+     * @param zkString
+     *            ZooKeeper connection string
+     * @throws RuntimeException
+     *             if the node still exists after it was supposed to have been deleted
+     */
+    public static void initAppConfig(AppConfig appConfig, String clusterName, boolean deleteIfExists, String zkString) {
+        String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+        ZkClient zk = new ZkClient(zkString);
+        // Release the connection on every exit path, including failures of exists/delete/create.
+        try {
+            ZkSerializer serializer = new ZNRecordSerializer();
+            zk.setZkSerializer(serializer);
+
+            if (deleteIfExists && zk.exists(appPath)) {
+                zk.delete(appPath);
+            }
+            try {
+                zk.create(appPath, appConfig.asZNRecord("app"), CreateMode.PERSISTENT);
+            } catch (ZkNodeExistsException e) {
+                if (deleteIfExists) {
+                    // Keep the original exception as the cause for diagnosis.
+                    throw new RuntimeException("Node should have been deleted", e);
+                }
+                logger.warn("Node {} already exists, will not overwrite", appPath);
+            }
+        } finally {
+            zk.close();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
index 9fc5c53..fc05c48 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/DistributedDeploymentManager.java
@@ -20,7 +20,6 @@ package org.apache.s4.deploy;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
@@ -28,8 +27,10 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.core.App;
import org.apache.s4.core.Server;
+import org.apache.s4.core.util.AppConfig;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,14 +79,17 @@ public class DistributedDeploymentManager implements DeploymentManager {
private final Server server;
boolean deployed = false;
+ private final ArchiveFetcher fetcher;
+
@Inject
public DistributedDeploymentManager(@Named("s4.cluster.name") String clusterName,
@Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server, ArchiveFetcher fetcher) {
this.clusterName = clusterName;
this.server = server;
+ this.fetcher = fetcher;
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
zkClient.setZkSerializer(new ZNRecordSerializer());
@@ -99,10 +103,23 @@ public class DistributedDeploymentManager implements DeploymentManager {
public void deployApplication() throws DeploymentFailedException {
ZNRecord appData = zkClient.readData(appPath);
- String uriString = appData.getSimpleField(S4R_URI);
- String appName = appData.getSimpleField("name");
+ AppConfig appConfig = new AppConfig(appData);
+ if (appConfig.getAppURI() == null) {
+ if (appConfig.getAppClassName() != null) {
+ try {
+ App app = (App) getClass().getClassLoader().loadClass(appConfig.getAppClassName()).newInstance();
+ server.startApp(app, "appName", clusterName);
+ } catch (Exception e) {
+ logger.error("Cannot start application: cannot instantiate app class {} due to: {}",
+ appConfig.getAppClassName(), e.getMessage());
+ return;
+ }
+ }
+ logger.info("{} value not set for {} : no application code will be downloaded", S4R_URI, appPath);
+ return;
+ }
try {
- URI uri = new URI(uriString);
+ URI uri = new URI(appConfig.getAppURI());
// fetch application
File localS4RFileCopy;
@@ -111,50 +128,40 @@ public class DistributedDeploymentManager implements DeploymentManager {
} catch (IOException e1) {
logger.error(
"Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
- appName, e1.getClass().getName() + "->" + e1.getMessage());
- throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
+ appConfig.getAppName(), e1.getClass().getName() + "->" + e1.getMessage());
+ throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e1);
}
localS4RFileCopy.deleteOnExit();
try {
- if (ByteStreams.copy(fetchS4App(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+ if (ByteStreams.copy(fetcher.fetch(uri), Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+ localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
}
- } catch (IOException e) {
- throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
- + uri.toString() + "] ", e);
+ } catch (Exception e) {
+ throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+ + "] from URI [" + uri.toString() + "] ", e);
}
// install locally
- App loaded = server.loadApp(localS4RFileCopy, appName);
+ App loaded = server.loadApp(localS4RFileCopy, appConfig.getAppName());
if (loaded != null) {
- logger.info("Successfully installed application {}", appName);
+ logger.info("Successfully installed application {}", appConfig.getAppName());
// TODO sync with other nodes? (e.g. wait for other apps deployed before starting?
- server.startApp(loaded, appName, clusterName);
+ server.startApp(loaded, appConfig.getAppName(), clusterName);
} else {
- throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
- + uri.toString() + "] : cannot start application");
+ throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName()
+ + "] from URI [" + uri.toString() + "] : cannot start application");
}
} catch (URISyntaxException e) {
logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
- appName, uriString, e.getMessage() });
- throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
+ appConfig.getAppName(), appConfig.getAppURI(), e.getMessage() });
+ throw new DeploymentFailedException("Cannot deploy application [" + appConfig.getAppName() + "]", e);
}
deployed = true;
}
// NOTE: in theory, we could support any protocol by implementing a chained visitor scheme,
// but that's probably not that useful, and we can simply provide whichever protocol is needed
- public InputStream fetchS4App(URI uri) throws DeploymentFailedException {
- String scheme = uri.getScheme();
- if ("file".equalsIgnoreCase(scheme)) {
- return new FileSystemS4RFetcher().fetch(uri);
- }
- if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
- return new HttpS4RFetcher().fetch(uri);
- }
- throw new DeploymentFailedException("Unsupported protocol " + scheme);
- }
private final class AppChangeListener implements IZkDataListener {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/d3b7c30a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
deleted file mode 100644
index 8947998..0000000
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/FileSystemS4RFetcher.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.s4.deploy;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.net.URI;
-
-/**
- * Fetches S4R files from a file system, possibly distributed.
- *
- */
-public class FileSystemS4RFetcher implements S4RFetcher {
-
- @Override
- public InputStream fetch(URI uri) throws DeploymentFailedException {
- try {
- return new FileInputStream(new File(uri));
- } catch (FileNotFoundException e) {
- throw new DeploymentFailedException("Cannot retrieve file from uri [" + uri.toString() + "]");
- }
- }
-}