You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/04 13:11:51 UTC

[1/2] incubator-quarks git commit: [QUARKS-165] [WIP] [skip-ci] add traditional parallel analytics

Repository: incubator-quarks
Updated Branches:
  refs/heads/master 6f26b5b4e -> 6ab46289f


[QUARKS-165] [WIP] [skip-ci] add traditional parallel analytics

- Add PlumbingStreams.parallel()
- Add PlumbingStreams.parallelMap()

These are in contrast to the new concurrent[Map]() in PR-99.
The impl is dependent on functionality in PR-99 and can't be merged
until that is merged. The hack makes the tests runnable and they pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/1a52f120
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/1a52f120
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/1a52f120

Branch: refs/heads/master
Commit: 1a52f120640c0d914f7afc99e10daf8565ea83c7
Parents: 6f26b5b
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 14:56:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 12:51:51 2016 -0400

----------------------------------------------------------------------
 .../topology/plumbing/PlumbingStreams.java      | 112 +++++++++-
 .../java/quarks/test/topology/PlumbingTest.java | 205 +++++++++++++++++--
 2 files changed, 298 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/1a52f120/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index c816eee..f23799c 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -19,11 +19,15 @@ under the License.
 package quarks.topology.plumbing;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import quarks.function.BiFunction;
 import quarks.function.Function;
+import quarks.function.ToIntFunction;
 import quarks.oplet.plumbing.Barrier;
 import quarks.oplet.plumbing.Isolate;
 import quarks.oplet.plumbing.PressureReliever;
@@ -298,14 +302,14 @@ public class PlumbingStreams {
      * </P><P>
      * Logically, instead of doing this:
      * <pre>{@code
-     * sensorReadings<T> -> A1pipeline -> A2pipeline -> A3pipeline -> results<R>
+     * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
      * }</pre>
      * create a graph that's logically like this:
      * <pre>{@code
      * - 
-     *                      |-> A1pipeline ->|
-     * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
-     *                      |-> A3pipeline ->|
+     *                      |-> A1 ->|
+     * sensorReadings<T> -> |-> A2 ->| -> results<R>
+     *                      |-> A3 ->|
      * 
      * }</pre>
      * more specifically a graph like this:
@@ -426,5 +430,105 @@ public class PlumbingStreams {
       TStream<T> s1 = others.remove(0);
       return s1.fanin(new Barrier<T>(queueCapacity), others);
     }
+
+    /**
+     * Perform an analytic function on tuples in parallel.
+     * <P>
+     * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch))}
+     * </P>
+     * @param stream input stream
+     * @param splitter the tuple channel allocation function
+     * @param mapper analytic function
+     * @param width number of channels
+     * @return the unordered result stream
+     */
+    public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
+      BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
+      return parallel(stream, width, splitter, pipeline);
+    }
+    
+    /**
+     * Perform an analytic pipeline on tuples in parallel.
+     * <P>
+     * Splits {@code stream} into {@code width} parallel processing channels,
+     * partitioning tuples among the channels using {@code splitter}.
+     * Each channel runs a copy of {@code pipeline}.
+     * The resulting stream is isolated from the upstream parallel channels.
+     * <P></P>
+     * The ordering of tuples in {@code stream} is not maintained in the
+     * results from {@code parallel}.
+     * </P><P>
+     * {@code pipeline} is not required to yield a result for each input
+     * tuple.
+     * </P><P>
+     * A common splitter function is a {@link quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter}.
+     * </P><P>
+     * The generated graph looks like this:
+     * <pre>{@code
+     * -
+     *                                    |-> isolate(10) -> pipeline-ch1 -> |
+     * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
+     *                                    |-> isolate(10) -> pipeline-ch3 -> |
+     *                                                . . .
+     * }</pre>
+     * </P>
+     * 
+     * @param <T> Input stream tuple type
+     * @param <R> Result stream tuple type
+     * 
+     * @param stream the input stream
+     * @param width number of parallel processing channels
+     * @param splitter the tuple channel allocation function
+     * @param pipeline the pipeline for each channel.  
+     *        {@code pipeline.apply(inputStream,channel)}
+     *        is called to generate the pipeline for each channel.
+     * @return the isolated unordered result from each parallel channel
+     * @see #concurrent(TStream, List, Function) concurrent
+     * @see quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter
+     */
+    public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+      Objects.requireNonNull(stream, "stream");
+      if (width < 1)
+        throw new IllegalArgumentException("width");
+      Objects.requireNonNull(splitter, "splitter");
+      Objects.requireNonNull(pipeline, "pipeline");
+      
+      // Add the splitter
+      List<TStream<T>> channels = stream.split(width, splitter);
+      for (int ch = 0; ch < width; ch++)
+        channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
+      
+      // Add concurrency (isolation) to the channels
+      int chBufferSize = 10; // don't immediately block stream if channel is busy
+      for (int ch = 0; ch < width; ch++)
+        channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
+      
+      // Add pipelines
+      List<TStream<R>> results = new ArrayList<>(width);
+      for (int ch = 0; ch < width; ch++) {
+        results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
+      }
+      
+      // Add the Union
+      TStream<R> result =  results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+      
+      // Add the isolate - keep channel threads to just their pipeline processing
+      return isolate(result, width);
+    }
+    
+    /**
+     * A round-robin splitter ToIntFunction
+     * <P>
+     * The splitter function cycles among the {@code width} channels
+     * on successive calls to {@code roundRobinSplitter.applyAsInt()},
+     * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
+     * </P>
+     * @see TStream#split(int, ToIntFunction) TStream.split
+     * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
+     */
+    public static ToIntFunction<Double> roundRobinSplitter(int width) {
+      AtomicInteger cnt = new AtomicInteger();
+      return tuple -> cnt.getAndIncrement() % width;
+    }
  
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/1a52f120/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 856f7ea..92885de 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -35,8 +35,10 @@ import org.junit.Test;
 
 import com.google.gson.JsonObject;
 
