You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC
[9/22] improved scripts + added adapter facilities - added a simple
runnable project template - added a facility to start an adapter node
directly from the current project - added some documentation - improved error
handling in tcp emitter
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateApp.java
new file mode 100644
index 0000000..8f870be
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateApp.java
@@ -0,0 +1,139 @@
+package org.apache.s4.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.io.LineProcessor;
+import com.google.common.io.Resources;
+
+public class CreateApp extends S4ArgsBase {
+
+ static Logger logger = LoggerFactory.getLogger(CreateApp.class);
+
+ public static void main(String[] args) {
+
+ final CreateAppArgs appArgs = new CreateAppArgs();
+ Tools.parseArgs(appArgs, args);
+
+ if (new File(appArgs.getAppDir() + "/" + appArgs.appName.get(0)).exists()) {
+ System.err.println("There is already a directory called " + appArgs.appName.get(0) + " in the "
+ + appArgs.getAppDir()
+ + " directory. Please specify another name for your project or specify another parent directory");
+ System.exit(1);
+ }
+ // create project structure
+ try {
+ createDir(appArgs, "/src/main/java");
+ createDir(appArgs, "/src/main/resources");
+ createDir(appArgs, "/src/main/java/hello");
+
+ // copy gradlew script (redirecting to s4 gradlew)
+ File gradlewTempFile = File.createTempFile("gradlew", "tmp");
+ gradlewTempFile.deleteOnExit();
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/gradlew")), gradlewTempFile);
+ String gradlewScriptContent = Files.readLines(gradlewTempFile, Charsets.UTF_8, new PathsReplacer(appArgs));
+ Files.write(gradlewScriptContent, gradlewTempFile, Charsets.UTF_8);
+ Files.copy(gradlewTempFile, new File(appArgs.getAppDir() + "/gradlew"));
+ new File(appArgs.getAppDir() + "/gradlew").setExecutable(true);
+
+ // copy build file
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/build.gradle")), new File(
+ appArgs.getAppDir() + "/build.gradle"));
+ // copy hello app files
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/HelloPE.java.txt")), new File(
+ appArgs.getAppDir() + "/src/main/java/hello/HelloPE.java"));
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/HelloApp.java.txt")),
+ new File(appArgs.getAppDir() + "/src/main/java/hello/HelloApp.java"));
+ // copy hello app adapter
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/HelloInputAdapter.java.txt")),
+ new File(appArgs.getAppDir() + "/src/main/java/hello/HelloInputAdapter.java"));
+
+ File s4TmpFile = File.createTempFile("s4Script", "template");
+ s4TmpFile.deleteOnExit();
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/s4")), s4TmpFile);
+
+ // create s4
+ String preparedS4Script = Files.readLines(s4TmpFile, Charsets.UTF_8, new PathsReplacer(appArgs));
+
+ File s4Script = new File(appArgs.getAppDir() + "/s4");
+ Files.write(preparedS4Script, s4Script, Charsets.UTF_8);
+ s4Script.setExecutable(true);
+
+ File readmeTmpFile = File.createTempFile("newApp", "README");
+ readmeTmpFile.deleteOnExit();
+ Files.copy(Resources.newInputStreamSupplier(Resources.getResource("templates/newApp.README")),
+ readmeTmpFile);
+ // display contents from readme
+ Files.readLines(readmeTmpFile, Charsets.UTF_8, new LineProcessor<Boolean>() {
+
+ @Override
+ public boolean processLine(String line) throws IOException {
+ System.out.println(line.replace("<appDir>", appArgs.getAppDir()));
+ return true;
+ }
+
+ @Override
+ public Boolean getResult() {
+ return true;
+ }
+
+ });
+ } catch (Exception e) {
+ logger.error("Could not create project due to [{}]. Please check your configuration.", e.getMessage());
+ logger.error("Could not create project due to [{}]. Please check your configuration.", e.getMessage());
+ }
+ }
+
+ private static void createDir(CreateAppArgs appArgs, String dirName) throws Exception {
+ String filePath = appArgs.getAppDir() + dirName;
+ if (!new File(filePath).mkdirs()) {
+ logger.error("Cannot create directory [{}], exiting.", filePath);
+ throw new Exception("Cannot create directory [" + filePath + "]");
+ }
+ }
+
+ private static final class PathsReplacer implements LineProcessor<String> {
+ private final CreateAppArgs appArgs;
+ StringBuilder sb = new StringBuilder();
+
+ private PathsReplacer(CreateAppArgs appArgs) {
+ this.appArgs = appArgs;
+ }
+
+ @Override
+ public boolean processLine(String line) throws IOException {
+ sb.append(line.replace("<s4_script_path>", appArgs.s4ScriptPath).replace("<s4_install_dir>",
+ new File(appArgs.s4ScriptPath).getParent())
+ + "\n");
+ return true;
+ }
+
+ @Override
+ public String getResult() {
+ return sb.toString();
+ }
+ }
+
+ @Parameters(commandNames = "newApp", separators = "=", commandDescription = "Create new application skeleton")
+ static class CreateAppArgs extends S4ArgsBase {
+
+ @Parameter(description = "name of the application", required = true, arity = 1)
+ List<String> appName;
+
+ @Parameter(names = "-parentDir", description = "parent directory of the application")
+ String parentDir = System.getProperty("user.dir");
+
+ public String getAppDir() {
+ return parentDir + "/" + appName.get(0);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
index fd73428..f4b269b 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -38,7 +38,7 @@ public class DefineCluster {
@Parameters(commandNames = "s4 newCluster", separators = "=", commandDescription = "Setup new S4 logical cluster")
static class ZKServerArgs extends S4ArgsBase {
- @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
+ @Parameter(names = { "-c", "-cluster" }, description = "S4 cluster name", required = true)
String clusterName = "s4-test-cluster";
@Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
@@ -47,7 +47,7 @@ public class DefineCluster {
@Parameter(names = "-zk", description = "Zookeeper connection string")
String zkConnectionString = "localhost:2181";
- @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
+ @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
int firstListeningPort = -1;
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 3183bd2..de8843a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -27,7 +27,7 @@ import com.beust.jcommander.Parameters;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
-public class Deploy {
+public class Deploy extends S4ArgsBase {
private static File tmpAppsDir;
static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
@@ -61,8 +61,8 @@ public class Deploy {
"Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
s4rPath);
} else {
- ExecGradle.exec(deployArgs.gradleExecPath, deployArgs.gradleBuildFilePath, "installS4R", new String[] {
- "appsDir=" + tmpAppsDir.getAbsolutePath(), "appName=" + deployArgs.appName });
+ ExecGradle.exec(deployArgs.gradleBuildFilePath, "installS4R",
+ new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath(), "appName=" + deployArgs.appName });
s4rPath = tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r";
}
Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(s4rPath)),
@@ -86,10 +86,7 @@ public class Deploy {
@Parameters(commandNames = "s4 deploy", commandDescription = "Package and deploy application to S4 cluster", separators = "=")
static class DeployAppArgs extends S4ArgsBase {
- @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = false)
- String gradleExecPath;
-
- @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = false)
+ @Parameter(names = { "-b", "-buildFile" }, description = "path to gradle build file for the S4 application", required = false)
String gradleBuildFilePath;
@Parameter(names = "-s4r", description = "path to s4r file", required = false)
@@ -98,7 +95,7 @@ public class Deploy {
@Parameter(names = "-appName", description = "name of S4 application", required = true)
String appName;
- @Parameter(names = "-cluster", description = "logical name of the S4 cluster", required = true)
+ @Parameter(names = { "-c", "-cluster" }, description = "logical name of the S4 cluster", required = true)
String clusterName;
@Parameter(names = "-zk", description = "zookeeper connection string")
@@ -111,8 +108,7 @@ public class Deploy {
static class ExecGradle {
- public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
- throws Exception {
+ public static void exec(String buildFilePath, String taskName, String[] params) throws Exception {
ProjectConnection connection = GradleConnector.newConnector()
.forProjectDirectory(new File(buildFilePath).getParentFile()).connect();
@@ -138,7 +134,6 @@ public class Deploy {
build.withArguments(buildArgs.toArray(new String[] {}));
-
// if you want to listen to the progress events:
ProgressListener listener = null; // use your implementation
// build.addProgressListener(listener);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
index 6ca7d01..477fdaa 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
@@ -7,4 +7,7 @@ public abstract class S4ArgsBase {
@Parameter(names = "-help", description = "usage")
boolean help = false;
+ @Parameter(names = "-s4ScriptPath", description = "path of the S4 script", hidden = true, required = true)
+ String s4ScriptPath;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index b7c06a6..2ef86d8 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -1,30 +1,84 @@
package org.apache.s4.tools;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+
+import org.apache.s4.core.Main;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.collect.Sets;
public class Tools {
+ static Logger logger = LoggerFactory.getLogger(Tools.class);
+
+ enum Task {
+ deploy(Deploy.class), node(Main.class), zkServer(ZKServer.class), newCluster(DefineCluster.class), adapter(null), newApp(
+ CreateApp.class);
+
+ Class<?> target;
+
+ Task(Class<?> target) {
+ this.target = target;
+ }
+
+ public void dispatch(String[] args) {
+ try {
+ Method main = target.getMethod("main", String[].class);
+ main.invoke(null, new Object[] { args });
+ } catch (Exception e) {
+ logger.error("Cannot dispatch to task [{}]: wrong arguments [{}]", this.name(), Arrays.toString(args));
+ }
+ }
+
+ }
+
public static void main(String[] args) {
+
+ if (!(args.length > 1)) {
+ List<String> taskNames = getTaskNames();
+ System.err.println("please specify a task name and proper arguments. Available tasks are: "
+ + Arrays.toString(taskNames.toArray(new String[] {})));
+ System.exit(1);
+ }
+
+ // then we just pass all arguments without the task name
+ Task task = null;
try {
- Class<?> toolClass = Class.forName(args[0]);
- Method main = toolClass.getMethod("main", String[].class);
- if (args.length > 1) {
- main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
- } else {
- main.invoke(null, new Object[] { new String[0] });
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // first argument is -s4ScriptPath=x
+ task = Task.valueOf(args[1]);
+ } catch (IllegalArgumentException e) {
+ System.err.println("please specify a task name and proper arguments. Available tasks are: "
+ + Arrays.toString(getTaskNames().toArray(new String[] {})));
+ System.exit(1);
+ }
+ List<String> taskArgs = new ArrayList<String>();
+ if (!task.name().equals("node")) {
+ taskArgs.add(args[0]); // s4 script (only for s4-tools project classes)
+ }
+ if (args.length > 1) {
+ taskArgs.addAll(Arrays.asList(Arrays.copyOfRange(args, 2, args.length)));
+ }
+ task.dispatch(taskArgs.toArray(new String[] {}));
+
+ }
+
+ private static List<String> getTaskNames() {
+ Task[] tasks = Task.values();
+ List<String> taskNames = new ArrayList<String>();
+ for (Task task : tasks) {
+ taskNames.add(task.name());
}
+ return taskNames;
}
- public static void parseArgs(Object jcArgs, String[] cliArgs) {
+ public static JCommander parseArgs(Object jcArgs, String[] cliArgs) {
JCommander jc = new JCommander(jcArgs);
try {
if (Sets.newHashSet(cliArgs).contains("-help")) {
@@ -37,7 +91,15 @@ public class Tools {
} catch (Exception e) {
JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
jc.usage();
- System.exit(-1);
+ System.exit(1);
}
+ return jc;
+ }
+
+ @Parameters
+ static class ToolsArgs {
+ @Parameter(description = "Name of the task", required = true)
+ String taskName;
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/HelloApp.java.txt
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/HelloApp.java.txt b/subprojects/s4-tools/src/main/resources/templates/HelloApp.java.txt
new file mode 100644
index 0000000..076b0a1
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/HelloApp.java.txt
@@ -0,0 +1,34 @@
+package hello;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+
+public class HelloApp extends App {
+
+ @Override
+ protected void onStart() {
+ }
+
+ @Override
+ protected void onInit() {
+ // create a prototype
+ HelloPE helloPE = createPE(HelloPE.class);
+ // Create a stream that listens to the "names" stream and passes events to the helloPE instance.
+ createInputStream("names", new KeyFinder<Event>() {
+
+ @Override
+ public List<String> get(Event event) {
+ return Arrays.asList(new String[] { event.get("name") });
+ }
+ }, helloPE);
+ }
+
+ @Override
+ protected void onClose() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/HelloInputAdapter.java.txt
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/HelloInputAdapter.java.txt b/subprojects/s4-tools/src/main/resources/templates/HelloInputAdapter.java.txt
new file mode 100644
index 0000000..dff4d8e
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/HelloInputAdapter.java.txt
@@ -0,0 +1,60 @@
+package hello;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.adapter.AdapterApp;
+
+public class HelloInputAdapter extends AdapterApp {
+
+ @Override
+ protected void onStart() {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ ServerSocket serverSocket = null;
+ Socket connectedSocket;
+ BufferedReader in = null;
+ try {
+ serverSocket = new ServerSocket(15000);
+ while (true) {
+ connectedSocket = serverSocket.accept();
+ in = new BufferedReader(new InputStreamReader(connectedSocket.getInputStream()));
+
+ String line = in.readLine();
+ System.out.println("read: " + line);
+ Event event = new Event();
+ event.put("name", String.class, line);
+ getRemoteStream().put(event);
+ connectedSocket.close();
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ // System.exit(-1);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (serverSocket != null) {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }).start();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/HelloPE.java.txt
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/HelloPE.java.txt b/subprojects/s4-tools/src/main/resources/templates/HelloPE.java.txt
new file mode 100644
index 0000000..f1c6014
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/HelloPE.java.txt
@@ -0,0 +1,29 @@
+package hello;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.core.ProcessingElement;
+
+public class HelloPE extends ProcessingElement {
+
+ // you should define downstream streams here and inject them in the app definition
+
+ boolean seen = false;
+
+ /**
+ * This method is called upon a new Event on an incoming stream
+ */
+ public void onEvent(Event event) {
+ // in this example, we use the default generic Event type, by you can also define your own type
+ System.out.println("Hello " + (seen ? "again " : "") + event.get("name") + "!");
+ seen = true;
+ }
+
+ @Override
+ protected void onCreate() {
+ }
+
+ @Override
+ protected void onRemove() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/build.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/build.gradle b/subprojects/s4-tools/src/main/resources/templates/build.gradle
new file mode 100644
index 0000000..f60c3f3
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/build.gradle
@@ -0,0 +1,168 @@
+/**
+* Apache S4 Application Build File
+*
+* Use this script to build and package S4 apps.
+*
+* Run 'gradle install' on the s4 project to publish to your local maven repo.
+*
+* TODO: This should probably be distributed as an s4 plugin for Gradle.
+* TODO: There seem to be to be similarities with the war and jetty plugins. (war -> s4r, jetty -> s4Run).
+* We should make it easy to test the app from this script by a running a test task that starts and stops
+* an s4 server. See: http://www.gradle.org/releases/1.0-milestone-3/docs/userguide/userguide_single.html#war_plugin
+*
+* This is an interesting discussion:
+* http://gradle.1045684.n5.nabble.com/Exclude-properties-file-from-war-td3365147.html
+*
+*/
+
+/* Set the destination where we want to install the apps. */
+
+s4AppInstallDir = hasProperty('appsDir') ? "$appsDir" : "/tmp/appsDir"
+
+s4Version = '0.5.0-SNAPSHOT'
+description = 'Apache S4 App'
+//defaultTasks 'installS4R'
+archivesBaseName = "$project.name"
+distRootFolder = "$archivesBaseName-${-> version}"
+
+
+// Append the suffix 'SNAPSHOT' when the build is not for release.
+//version = new Version(major: 0, minor: 0, bugfix: 0, isRelease: false)
+group = 'org.apache.s4'
+
+apply plugin: 'java'
+apply plugin: 'eclipse'
+apply plugin:'application'
+
+/* The app classname is set automatically from the source files. */
+def appClassname = ''
+
+/* Set Java version. */
+sourceCompatibility = 1.6
+targetCompatibility = 1.6
+
+repositories {
+ mavenLocal()
+ mavenCentral()
+ mavenRepo name: "gson", urls: "http://google-gson.googlecode.com/svn/mavenrepo"
+
+ /* Add lib dir as a repo. Some jar files that are not available
+ in a public repo are distributed in the lib dir. */
+ flatDir name: 'libDir', dirs: "$rootDir/lib"
+}
+
+/* All project libraries must be defined here. */
+libraries = [
+ // for instance, adding twitter4j 2.2.5 will be:
+ //twitter4j_core: 'org.twitter4j:twitter4j-core:2.2.5'
+ // http://mvnrepository.com/ is a good source
+ // if you need to use a different repository, please specify it in the above "repositories" block
+
+ // you always need the s4 libraries for building your app
+ s4_base: 'org.apache.s4:s4-base:'+ s4Version,
+ s4_comm: 'org.apache.s4:s4-comm:'+s4Version,
+ s4_core: 'org.apache.s4:s4-core:'+s4Version
+ ]
+
+
+dependencies {
+
+ /* S4 Platform. We only need the API, not the transitive dependencies. */
+
+ compile (libraries.s4_base)
+ compile (libraries.s4_comm)
+ compile (libraries.s4_core)
+
+ // if you need to use the twitter4j lib defined above, you must reference it here as a dependency
+ // compile (libraries.twitter4j_core)
+
+
+}
+
+/* Set the manifest attributes for the S4 archive here.
+* TODO: separate custom properties from std ones and set custom properties at the top of the build script.
+*/
+manifest.mainAttributes(
+ provider: 'gradle',
+ 'Implementation-Url': 'http://incubator.apache.org/projects/s4.html',
+ 'Implementation-Version': version,
+ 'Implementation-Vendor': 'Apache S4',
+ 'Implementation-Vendor-Id': 's4app',
+ 'S4-App-Class': appClassname, // gets set by the s4r task.
+ 'S4-Version': s4Version
+ )
+
+appDependencies = ( configurations.compile )
+
+/* This task will extract all the class files and create a fat jar. We set the manifest and the extension to make it an S4 archive file. */
+// TODO: exclude schenma files as needed (not critical) see: http://forums.gradle.org/gradle/topics/using_gradle_to_fat_jar_a_spring_project
+task s4r(type: Jar) {
+ dependsOn jar
+ from { appDependencies.collect { it.isDirectory() ? it : zipTree(it) } }
+ from { configurations.archives.allArtifacts.files.collect { zipTree(it) } }
+ manifest = project.manifest
+ extension = 's4r'
+
+ /* Set class name in manifest. Parse source files until we find a class that extends App.
+ * Get fully qualified Java class name and set attribute in Manifest.
+ */
+ sourceSets.main.allSource.files.each { File file ->
+ if (appClassname =="" || appClassname == "UNKNOWN") {
+ // only execute the closure for this file if we haven't already found the app class name
+ appClassname = getAppClassname(file)
+ if(appClassname != "") {
+ manifest.mainAttributes('S4-App-Class': appClassname)
+ }
+ }
+ }
+
+ if (appClassname == "UNKNOWN") {
+
+ println "Couldn't find App class in source files...aborting."
+ exit(1)
+ }
+}
+
+
+/* List the artifacts that will br added to the s4 archive (and explode if needed). */
+s4r << {
+ appDependencies.each { File file -> println 'Adding to s4 archive: ' + file.name }
+ configurations.archives.allArtifacts.files.each { println 'Adding to s4 archive: ' + it.name }
+
+}
+
+task cp << {
+ description='dumps the classpath for running a class from this project, into a \'classpath.txt\' file in the current directory'
+ new File("classpath.txt").write(sourceSets.main.runtimeClasspath.asPath)
+}
+
+/* Install the S4 archive to the install directory. */
+task installS4R (type: Copy) {
+ dependsOn s4r
+ from s4r.archivePath
+ into s4AppInstallDir
+}
+
+/* Parse source file to get the app classname so we can use it in the manifest.
+* TODO: Use a real Java parser. (This is not skipping comments for example.)
+*/
+def getAppClassname(file) {
+ def classname = "UNKNOWN"
+ lines= file.readLines()
+ for(line in lines) {
+
+ def pn = line =~ /.*package\s+([\w\.]+)\s*;.*/
+ if(pn) {
+ packageName = pn[0][1] + "."
+ }
+
+ def an = line =~ /.*public\s+class\s+(\w+)\s+extends.+App.*\{/
+ if (an) {
+ classname = packageName + an[0][1]
+ println "Found app class name: " + classname
+ break
+ }
+
+ }
+ classname
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/gradlew
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/gradlew b/subprojects/s4-tools/src/main/resources/templates/gradlew
new file mode 100644
index 0000000..93380d1
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/gradlew
@@ -0,0 +1,2 @@
+#!/bin/bash
+<s4_install_dir>/gradlew $@
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/newApp.README
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/newApp.README b/subprojects/s4-tools/src/main/resources/templates/newApp.README
new file mode 100644
index 0000000..a340c19
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/newApp.README
@@ -0,0 +1,37 @@
+ _____ _ _
+ / ____|| || |
+ | (___ | || |_
+ \___ \ |__ _|
+ ____) | | |
+ |_____/ |_|
+
+ You just created a new S4 project in <appDir>!
+
+ It follows a maven-like structure:
+ - the build file is at the root of the project
+ - sources are in src/main/java
+
+ We use gradle for building this project.
+
+ To build the project:
+ - "./gradlew" (from the root of the project, this calls the gradle script from the s4 installation)
+
+ A s4 script has been created at the root of the project's directory. It calls the s4 script from your S4 installation.
+
+
+ To execute the application in a new S4 cluster:
+ 1. start a ZooKeeper instance "./s4 zkServer"
+ 2. define a logical cluster for your application "./s4 newCluster -cluster=<nameOfTheCluster> -nbTasks=<number of partitions> -firstListeningPort=<a port number for the first node, other nodes use an increment on this initial port>"
+ 3. start a node and attach it to the cluster "./s4 node -cluster=<nameOfTheCluster>"
+ 4. deploy the application "./s4 deploy -cluster=<nameOfTheCluster>"
+
+ If you want to inject events from application 2 into application 1 on cluster 1:
+ - application 1 must define an input stream with a name (say: stream1)
+ - application 2 must define an output stream with the same name (stream1)
+
+ If you want to use a simple adapter process, listening to an external source, converting incoming data into S4 events, and sending that to S4 apps, you can define
+ your own app that extends the AdapterApp class. Then to start it (for instance):
+ - ./s4 adapter -cluster=c1 -appClass=org.apache.s4.example.twitter.TwitterInputAdapter -namedStringParameters=adapter.output.stream:stream1
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/subprojects/s4-tools/src/main/resources/templates/s4
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/resources/templates/s4 b/subprojects/s4-tools/src/main/resources/templates/s4
new file mode 100644
index 0000000..fd54444
--- /dev/null
+++ b/subprojects/s4-tools/src/main/resources/templates/s4
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+case "$1" in
+"adapter")
+ # we need something different in order to pass the classpath of the current project
+ # you must specify: appClassName (extends AdapterApp class) , cluster name, output stream name
+ # current syntax is not yet straightforward:
+ # example : ./s4 adapter -cluster=c1 -appClass=org.apache.s4.example.twitter.TwitterInputAdapter -namedStringParameters=adapter.output.stream:s1
+ shift 1
+ <s4_install_dir>/gradlew cp
+ java -cp `cat classpath.txt` org.apache.s4.core.Main $@
+;;
+*)
+ echo "calling referenced s4 script : <s4_script_path>"
+ (cd <s4_install_dir> && <s4_script_path> $@)
+;;
+esac
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/test-apps/twitter-adapter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 5e7b2dc..c0f3576 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -166,6 +166,11 @@ task installS4R (type: Copy) {
http://www.gradle.org/1.0-milestone-3/docs/userguide/gradle_wrapper.html */
task wrapper(type: Wrapper) { gradleVersion = '1.0-milestone-3' }
+task cp << {
+ description='dumps the classpath for running a class from this project, into a \'classpath.txt\' file in the current directory'
+ new File("classpath.txt").write(sourceSets.main.runtimeClasspath.asPath)
+}
+
/* Parse source file to get the app classname so we can use it in the manifest.
* TODO: Use a real Java parser. (This is not skippong comments for example.)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index 0827e3a..4eb3f2b 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -7,8 +7,7 @@ import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.s4.base.Event;
-import org.apache.s4.core.App;
-import org.apache.s4.core.RemoteStream;
+import org.apache.s4.core.adapter.AdapterApp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,7 +18,7 @@ import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
-public class TwitterInputAdapter extends App {
+public class TwitterInputAdapter extends AdapterApp {
private static Logger logger = LoggerFactory.getLogger(TwitterInputAdapter.class);
@@ -32,19 +31,13 @@ public class TwitterInputAdapter extends App {
private Thread t;
- private int messageCount;
-
- private RemoteStream remoteStream;
-
@Override
protected void onClose() {
- // TODO Auto-generated method stub
-
}
@Override
protected void onInit() {
- remoteStream = createOutputStream("RawStatus");
+ super.onInit();
t = new Thread(new Dequeuer());
}
@@ -116,7 +109,7 @@ public class TwitterInputAdapter extends App {
Status status = messageQueue.take();
Event event = new Event();
event.put("statusText", String.class, status.getText());
- remoteStream.put(event);
+ getRemoteStream().put(event);
} catch (Exception e) {
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/test-apps/twitter-counter/build.gradle
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index b93d0b9..4b7f6ff 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -70,8 +70,7 @@ repositories {
/* All project libraries must be defined here. */
libraries = [
- twitter4j_core: 'org.twitter4j:twitter4j-core:2.2.5',
- twitter4j_stream: 'org.twitter4j:twitter4j-stream:2.2.5',
+
s4_base: 'org.apache.s4:s4-base:0.5.0-SNAPSHOT',
s4_comm: 'org.apache.s4:s4-comm:0.5.0-SNAPSHOT',
s4_core: 'org.apache.s4:s4-core:0.5.0-SNAPSHOT'
@@ -89,8 +88,7 @@ dependencies {
compile (libraries.s4_base)
compile (libraries.s4_comm)
compile (libraries.s4_core)
- compile (libraries.twitter4j_core)
- compile (libraries.twitter4j_stream)
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cad14b40/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index 2862df9..f6b211c 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -1,17 +1,22 @@
package org.apache.s4.example.twitter;
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
-import org.apache.s4.base.Event;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.io.Files;
public class TopNTopicPE extends ProcessingElement {
@@ -33,17 +38,25 @@ public class TopNTopicPE extends ProcessingElement {
sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
}
- logger.info("\n------------------");
+ File f = new File("TopNTopics.txt");
+ StringBuilder sb = new StringBuilder();
int i = 0;
Iterator<TopNEntry> iterator = sortedTopics.iterator();
- long time = System.currentTimeMillis();
+ sb.append("----\n" + new SimpleDateFormat("yyyy-MMM-dd HH:mm:ss").format(new Date()) + "\n");
+
while (iterator.hasNext() && i < 10) {
TopNEntry entry = iterator.next();
- logger.info("{} : topic [{}] count [{}]",
- new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+ sb.append("topic [" + entry.topic + "] count [" + entry.count + "]\n");
i++;
}
+ sb.append("\n");
+ try {
+ Files.append(sb.toString(), f, Charsets.UTF_8);
+ logger.info("Wrote top 10 topics to file [{}] ", f.getAbsolutePath());
+ } catch (IOException e) {
+ logger.error("Cannot write top 10 topics to file [{}]", f.getAbsolutePath(), e);
+ }
}
@Override