You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/04 13:13:34 UTC

[1/2] incubator-quarks-website git commit: [QUARKS-165] [AWAIT-CODE-MERGE] parallel analytics recipe

Repository: incubator-quarks-website
Updated Branches:
  refs/heads/master 59922d4e2 -> 2306c25f8


[QUARKS-165] [AWAIT-CODE-MERGE] parallel analytics recipe


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/commit/630ff431
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/tree/630ff431
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/diff/630ff431

Branch: refs/heads/master
Commit: 630ff431d236ba49202ab6aaa745f84f5465537c
Parents: 59922d4
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 14:57:01 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 14:57:01 2016 -0400

----------------------------------------------------------------------
 site/_data/mydoc/mydoc_sidebar.yml        |   8 ++
 site/recipes/recipe_parallel_analytics.md | 172 +++++++++++++++++++++++++
 2 files changed, 180 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/blob/630ff431/site/_data/mydoc/mydoc_sidebar.yml
----------------------------------------------------------------------
diff --git a/site/_data/mydoc/mydoc_sidebar.yml b/site/_data/mydoc/mydoc_sidebar.yml
index 3f484f7..f43d78a 100644
--- a/site/_data/mydoc/mydoc_sidebar.yml
+++ b/site/_data/mydoc/mydoc_sidebar.yml
@@ -162,6 +162,14 @@ entries:
       version: all
       output: web, pdf
 
+    - title: How can I run analytics on several tuples in parallel?
+      url: /recipes/recipe_parallel_analytics.html
+      audience: writers, designers
+      platform: all
+      product: all
+      version: all
+      output: web, pdf
+
     - title: How can I run several analytics on a tuple concurrently?
       url: /recipes/recipe_concurrent_analytics.html
       audience: writers, designers