+import quarks.function.BiFunction;
 import quarks.function.Function;
 import quarks.function.Functions;
+import quarks.function.ToIntFunction;
 import quarks.topology.TStream;
 import quarks.topology.Topology;
 import quarks.topology.plumbing.PlumbingStreams;
@@ -350,25 +352,22 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         
         Condition<Long> count = top.getTester().tupleCount(result, 3);
         Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
         long begin = System.currentTimeMillis();
         complete(top, count);
         long end = System.currentTimeMillis();
+
         assertTrue(contents.getResult().toString(), contents.valid());
         
         long actDuration = end - begin;
-        
         long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
-        long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+        long expMinDuration = resultTuples.length * 100;
         
-        System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
         
-        // a gross level performance check
+        // a gross level performance check w/concurrent channels
         assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
             actDuration < 0.5 * expMinSerialDuration);
-        
-        // a tighter performance check
-        assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration, 
-            actDuration <= expMaxDuration);
     }
     
     @Test
@@ -405,25 +404,201 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
         
         Condition<Long> count = top.getTester().tupleCount(result, 3);
         Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
         long begin = System.currentTimeMillis();
         complete(top, count);
         long end = System.currentTimeMillis();
+        
         assertTrue(contents.getResult().toString(), contents.valid());
         
         long actDuration = end - begin;
-        
         long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
-        long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+        long expMinDuration = resultTuples.length * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/concurrent channels
+        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+            actDuration < 0.5 * expMinSerialDuration);
+    }
+
+    private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
+      return (value,channel) -> { 
+        try {
+          Thread.sleep(unit.toMillis(period));  // simulate work for this period
+          JsonObject jo = new JsonObject();
+          jo.addProperty("channel", channel);
+          jo.addProperty("result", value);
+          return jo;
+        } catch (InterruptedException e) {
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+      };
+    }
+    
+    private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
+      return (stream,channel) -> stream
+          .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
+          .filter(t->true)
+          .tag("pipeline-ch"+channel);
+    }
+    
+    private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
+      return jo -> { 
+        try {
+          Thread.sleep(unit.toMillis(period));  // simulate work for this period
+          return jo;
+        } catch (InterruptedException e) {
+          throw new RuntimeException("channel="+channel+" interrupted", e);
+        }
+      };
+    }
+    
+    @SuppressWarnings("unused")
+    private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
+      return (stream,channel) -> stream
+          .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
+                       return jo; })
+          .map(fakeJsonAnalytic(channel, period, unit))
+          .filter(t->true)
+          .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
+                      return jo; })
+          .tag("pipeline-ch"+channel);
+    }
+    
+    @Test
+    public void testParallelMap() throws Exception {
+        Topology top = newTopology("testParallelMap");
+        
+        BiFunction<Integer,Integer,JsonObject> mapper = 
+            fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
+        
+        int width = 5;
+        ToIntFunction<Integer> splitter = tuple -> tuple % width;
+        
+        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+        TStream<Integer> values = top.of(resultTuples);
+        
+        TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
+        TStream<Integer> result2 = result.map(jo -> {
+            int r = jo.getAsJsonPrimitive("result").getAsInt();
+            assertEquals(splitter.applyAsInt(r), jo.getAsJsonPrimitive("channel").getAsInt());
+            return r;
+          });
+        
+        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+    
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
         
-        System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * 100;
+        long expMinDuration = (resultTuples.length / width) * 100;
         
-        // a gross level performance check
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/parallel channels
         assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
             actDuration < 0.5 * expMinSerialDuration);
