You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/04/12 08:46:04 UTC
[1/2] git commit: added option for deploying prepackaged s4r file - +
minor refactorings / comments
Updated Branches:
refs/heads/S4-22 593d04bae -> 4c96825ac
added option for deploying prepackaged s4r file
- + minor refactorings / comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/4c96825a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/4c96825a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/4c96825a
Branch: refs/heads/S4-22
Commit: 4c96825ac65195338004a1d5c4f5b39d703a646d
Parents: 454450c
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Apr 12 10:39:17 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Apr 12 10:39:17 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/core/ProcessingElement.java | 79 +++++++--------
.../main/java/org/apache/s4/core/WindowingPE.java | 4 +-
.../src/main/java/org/apache/s4/tools/Deploy.java | 27 ++++--
3 files changed, 58 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4c96825a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 99d6f1f..804b9b5 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -39,42 +39,38 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
/**
- * @author Leo Neumeyer
- * @author Matthieu Morel
- * <p>
- * Base class for implementing processing in S4. All instances are organized as follows:
- * <ul>
- * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
- * application graph.
- * <li>PE prototypes manage the creation and destruction of PE instances.
- * <li>All PE instances are clones of a PE prototype.
- * <li>PE instances are associated with a unique key.
- * <li>PE instances do the actual work by processing any number of input events of various types and emit output
- * events of various types.
- * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method.
- * See {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
- * <ul>
- * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be
- * dispatched to this method.
- * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
- * dispatched to this method when certain conditions are met. See
- * {@link #setTrigger(Class, int, long, TimeUnit)}.
- * </ul>
- * <li>
- * A PE implementation must not create threads. A periodic task can be implemented by overloading the
- * {@link #onTime()} method. See {@link #setTimerInterval(long, TimeUnit)}
- * <li>If a reference in the PE prototype shared by the PE instances, the object must be thread safe.
- * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
- * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there
- * may be several event processing methods that can safely run concurrently. To enable concurrency, annotate the
- * implementation of {@code ProcessingElement} with {@link ThreadSafe}.
- * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()}
- * method.
- * <li>PE class fields are cloned from the prototype. References are also copied which means that if the
- * prototype creates a collection object, all instances will be sharing the same collection object which is
- * usually <em>NOT</em> what the programmer intended . The application developer is responsible for initializing
- * objects in the {@link #onCreate()} method. For example, if each instance requires a
- * <tt>List<tt/> object the PE should implement the following:
+ * <p>
+ * Base class for implementing processing in S4. All instances are organized as follows:
+ * <ul>
+ * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
+ * application graph.
+ * <li>PE prototypes manage the creation and destruction of PE instances.
+ * <li>All PE instances are clones of a PE prototype.
+ * <li>PE instances are associated with a unique key.
+ * <li>PE instances do the actual work by processing any number of input events of various types and emit output events
+ * of various types.
+ * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method. See
+ * {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
+ * <ul>
+ * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be dispatched to
+ * this method.
+ * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
+ * dispatched to this method when certain conditions are met. See {@link #setTrigger(Class, int, long, TimeUnit)}.
+ * </ul>
+ * <li>
+ * A PE implementation must not create threads. A periodic task can be implemented by overloading the {@link #onTime()}
+ * method. See {@link #setTimerInterval(long, TimeUnit)}
+ * <li>If a reference in the PE prototype is shared by the PE instances, the object must be thread safe.
+ * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
+ * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there may be
+ * several event processing methods that can safely run concurrently. To enable concurrency, annotate the implementation
+ * of {@code ProcessingElement} with {@link ThreadSafe}.
+ * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()} method.
+ * <li>PE class fields are cloned from the prototype. References are also copied which means that if the prototype
+ * creates a collection object, all instances will be sharing the same collection object which is usually <em>NOT</em>
+ * what the programmer intended . The application developer is responsible for initializing objects in the
+ * {@link #onCreate()} method. For example, if each instance requires a
+ * <tt>List</tt> object the PE should implement the following:
* <pre>
* {@code
* public class MyPE extends ProcessingElement {
@@ -92,7 +88,7 @@ import com.google.common.collect.Maps;
* </pre>
*
*
- * </ul>
+ * </ul>
*
*
*
@@ -176,8 +172,11 @@ public abstract class ProcessingElement implements Cloneable {
/**
* This method is called after a PE instance is created. Use it to initialize fields that are PE instance specific.
- * PE instances are created using {#clone()}. Fields initialized in the class constructor are shared by all PE
- * instances.
+ * PE instances are created using {@link #clone()}.
+ *
+ * <p>
+ * <b>Fields initialized in the class constructor are shared by all PE instances.</b>
+ * </p>
*/
abstract protected void onCreate();
@@ -621,7 +620,7 @@ public abstract class ProcessingElement implements Cloneable {
}
}
} catch (Exception e) {
- logger.error("Cought exception in timer when calling PE instance [{}] with id [{}].", peInstance,
+ logger.error("Caught exception in timer when calling PE instance [{}] with id [{}].", peInstance,
peInstance.id);
logger.error("Timer error.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4c96825a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
index 5418b83..0b6c0af 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
@@ -113,9 +113,7 @@ public abstract class WindowingPE<T> extends ProcessingElement {
}
protected void onCreate() {
- if (circularBuffer == null) {
- circularBuffer = new CircularFifoBuffer<T>(numSlots);
- }
+ circularBuffer = new CircularFifoBuffer<T>(numSlots);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/4c96825a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index 6573e8f..df3de81 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -51,13 +51,19 @@ public class Deploy {
File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
- String generatedS4RPath = null;
-
- ExecGradle.exec(deployArgs.gradleExecPath, deployArgs.gradleBuildFilePath, "installS4R", new String[] {
- "appsDir=" + tmpAppsDir.getAbsolutePath(), "appName=" + deployArgs.appName });
- generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r";
-
- Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
+ String s4rPath = null;
+
+ if (deployArgs.s4rPath != null) {
+ s4rPath = deployArgs.s4rPath;
+ logger.info(
+ "Using specified S4R [{}], the S4R archive will not be built from source (and corresponding parameters are ignored)",
+ s4rPath);
+ } else {
+ ExecGradle.exec(deployArgs.gradleExecPath, deployArgs.gradleBuildFilePath, "installS4R", new String[] {
+ "appsDir=" + tmpAppsDir.getAbsolutePath(), "appName=" + deployArgs.appName });
+ s4rPath = tmpAppsDir.getAbsolutePath() + "/" + deployArgs.appName + ".s4r";
+ }
+ Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(s4rPath)),
Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
final String uri = s4rToDeploy.toURI().toString();
@@ -77,12 +83,15 @@ public class Deploy {
@Parameters(commandNames = "s4 deploy", commandDescription = "Package and deploy application to S4 cluster", separators = "=")
static class DeployAppArgs extends S4ArgsBase {
- @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = true)
+ @Parameter(names = "-gradle", description = "path to gradle/gradlew executable", required = false)
String gradleExecPath;
- @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = true)
+ @Parameter(names = "-buildFile", description = "path to gradle build file for the S4 application", required = false)
String gradleBuildFilePath;
+ @Parameter(names = "-s4r", description = "path to s4r file", required = false)
+ String s4rPath;
+
@Parameter(names = "-appName", description = "name of S4 application", required = true)
String appName;