Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/09 19:09:48 UTC

incubator-quarks-website git commit: from d6bf448218acb1170c389acf694a138eb049f141

Repository: incubator-quarks-website
Updated Branches:
  refs/heads/asf-site 00c4d3304 -> a18203ee2


from d6bf448218acb1170c389acf694a138eb049f141


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/commit/a18203ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/tree/a18203ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/diff/a18203ee

Branch: refs/heads/asf-site
Commit: a18203ee29fdb2566a65b2e7f96ca5d024139a22
Parents: 00c4d33
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 9 15:09:47 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Mon May 9 15:09:47 2016 -0400

----------------------------------------------------------------------
 content/algolia_search.json                    | 2 +-
 content/recipes/recipe_parallel_analytics.html | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/blob/a18203ee/content/algolia_search.json
----------------------------------------------------------------------
diff --git a/content/algolia_search.json b/content/algolia_search.json
index db902b7..98fec8d 100644
--- a/content/algolia_search.json
+++ b/content/algolia_search.json
@@ -276,7 +276,7 @@
 "keywords": "",
 "url": "../recipes/recipe_parallel_analytics",
 "summary": "",
-"body": "If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple ingest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.The overall proessing time for a single tuple is still the same but the processing for each tuple is overlapped. In the extreme your application may be able to process N tuples in the same time that it would have processed one.This usage model is in contrast to what's been called _concurrent analytics_, where multiple different independent analytics for a single tuple are performed concurrently, as when using `PlumbingStreams.concurrent()`.e.g., imagine your analytic pipeline has three stages to it: A1, A2, A3, and that A2 dominates the processing time. You want to change the serial processing flow graph from:```sensorReadings -> A1 -> A2 -> A3 -> results```to a flow where the A2 analytics run on several tuples in parallel in a flow like:```       
                     |-> A2-channel0 ->|sensorReadings -> A1 -> |-> A2-channel1 ->| -> A3 -> results                           |-> A2-channel2 ->|                           |-> A2-channel3 ->|                           |-> A2-channel4 ->|                                  ...```The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels. Each of the parallel channels also needs a thread to run its analytic pipeline.`PlumbingStreams.parallel()` builds a parallel flow graph for you. Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.More specifically `parallel()` generates a flow like:```                                   |-> isolate(10) -> pipeline-ch0 -> |stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch1 -> |-> union -> isolate(width)                                    |-> isolate(10) -> pipeline-ch2 -> |                                     
   ...```It's easy to use `parallel()`!## Define the splitterThe splitter function partitions the tuples among the parallel channels. `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession. The round robin strategy works great when the processing time of tuples is uniform. Other splitter functions may use information in the tuple to decide how to partition them.This recipe just uses the round robin splitter for a `TStream`.```javaint width = 5;  // number of parallel channelsToIntFunction splitter = PlumbingStreams.roundRobinSplitter(width);```## Define the pipeline to run in parallelDefine a `BiFunction, Integer, TStream>` that builds the pipeline. That is, define a function that receives a `TStream` and an integer `channel` and creates a pipeline for that channel that returns a `TStream`.Many pipelines don't care what channel they're being constructed for. While the pipeline function typically yields the same pipeline 
 processing for each channel there is no requirement for it to do so.In this simple recipe the pipeline receives a `TStream` as input and generates a `TStream` as output.```javastatic BiFunction, Integer, TStream> pipeline() {    // a simple 4 stage pipeline simulating some amount of work by sleeping    return (stream, channel) ->       {         String tagPrefix = \"pipeline-ch\"+channel;        return stream.map(tuple -> {            sleep(1000, TimeUnit.MILLISECONDS);            return \"This is the \"+tagPrefix+\" result for tuple \"+tuple;          }).tag(tagPrefix+\".stage1\")          .map(Functions.identity()).tag(tagPrefix+\".stage2\")          .map(Functions.identity()).tag(tagPrefix+\".stage3\");          .map(Functions.identity()).tag(tagPrefix+\".stage4\");      };}```## Build the parallel flowGiven a width, splitter and pipeline function it just takes a single call:```javaTStream results = PlumbingStreams.parallel(readings, width, splitter, pipeline());```## The final a
 pplicationWhen the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.```javapackage quarks.samples.topology;import java.util.Date;import java.util.concurrent.TimeUnit;import quarks.console.server.HttpServer;import quarks.function.BiFunction;import quarks.function.Functions;import quarks.providers.development.DevelopmentProvider;import quarks.providers.direct.DirectProvider;import quarks.samples.utils.sensor.SimpleSimulatedSensor;import quarks.topology.TStream;import quarks.topology.Topology;import quarks.topology.plumbing.PlumbingStreams;/** * A recipe for parallel analytics. */public class ParallelRecipe {    /**     * Process several tuples in parallel in a replicated pipeline.     */    public static void main(String[] args) throws Exception {        DirectProvider dp = new DevelopmentProvider();        System.out.println(\"development console url: \"                + dp.getServices().getServic
 e(HttpServer.class).getConsoleUrl());        Topology top = dp.newTopology(\"ParallelRecipe\");        // The number of parallel processing channels to generate        int width = 5;                // Define the splitter        ToIntFunction splitter = PlumbingStreams.roundRobinSplitter(width);                // Generate a polled simulated sensor stream        SimpleSimulatedSensor sensor = new SimpleSimulatedSensor();        TStream readings = top.poll(sensor, 10, TimeUnit.MILLISECONDS)                                      .tag(\"readings\");                // Build the parallel analytic pipelines flow        TStream results =             PlumbingStreams.parallel(readings, width, splitter, pipeline())                .tag(\"results\");                // Print out the results.        results.sink(tuple -> System.out.println(new Date().toString() + \"   \" + tuple));        System.out.println(\"Notice that \"+width+\" results are generated every second - one from each parallel channel
 .\"            + \"\\nOnly one result would be generated each second if performed serially.\");        dp.submit(top);    }        /** Function to create analytic pipeline and add it to a stream */    private static BiFunction,Integer,TStream> pipeline() {        // a simple 3 stage pipeline simulating some amount of work by sleeping        return (stream, channel) ->           {             String tagPrefix = \"pipeline-ch\"+channel;            return stream.map(tuple -> {                sleep(1000, TimeUnit.MILLISECONDS);                return \"This is the \"+tagPrefix+\" result for tuple \"+tuple;              }).tag(tagPrefix+\".stage1\")              .map(Functions.identity()).tag(tagPrefix+\".stage2\")              .map(Functions.identity()).tag(tagPrefix+\".stage3\");          };    }    private static void sleep(long period, TimeUnit unit) throws RuntimeException {        try {            Thread.sleep(unit.toMillis(period));        } catch (InterruptedException e) {        
     throw new RuntimeException(\"Interrupted\", e);        }    }}```"
+"body": "If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple ingest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance. The overall processing time for a single tuple is still the same, but the processing for each tuple is overlapped. In the extreme, your application may be able to process N tuples in the same time that it would have processed one. This usage model is in contrast to what's been called _concurrent analytics_, where multiple different independent analytics for a single tuple are performed concurrently, as when using `PlumbingStreams.concurrent()`. For example, imagine your analytic pipeline has three stages: A1, A2, A3, and that A2 dominates the processing time. You want to change the serial processing flow graph from:```sensorReadings -> A1 -> A2 -> A3 -> results```to a flow where the A2 analytics run on several tuples in parallel, like:```
                     |-> A2-channel0 ->|sensorReadings -> A1 -> |-> A2-channel1 ->| -> A3 -> results                           |-> A2-channel2 ->|                           |-> A2-channel3 ->|                           |-> A2-channel4 ->|                                  ...```The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels. Each of the parallel channels also needs a thread to run its analytic pipeline.`PlumbingStreams.parallel()` builds a parallel flow graph for you. Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.More specifically `parallel()` generates a flow like:```                                   |-> isolate(10) -> pipeline-ch0 -> |stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch1 -> |-> union -> isolate(width)                                    |-> isolate(10) -> pipeline-ch2 -> |                                     
   ...```It's easy to use `parallel()`!## Define the splitterThe splitter function partitions the tuples among the parallel channels. `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession. The round robin strategy works great when the processing time of tuples is uniform. Other splitter functions may use information in the tuple to decide how to partition them.This recipe just uses the round robin splitter for a `TStream`.```javaint width = 5;  // number of parallel channelsToIntFunction splitter = PlumbingStreams.roundRobinSplitter(width);```Another possibility is to use a \"load balanced splitter\" configuration.  That is covered below.## Define the pipeline to run in parallelDefine a `BiFunction, Integer, TStream>` that builds the pipeline. That is, define a function that receives a `TStream` and an integer `channel` and creates a pipeline for that channel that returns a `TStream`.Many pipelines don't care what chann
 el they're being constructed for. While the pipeline function typically yields the same pipeline processing for each channel, there is no requirement for it to do so. In this simple recipe the pipeline receives a `TStream` as input and generates a `TStream` as output.```javastatic BiFunction, Integer, TStream> pipeline() {    // a simple 4 stage pipeline simulating some amount of work by sleeping    return (stream, channel) ->       {         String tagPrefix = \"pipeline-ch\"+channel;        return stream.map(tuple -> {            sleep(1000, TimeUnit.MILLISECONDS);            return \"This is the \"+tagPrefix+\" result for tuple \"+tuple;          }).tag(tagPrefix+\".stage1\")          .map(Functions.identity()).tag(tagPrefix+\".stage2\")          .map(Functions.identity()).tag(tagPrefix+\".stage3\")          .map(Functions.identity()).tag(tagPrefix+\".stage4\");      };}```## Build the parallel flowGiven a width, splitter and pipeline function it just takes a single call:```javaTS
 tream results = PlumbingStreams.parallel(readings, width, splitter, pipeline());```## Load balanced parallel flowA load balanced parallel flow allocates an incoming tuple to the first available parallel channel. When tuple processing times are variable, using a load balanced parallel flow can result in greater overall throughput.To create a load balanced parallel flow simply use the `parallelBalanced()` method instead of `parallel()`. Everything is the same except you don't supply a splitter: ```javaTStream results = PlumbingStreams.parallelBalanced(readings, width, pipeline());```## The final applicationWhen the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.```javapackage quarks.samples.topology;import java.util.Date;import java.util.concurrent.TimeUnit;import quarks.console.server.HttpServer;import quarks.function.BiFunction;import quarks.function.Functions;import quarks.providers.developmen
 t.DevelopmentProvider;import quarks.providers.direct.DirectProvider;import quarks.samples.utils.sensor.SimpleSimulatedSensor;import quarks.topology.TStream;import quarks.topology.Topology;import quarks.topology.plumbing.PlumbingStreams;/** * A recipe for parallel analytics. */public class ParallelRecipe {    /**     * Process several tuples in parallel in a replicated pipeline.     */    public static void main(String[] args) throws Exception {        DirectProvider dp = new DevelopmentProvider();        System.out.println(\"development console url: \"                + dp.getServices().getService(HttpServer.class).getConsoleUrl());        Topology top = dp.newTopology(\"ParallelRecipe\");        // The number of parallel processing channels to generate        int width = 5;                // Define the splitter        ToIntFunction splitter = PlumbingStreams.roundRobinSplitter(width);                // Generate a polled simulated sensor stream        SimpleSimulatedSensor sensor = n
 ew SimpleSimulatedSensor();        TStream readings = top.poll(sensor, 10, TimeUnit.MILLISECONDS)                                      .tag(\"readings\");                // Build the parallel analytic pipelines flow        TStream results =             PlumbingStreams.parallel(readings, width, splitter, pipeline())                .tag(\"results\");                // Print out the results.        results.sink(tuple -> System.out.println(new Date().toString() + \"   \" + tuple));        System.out.println(\"Notice that \"+width+\" results are generated every second - one from each parallel channel.\"            + \"\\nOnly one result would be generated each second if performed serially.\");        dp.submit(top);    }        /** Function to create analytic pipeline and add it to a stream */    private static BiFunction,Integer,TStream> pipeline() {        // a simple 3 stage pipeline simulating some amount of work by sleeping        return (stream, channel) ->           {             
 String tagPrefix = \"pipeline-ch\"+channel;            return stream.map(tuple -> {                sleep(1000, TimeUnit.MILLISECONDS);                return \"This is the \"+tagPrefix+\" result for tuple \"+tuple;              }).tag(tagPrefix+\".stage1\")              .map(Functions.identity()).tag(tagPrefix+\".stage2\")              .map(Functions.identity()).tag(tagPrefix+\".stage3\");          };    }    private static void sleep(long period, TimeUnit unit) throws RuntimeException {        try {            Thread.sleep(unit.toMillis(period));        } catch (InterruptedException e) {            throw new RuntimeException(\"Interrupted\", e);        }    }}```"
 
 },
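The round-robin splitter the recipe body above describes simply cycles through channel indexes 0..width-1, ignoring the tuple contents. As a minimal plain-Java sketch of that behavior (an illustration only, not the actual Quarks `PlumbingStreams.roundRobinSplitter()` implementation; the class name is hypothetical):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

/** Illustration of a round-robin splitter: ignores the tuple and cycles 0..width-1. */
public class RoundRobinSplitterSketch {
    static <T> ToIntFunction<T> roundRobinSplitter(int width) {
        AtomicInteger next = new AtomicInteger(0);
        // Each call returns the next channel index, wrapping back to 0 at width.
        return tuple -> next.getAndUpdate(i -> (i + 1) % width);
    }

    public static void main(String[] args) {
        ToIntFunction<Double> splitter = roundRobinSplitter(3);
        for (double reading : new double[] {1.0, 2.0, 3.0, 4.0}) {
            System.out.print(splitter.applyAsInt(reading) + " ");
        }
    }
}
```

Because the splitter keeps no per-channel state beyond a counter, it works well when per-tuple processing times are uniform, as the recipe notes.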
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks-website/blob/a18203ee/content/recipes/recipe_parallel_analytics.html
----------------------------------------------------------------------
diff --git a/content/recipes/recipe_parallel_analytics.html b/content/recipes/recipe_parallel_analytics.html
index 3f3cdab..49ec137 100644
--- a/content/recipes/recipe_parallel_analytics.html
+++ b/content/recipes/recipe_parallel_analytics.html
@@ -637,6 +637,8 @@ stream -&gt; split(width,splitter) -&gt; |-&gt; isolate(10) -&gt; pipeline-ch1 -
 
 <span class="n">ToIntFunction</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">splitter</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">roundRobinSplitter</span><span class="o">(</span><span class="n">width</span><span class="o">);</span>
 </code></pre></div>
+<p>Another possibility is to use a &quot;load balanced splitter&quot; configuration.  That is covered below.</p>
+
 <h2 id="define-the-pipeline-to-run-in-parallel">Define the pipeline to run in parallel</h2>
 
 <p>Define a <code>BiFunction&lt;TStream&lt;T&gt;, Integer, TStream&lt;R&gt;&gt;</code> that builds the pipeline. That is, define a function that receives a <code>TStream&lt;T&gt;</code> and an integer <code>channel</code> and creates a pipeline for that channel that returns a <code>TStream&lt;R&gt;</code>.</p>
@@ -664,6 +666,13 @@ stream -&gt; split(width,splitter) -&gt; |-&gt; isolate(10) -&gt; pipeline-ch1 -
 <p>Given a width, splitter and pipeline function it just takes a single call:</p>
 <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">parallel</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">width</span><span class="o">,</span> <span class="n">splitter</span><span class="o">,</span> <span class="n">pipeline</span><span class="o">());</span>
 </code></pre></div>
+<h2 id="load-balanced-parallel-flow">Load balanced parallel flow</h2>
+
+<p>A load balanced parallel flow allocates an incoming tuple to the first available parallel channel. When tuple processing times are variable, using a load balanced parallel flow can result in greater overall throughput.</p>
+
+<p>To create a load balanced parallel flow simply use the <code>parallelBalanced()</code> method instead of <code>parallel()</code>. Everything is the same except you don&#39;t supply a splitter: </p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="n">PlumbingStreams</span><span class="o">.</span><span class="na">parallelBalanced</span><span class="o">(</span><span class="n">readings</span><span class="o">,</span> <span class="n">width</span><span class="o">,</span> <span class="n">pipeline</span><span class="o">());</span>
+</code></pre></div>
 <h2 id="the-final-application">The final application</h2>
 
 <p>When the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.</p>
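The load-balanced variant this commit documents allocates each incoming tuple to the first available channel rather than cycling blindly. A rough sketch of that allocation policy (conceptual only, not the Quarks `parallelBalanced()` implementation; the class and method names here are made up):

```java
import java.util.concurrent.Semaphore;

/** Sketch of load-balanced dispatch: pick the first channel with a free slot. */
public class LoadBalancedPickSketch {
    private final Semaphore[] slots;

    LoadBalancedPickSketch(int width) {
        slots = new Semaphore[width];
        // One slot per channel; a channel is "busy" while its slot is held.
        for (int i = 0; i < width; i++) slots[i] = new Semaphore(1);
    }

    /** Returns the index of the first free channel, or -1 if all are busy. */
    int tryAcquireChannel() {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i].tryAcquire()) return i;
        }
        return -1;
    }

    /** Marks a channel free again once its pipeline finishes the tuple. */
    void release(int channel) {
        slots[channel].release();
    }
}
```

With variable per-tuple processing times, this policy keeps slow channels from accumulating a backlog, which is why the recipe says it can yield greater overall throughput than round-robin.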