+    }
+    
+    @Test
+    public void testParallel() throws Exception {
+        Topology top = newTopology("testParallel");
         
-        // a tighter performance check
-        assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration, 
-            actDuration <= expMaxDuration);
+        BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline = 
+            fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
+        
+        int width = 5;
+        ToIntFunction<Integer> splitter = tuple -> tuple % width;
+        
+        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+        TStream<Integer> values = top.of(resultTuples);
+        
+        TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
+        TStream<Integer> result2 = result.map(jo -> {
+            int r = jo.getAsJsonPrimitive("result").getAsInt();
+            assertEquals(splitter.applyAsInt(r), jo.getAsJsonPrimitive("channel").getAsInt());
+            return r;
+          });
+        
+        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+        
+        long begin = System.currentTimeMillis();
+        complete(top, count);
+        long end = System.currentTimeMillis();
+        
+        assertTrue(contents.getResult().toString(), contents.valid());
+        
+        long actDuration = end - begin;
+        long expMinSerialDuration = resultTuples.length * 100;
+        long expMinDuration = (resultTuples.length / width) * 100;
+        
+        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+        
+        // a gross level performance check w/parallel channels
+        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+            actDuration < 0.5 * expMinSerialDuration);
     }
+    
+//    @Test
+//    public void testParallelTiming() throws Exception {
+//        Topology top = newTopology("testParallelTiming");
+//        
+//        BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline = 
+//            fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
+//        
+//        int width = 5;
+//        // ToIntFunction<Integer> splitter = tuple -> tuple % width;
+//        ToIntFunction<JsonObject> splitter = jo -> jo.getAsJsonPrimitive("result").getAsInt() % width;
+//        
+//        Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+//        TStream<Integer> values = top.of(resultTuples);
+//        
+//        TStream<JsonObject> inStream = values.map(value -> {
+//            JsonObject jo = new JsonObject();
+//            jo.addProperty("result", value);
+//            jo.addProperty("channel", splitter.applyAsInt(jo));
+//            jo.addProperty("enterParallelMsec", System.currentTimeMillis());
+//            return jo;
+//          });
+//        TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
+//        TStream<Integer> result2 = result.map(jo -> {
+//            jo.addProperty("exitParallelMsec", System.currentTimeMillis());
+//            System.out.println("ch="+jo.getAsJsonPrimitive("channel").getAsInt()
+//                +" endPipeline-startPipeline="
+//                  +(jo.getAsJsonPrimitive("endPipelineMsec").getAsLong()
+//                    - jo.getAsJsonPrimitive("startPipelineMsec").getAsLong())
+//                +" exitParallel-startPipeine="
+//                  +(jo.getAsJsonPrimitive("exitParallelMsec").getAsLong()
+//                      - jo.getAsJsonPrimitive("startPipelineMsec").getAsLong()));
+//            int r = jo.getAsJsonPrimitive("result").getAsInt();
+//            assertEquals(splitter.applyAsInt(jo), jo.getAsJsonPrimitive("channel").getAsInt());
+//            return r;
+//          });
+//        
+//        Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+//        Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+//        long begin = System.currentTimeMillis();
+//        complete(top, count);
+//        long end = System.currentTimeMillis();
+//        assertTrue(contents.getResult().toString(), contents.valid());
+//        
+//        long actDuration = end - begin;
+//        
+//        long expMinSerialDuration = resultTuples.length * 100;
+//        long expMinDuration = (resultTuples.length / width) * 100;
+//        
+//        System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+//        
+//        // a gross level performance check w/parallel channels
+//        assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration, 
+//            actDuration < 0.5 * expMinSerialDuration);
+//    }
 
 }


[2/2] incubator-quarks git commit: rebase, tidy up

Posted by dl...@apache.org.
rebase, tidy up

Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/6ab46289
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/6ab46289
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/6ab46289

Branch: refs/heads/master
Commit: 6ab46289f97bac6d1cc02f2f03e8f4305619ce9a
Parents: 1a52f12
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 12:58:28 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 12:58:28 2016 -0400

----------------------------------------------------------------------
 .../main/java/quarks/topology/plumbing/PlumbingStreams.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/6ab46289/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f23799c..f33651b 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -441,6 +441,8 @@ public class PlumbingStreams {
      * @param mapper analytic function
      * @param width number of channels
      * @return the unordered result stream
+     * @see #roundRobinSplitter(int) roundRobinSplitter
+     * @see #concurrentMap(TStream, List, Function) concurrentMap
      */
     public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
       BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
@@ -461,7 +463,7 @@ public class PlumbingStreams {
      * {@code pipeline} is not required to yield a result for each input
      * tuple.
      * </P><P>
-     * A common splitter function is a {@link quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter}.
+     * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
      * </P><P>
      * The generated graph looks like this:
      * <pre>{@code
@@ -483,8 +485,8 @@ public class PlumbingStreams {
      *        {@code pipeline.apply(inputStream,channel)}
      *        is called to generate the pipeline for each channel.
      * @return the isolated unordered result from each parallel channel
+     * @see #roundRobinSplitter(int) roundRobinSplitter
      * @see #concurrent(TStream, List, Function) concurrent
-     * @see quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter
      */
     public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
       Objects.requireNonNull(stream, "stream");