You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/09/29 21:45:03 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1824 Add App Data support as a separate app

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 66a1d58b8 -> 873667bce


MLHR-1824 Add App Data support as a separate app


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/85ca2e5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/85ca2e5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/85ca2e5f

Branch: refs/heads/devel-3
Commit: 85ca2e5f4a66586124719c3d5ff91f0fa57ec757
Parents: 91321ce
Author: Munagala V. Ramanath <ra...@apache.org>
Authored: Fri Aug 28 09:16:31 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Sep 29 12:38:37 2015 -0700

----------------------------------------------------------------------
 .../com/datatorrent/demos/pi/Application.java   |  53 +-------
 .../demos/pi/ApplicationAppData.java            | 132 +++++++++++++++++++
 .../src/main/resources/META-INF/properties.xml  |  22 +++-
 3 files changed, 157 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
index 382d9c8..b33a553 100644
--- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java
@@ -15,23 +15,15 @@
  */
 package com.datatorrent.demos.pi;
 
-import java.net.URI;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.appdata.schemas.SchemaUtils;
-import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
-import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-import com.datatorrent.lib.testbench.RandomEventGenerator;
-
 
 /**
  * Monte Carlo PI estimation demo : <br>
@@ -83,8 +75,6 @@ import com.datatorrent.lib.testbench.RandomEventGenerator;
 @ApplicationAnnotation(name="PiDemo")
 public class Application implements StreamingApplication
 {
-  public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
-
   private final Locality locality = null;
 
   @Override
@@ -92,42 +82,9 @@ public class Application implements StreamingApplication
   {
     RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
     PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
-
-
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
     dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
-
-    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
-
-    if (StringUtils.isEmpty(gatewayAddress)) {
-      ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
-      dag.addStream("rand_console", calc.output, console.input).setLocality(locality);
-
-    } else {
-
-      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
-
-      AppDataSnapshotServerMap snapshotServer
-        = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
-
-      String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
-
-      snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
-
-      PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
-      PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
-
-      wsQuery.setUri(uri);
-      wsResult.setUri(uri);
-      Operator.OutputPort<String> queryPort = wsQuery.outputPort;
-      Operator.InputPort<String> queryResultPort = wsResult.input;
-
-      NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
-
-      dag.addStream("PiValues", calc.output, adaptor.inPort);
-      dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
-      dag.addStream("Query", queryPort, snapshotServer.query);
-      dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
-    }
+    dag.addStream("rand_console",calc.output, console.input).setLocality(locality);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
new file mode 100644
index 0000000..ffe4971
--- /dev/null
+++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java
@@ -0,0 +1,132 @@
+/**
+ * Copyright (C) 2015 DataTorrent, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.demos.pi;
+
+import java.net.URI;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+import com.datatorrent.lib.testbench.RandomEventGenerator;
+
+/**
+ * Monte Carlo PI estimation demo : <br>
+ * This application computes value of PI using Monte Carlo pi estimation
+ * formula.
+ * <p>
+ * Very similar to PiDemo but data is also written to an App Data operator for visualization.
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see something like the
+ * following output on the console (since the input sequence of random numbers
+ * can vary from one run to the next, there will be some variation in the
+ * output values):
+ *
+ * <pre>
+ * 3.1430480549199085
+ * 3.1423454157782515
+ * 3.1431377245508982
+ * 3.142078799249531
+ * 2013-06-18 10:43:18,335 [main] INFO  stram.StramLocalCluster run - Application finished.
+ * </pre>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/Application.gif" width=600px > <br>
+ * <br>
+ *
+ * Streaming Window Size : 1000 ms(1 Sec) <br>
+ * Operator Details : <br>
+ * <ul>
+ * <li><b>The rand Operator : </b> This operator generates random integer
+ * between 0-30k. <br>
+ * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br>
+ * StateFull : No</li>
+ * <li><b>The calc operator : </b> This operator computes value of pi using
+ * monte carlo estimation. <br>
+ * Class : com.datatorrent.demos.pi.PiCalculateOperator <br>
+ * StateFull : No</li>
+ * <li><b>The operator Console: </b> This operator just outputs the input tuples
+ * to the console (or stdout). You can use other output adapters if needed.<br>
+ * </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name="PiDemoAppData")
+public class ApplicationAppData implements StreamingApplication
+{
+  public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";
+
+  private final Locality locality = null;
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
+    PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator());
+
+
+    dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if (StringUtils.isEmpty(gatewayAddress)) {
+      throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
+    }
+
+    URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+    AppDataSnapshotServerMap snapshotServer
+      = dag.addOperator("Snapshot Server", new AppDataSnapshotServerMap());
+
+    String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+
+    snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON);
+
+    PubSubWebSocketAppDataQuery wsQuery = dag.addOperator("Query", new PubSubWebSocketAppDataQuery());
+    PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());
+
+    wsQuery.setUri(uri);
+    wsResult.setUri(uri);
+    Operator.OutputPort<String> queryPort = wsQuery.outputPort;
+    Operator.InputPort<String> queryResultPort = wsResult.input;
+
+    NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>());
+    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
+
+    dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);;
+    dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input);
+    dag.addStream("Query", queryPort, snapshotServer.query);
+    dag.addStream("Result", snapshotServer.queryResult, queryResultPort);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/85ca2e5f/demos/pi/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/pi/src/main/resources/META-INF/properties.xml b/demos/pi/src/main/resources/META-INF/properties.xml
index 3bbafdb..c70d0cb 100644
--- a/demos/pi/src/main/resources/META-INF/properties.xml
+++ b/demos/pi/src/main/resources/META-INF/properties.xml
@@ -35,12 +35,30 @@
     <name>dt.application.PiDemo.operator.adaptor.valueName</name>
     <value>piValue</value>
   </property>
+
+  <!-- PiDemoAppData  -->
+  <property>
+    <name>dt.application.PiDemoAppData.operator.rand.minvalue</name>
+    <value>0</value>
+  </property>
+  <property>
+    <name>dt.application.PiDemoAppData.operator.rand.maxvalue</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>dt.application.PiDemoAppData.operator.picalc.base</name>
+    <value>900000000</value>
+  </property>
+  <property>
+    <name>dt.application.PiDemoAppData.operator.adaptor.valueName</name>
+    <value>piValue</value>
+  </property>
   <property>
-    <name>dt.application.PiDemo.operator.Query.topic</name>
+    <name>dt.application.PiDemoAppData.operator.Query.topic</name>
     <value>PiDemoQuery</value>
   </property>
   <property>
-    <name>dt.application.PiDemo.operator.QueryResult.topic</name>
+    <name>dt.application.PiDemoAppData.operator.QueryResult.topic</name>
     <value>PiDemoQueryResult</value>
   </property>
 


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1824-ram3' into devel-3

Posted by ti...@apache.org.
Merge branch 'MLHR-1824-ram3' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/873667bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/873667bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/873667bc

Branch: refs/heads/devel-3
Commit: 873667bcef8668adbb76ec17454aedfa7ba0221e
Parents: 66a1d58 85ca2e5
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Sep 29 12:39:47 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Sep 29 12:39:47 2015 -0700

----------------------------------------------------------------------
 .../com/datatorrent/demos/pi/Application.java   |  53 +-------
 .../demos/pi/ApplicationAppData.java            | 132 +++++++++++++++++++
 .../src/main/resources/META-INF/properties.xml  |  22 +++-
 3 files changed, 157 insertions(+), 50 deletions(-)
----------------------------------------------------------------------