http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/blob/630ff431/site/recipes/recipe_parallel_analytics.md
----------------------------------------------------------------------
diff --git a/site/recipes/recipe_parallel_analytics.md b/site/recipes/recipe_parallel_analytics.md
new file mode 100644
index 0000000..5084465
--- /dev/null
+++ b/site/recipes/recipe_parallel_analytics.md
@@ -0,0 +1,172 @@
+---
+title: How can I run analytics on several tuples in parallel?
+---
+
+If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple injest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.
+
+The overall proessing time for a single tuple is still the same but the processing for each tuple is overlapped. In the extreme your application may be able to process N tuples in the same time that it would have processed one.
+
+This usage model is in contrast to what's been called _concurrent analytics_, where multiple different independent analytics for a single tuple are performed concurrently, as when using `PlumbingStreams.concurrent()`.
+
+e.g., imagine your analytic pipeline has three stages to it: A1, A2, A3, and that A2 dominates the processing time. You want to change the serial processing flow graph from:
+
+```
+sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+```
+
+to a flow where the A2 analytics run on several tuples in parallel in a flow like:
+
+```
+                           |-> A2-channel0 ->|
+sensorReadings<T> -> A1 -> |-> A2-channel1 ->| -> A3 -> results<R>
+                           |-> A2-channel2 ->|
+                           |-> A2-channel3 ->|
+                           |-> A2-channel4 ->|
+                                  ...
+```
+
+The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels.  Each of the parallel channels also needs a thread to run its analytic pipeline.
+
+`PlumbingStreams.parallel()` builds a parallel flow graph for you.  Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.
+
+More specifically `parallel()` generates a flow like:
+
+```
+                                   |-> isolate(10) -> pipeline-ch0 -> |
+stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch1 -> |-> union -> isolate(width) 
+                                   |-> isolate(10) -> pipeline-ch2 -> |
+                                       ...
+```
+
+It's easy to use `parallel()`!
+
+## Define the splitter
+
+The splitter function partitions the tuples among the parallel channels.  `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession.  The round robin strategy works great when the processing time of tuples is uniform.  Other splitter functions may use information in the tuple to decide how to partition them.
+
+This recipe just uses the round robin splitter for a `TStream<Double>`.
+
+```java
+int width = 5;  // number of parallel channels
+
+ToIntFunction<Double> splitter = PlumbingStreams.roundRobinSplitter(width);
+```
+
+## Define the pipeline to run in parallel
+
+Define a `BiFunction<TStream<T>, Integer, TStream<R>>` that builds the pipeline. That is, define a function that receives a `TStream<T>` and an integer `channel` and creates a pipeline for that channel that returns a `TStream<R>`.  
+
+Many pipelines don't care what channel they're being constructed for. While the pipeline function typically yields the same pipeline processing for each channel there is no requirement for it to do so.
+
+In this simple recipe the pipeline receives a `TStream<Double>` as input and generates a `TStream<String>` as output.
+
+```java
+static BiFunction<TStream<Double>, Integer, TStream<String>> pipeline() {
+    // a simple 4 stage pipeline simulating some amount of work by sleeping
+    return (stream, channel) -> 
+      { 
+        String tagPrefix = "pipeline-ch"+channel;
+        return stream.map(tuple -> {
+            sleep(1000, TimeUnit.MILLISECONDS);
+            return "This is the "+tagPrefix+" result for tuple "+tuple;
+          }).tag(tagPrefix+".stage1")
+          .map(Functions.identity()).tag(tagPrefix+".stage2")
+          .map(Functions.identity()).tag(tagPrefix+".stage3");
+          .map(Functions.identity()).tag(tagPrefix+".stage4");
+      };
+}
+```
+
+## Build the parallel flow
+
+Given a width, splitter and pipeline function it just takes a single call:
+
+```java
+TStream<String> results = PlumbingStreams.parallel(readings, width, splitter, pipeline());
+```
+
+## The final application
+
+When the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.
+
+```java
+package quarks.samples.topology;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.BiFunction;
+import quarks.function.Functions;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.SimpleSimulatedSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+
+/**
+ * A recipe for parallel analytics.
+ */
+public class ParallelRecipe {
+
+    /**
+     * Process several tuples in parallel in a replicated pipeline.
+     */
+    public static void main(String[] args) throws Exception {
+
+        DirectProvider dp = new DevelopmentProvider();
+        System.out.println("development console url: "
+                + dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+        Topology top = dp.newTopology("ParallelRecipe");
+
+        // The number of parallel processing channels to generate
+        int width = 5;
+        
+        // Define the splitter
+        ToIntFunction<Double> splitter = PlumbingStreams.roundRobinSplitter(width);
+        
+        // Generate a polled simulated sensor stream
+        SimpleSimulatedSensor sensor = new SimpleSimulatedSensor();
+        TStream<Double> readings = top.poll(sensor, 10, TimeUnit.MILLISECONDS)
+                                      .tag("readings");
+        
+        // Build the parallel analytic pipelines flow
+        TStream<String> results = 
+            PlumbingStreams.parallel(readings, width, splitter, pipeline())
+                .tag("results");
+        
+        // Print out the results.
+        results.sink(tuple -> System.out.println(new Date().toString() + "   " + tuple));
+
+        System.out.println("Notice that "+width+" results are generated every second - one from each parallel channel."
+            + "\nOnly one result would be generated each second if performed serially.");
+        dp.submit(top);
+    }
+    
+    /** Function to create analytic pipeline and add it to a stream */
+    private static BiFunction<TStream<Double>,Integer,TStream<String>> pipeline() {
+        // a simple 3 stage pipeline simulating some amount of work by sleeping
+        return (stream, channel) -> 
+          { 
+            String tagPrefix = "pipeline-ch"+channel;
+            return stream.map(tuple -> {
+                sleep(1000, TimeUnit.MILLISECONDS);
+                return "This is the "+tagPrefix+" result for tuple "+tuple;
+              }).tag(tagPrefix+".stage1")
+              .map(Functions.identity()).tag(tagPrefix+".stage2")
+              .map(Functions.identity()).tag(tagPrefix+".stage3");
+          };
+    }
+
+    private static void sleep(long period, TimeUnit unit) throws RuntimeException {
+        try {
+            Thread.sleep(unit.toMillis(period));
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted", e);
+        }
+    }
+
+}
+```


[2/2] incubator-quarks-website git commit: address feedback

Posted by dl...@apache.org.
address feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/commit/2306c25f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/tree/2306c25f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/diff/2306c25f

Branch: refs/heads/master
Commit: 2306c25f81bccbcc43b8ff799d2608d1c2b00490
Parents: 630ff43
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 15:08:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 15:08:35 2016 -0400

----------------------------------------------------------------------
 site/recipes/recipe_parallel_analytics.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/blob/2306c25f/site/recipes/recipe_parallel_analytics.md
----------------------------------------------------------------------
diff --git a/site/recipes/recipe_parallel_analytics.md b/site/recipes/recipe_parallel_analytics.md
index 5084465..965f455 100644
--- a/site/recipes/recipe_parallel_analytics.md
+++ b/site/recipes/recipe_parallel_analytics.md
@@ -2,7 +2,7 @@
 title: How can I run analytics on several tuples in parallel?
 ---
 
-If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple injest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.
+If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple ingest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.
 
 The overall proessing time for a single tuple is still the same but the processing for each tuple is overlapped. In the extreme your application may be able to process N tuples in the same time that it would have processed one.
 
@@ -25,9 +25,9 @@ sensorReadings<T> -> A1 -> |-> A2-channel1 ->| -> A3 -> results<R>
                                   ...
 ```
 
-The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels.  Each of the parallel channels also needs a thread to run its analytic pipeline.
+The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels. Each of the parallel channels also needs a thread to run its analytic pipeline.
 
-`PlumbingStreams.parallel()` builds a parallel flow graph for you.  Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.
+`PlumbingStreams.parallel()` builds a parallel flow graph for you. Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.
 
 More specifically `parallel()` generates a flow like:
 
@@ -42,7 +42,7 @@ It's easy to use `parallel()`!
 
 ## Define the splitter
 
-The splitter function partitions the tuples among the parallel channels.  `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession.  The round robin strategy works great when the processing time of tuples is uniform.  Other splitter functions may use information in the tuple to decide how to partition them.
+The splitter function partitions the tuples among the parallel channels. `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession. The round robin strategy works great when the processing time of tuples is uniform. Other splitter functions may use information in the tuple to decide how to partition them.
 
 This recipe just uses the round robin splitter for a `TStream<Double>`.
 
@@ -54,7 +54,7 @@ ToIntFunction<Double> splitter = PlumbingStreams.roundRobinSplitter(width);
 
 ## Define the pipeline to run in parallel
 
-Define a `BiFunction<TStream<T>, Integer, TStream<R>>` that builds the pipeline. That is, define a function that receives a `TStream<T>` and an integer `channel` and creates a pipeline for that channel that returns a `TStream<R>`.  
+Define a `BiFunction<TStream<T>, Integer, TStream<R>>` that builds the pipeline. That is, define a function that receives a `TStream<T>` and an integer `channel` and creates a pipeline for that channel that returns a `TStream<R>`.
 
 Many pipelines don't care what channel they're being constructed for. While the pipeline function typically yields the same pipeline processing for each channel there is no requirement for it to do so.