You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/09/29 21:45:03 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1824 Add App Data
support as a separate app
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 66a1d58b8 -> 873667bce
MLHR-1824 Add App Data support as a separate app
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/85ca2e5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/85ca2e5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/85ca2e5f
Branch: refs/heads/devel-3
Commit: 85ca2e5f4a66586124719c3d5ff91f0fa57ec757
Parents: 91321ce
Author: Munagala V. Ramanath <ra...@apache.org>
Authored: Fri Aug 28 09:16:31 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Sep 29 12:38:37 2015 -0700
----------------------------------------------------------------------
.../com/datatorrent/demos/pi/Application.java | 53 +-------
.../demos/pi/ApplicationAppData.java | 132 +++++++++++++++++++
.../src/main/resources/META-INF/properties.xml | 22 +++-
3 files changed, 157 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
index 382d9c8..b33a553 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
@@ -15,23 +15,15 @@
*/
package com.datatorrent.demos.pi;
-import java.net.URI;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
/**
* Monte Carlo PI estimation demo : <br>
@@ -83,8 +75,6 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
@ApplicationAnnotation(name="PiDemo")
public class Application implements StreamingApplication
{
- public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
-
private final Locality locality = null;
@Override
@@ -92,42 +82,9 @@ public class Application implements StreamingApplication
{
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
-
-
+ ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
-
- String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
- if (StringUtils.isEmpty(gatewayAddress)) {
- ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
- dag.addStream("rand_console", calc.output, console.input).setLocality(locality);
-
- } else {
-
- URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-
- AppDataSnapshotServerMap snapshotServer
- = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
-
- String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
-
- snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
-
- PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
- PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
-
- wsQuery.setUri(uri);
- wsResult.setUri(uri);
- Operator.OutputPort<String> queryPort = wsQuery.outputPort;
- Operator.InputPort<String> queryResultPort = wsResult.input;
-
- NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
-
- dag.addStream("PiValues", calc.output, adaptor.inPort);
- dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
- dag.addStream("Query", queryPort, snapshotServer.query);
- dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
- }
+ dag.addStream("rand_console",calc.output, console.input).setLocality(locality);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
new file mode 100644
index 0000000..ffe4971
--- /dev/null
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.pi;
+
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation demo : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Very similar to PiDemo but data is also written to an App Data operator for visualization.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : com.datatorrent.demos.pi.PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name="PiDemoAppData")
+public class ApplicationAppData implements StreamingApplication
+{
+ public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
+
+ private final Locality locality = null;
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+ PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
+
+
+ dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
+
+ String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+ if (StringUtils.isEmpty(gatewayAddress)) {
+ throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
+ }
+
+ URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+ AppDataSnapshotServerMap snapshotServer
+ = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
+
+ String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+
+ snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+
+ PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
+ PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+
+ wsQuery.setUri(uri);
+ wsResult.setUri(uri);
+ Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+ Operator.InputPort<String> queryResultPort = wsResult.input;
+
+ NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
+ ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+
+ dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);;
+ dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
+ dag.addStream("Query", queryPort, snapshotServer.query);
+ dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/resources/META-INF/properties.xml b/demos/pi/src/main/resources/META-INF/properties.xml
index 3bbafdb..c70d0cb 100644
--- a/demos/pi/src/main/resources/META-INF/properties.xml
+++ b/demos/pi/src/main/resources/META-INF/properties.xml
@@ -35,12 +35,30 @@
<name>dt.application.PiDemo.operator.adaptor.valueName</name>
<value>piValue</value>
</property>
+
+ <!-- PiDemoAppData -->
+ <property>
+ <name>dt.application.PiDemoAppData.operator.rand.minvalue</name>
+ <value>0</value>
+ </property>
+ <property>
+ <name>dt.application.PiDemoAppData.operator.rand.maxvalue</name>
+ <value>30000</value>
+ </property>
+ <property>
+ <name>dt.application.PiDemoAppData.operator.picalc.base</name>
+ <value>900000000</value>
+ </property>
+ <property>
+ <name>dt.application.PiDemoAppData.operator.adaptor.valueName</name>
+ <value>piValue</value>
+ </property>
<property>
- <name>dt.application.PiDemo.operator.Query.topic</name>
+ <name>dt.application.PiDemoAppData.operator.Query.topic</name>
<value>PiDemoQuery</value>
</property>
<property>
- <name>dt.application.PiDemo.operator.QueryResult.topic</name>
+ <name>dt.application.PiDemoAppData.operator.QueryResult.topic</name>
<value>PiDemoQueryResult</value>
</property>
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1824-ram3'
into devel-3
Posted by ti...@apache.org.
Merge branch 'MLHR-1824-ram3' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/873667bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/873667bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/873667bc
Branch: refs/heads/devel-3
Commit: 873667bcef8668adbb76ec17454aedfa7ba0221e
Parents: 66a1d58 85ca2e5
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Sep 29 12:39:47 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Sep 29 12:39:47 2015 -0700
----------------------------------------------------------------------
.../com/datatorrent/demos/pi/Application.java | 53 +-------
.../demos/pi/ApplicationAppData.java | 132 +++++++++++++++++++
.../src/main/resources/META-INF/properties.xml | 22 +++-
3 files changed, 157 insertions(+), 50 deletions(-)
----------------------------------------------------------------------