You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/04 13:11:51 UTC
[1/2] incubator-quarks git commit: [QUARKS-165] [WIP] [skip-ci] add
traditional parallel analytics
Repository: incubator-quarks
Updated Branches:
refs/heads/master 6f26b5b4e -> 6ab46289f
[QUARKS-165] [WIP] [skip-ci] add traditional parallel analytics
- Add PlumbingStreams.parallel()
- Add PlumbingStreams.parallelMap()
These are in contrast to the new concurrent[Map]() in PR-99.
The impl is dependent on functionality in PR-99 and can't be merged
until that is merged. The hack makes the tests runnable and they pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/1a52f120
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/1a52f120
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/1a52f120
Branch: refs/heads/master
Commit: 1a52f120640c0d914f7afc99e10daf8565ea83c7
Parents: 6f26b5b
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Mon May 2 14:56:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 12:51:51 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 112 +++++++++-
.../java/quarks/test/topology/PlumbingTest.java | 205 +++++++++++++++++--
2 files changed, 298 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/1a52f120/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index c816eee..f23799c 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -19,11 +19,15 @@ under the License.
package quarks.topology.plumbing;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import quarks.function.BiFunction;
import quarks.function.Function;
+import quarks.function.ToIntFunction;
import quarks.oplet.plumbing.Barrier;
import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
@@ -298,14 +302,14 @@ public class PlumbingStreams {
* </P><P>
* Logically, instead of doing this:
* <pre>{@code
- * sensorReadings<T> -> A1pipeline -> A2pipeline -> A3pipeline -> results<R>
+ * sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
* }</pre>
* create a graph that's logically like this:
* <pre>{@code
* -
- * |-> A1pipeline ->|
- * sensorReadings<T> -> |-> A2pipeline ->| -> result<R>
- * |-> A3pipeline ->|
+ * |-> A1 ->|
+ * sensorReadings<T> -> |-> A2 ->| -> results<R>
+ * |-> A3 ->|
*
* }</pre>
* more specifically a graph like this:
@@ -426,5 +430,105 @@ public class PlumbingStreams {
TStream<T> s1 = others.remove(0);
return s1.fanin(new Barrier<T>(queueCapacity), others);
}
+
+ /**
+ * Perform an analytic function on tuples in parallel.
+ * <P>
+ * Same as {@code parallel(stream, width, splitter, (s,ch) -> s.map(t -> mapper.apply(t, ch)))}
+ * </P>
+ * @param stream input stream
+ * @param splitter the tuple channel allocation function
+ * @param mapper analytic function
+ * @param width number of channels
+ * @return the unordered result stream
+ */
+ public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
+ BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
+ return parallel(stream, width, splitter, pipeline);
+ }
+
+ /**
+ * Perform an analytic pipeline on tuples in parallel.
+ * <P>
+ * Splits {@code stream} into {@code width} parallel processing channels,
+ * partitioning tuples among the channels using {@code splitter}.
+ * Each channel runs a copy of {@code pipeline}.
+ * The resulting stream is isolated from the upstream parallel channels.
+ * </P><P>
+ * The ordering of tuples in {@code stream} is not maintained in the
+ * results from {@code parallel}.
+ * </P><P>
+ * {@code pipeline} is not required to yield a result for each input
+ * tuple.
+ * </P><P>
+ * A common splitter function is a {@link quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter}.
+ * </P><P>
+ * The generated graph looks like this:
+ * <pre>{@code
+ * -
+ * |-> isolate(10) -> pipeline-ch1 -> |
+ * stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch2 -> |-> union -> isolate(width)
+ * |-> isolate(10) -> pipeline-ch3 -> |
+ * . . .
+ * }</pre>
+ * </P>
+ *
+ * @param <T> Input stream tuple type
+ * @param <R> Result stream tuple type
+ *
+ * @param stream the input stream
+ * @param width number of parallel processing channels
+ * @param splitter the tuple channel allocation function
+ * @param pipeline the pipeline for each channel.
+ * {@code pipeline.apply(inputStream,channel)}
+ * is called to generate the pipeline for each channel.
+ * @return the isolated unordered result from each parallel channel
+ * @see #concurrent(TStream, List, Function) concurrent
+ * @see quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter
+ */
+ public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
+ Objects.requireNonNull(stream, "stream");
+ if (width < 1)
+ throw new IllegalArgumentException("width");
+ Objects.requireNonNull(splitter, "splitter");
+ Objects.requireNonNull(pipeline, "pipeline");
+
+ // Add the splitter
+ List<TStream<T>> channels = stream.split(width, splitter);
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, channels.get(ch).tag("parallel.split-ch"+ch));
+
+ // Add concurrency (isolation) to the channels
+ int chBufferSize = 10; // don't immediately block stream if channel is busy
+ for (int ch = 0; ch < width; ch++)
+ channels.set(ch, isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch"+ch));
+
+ // Add pipelines
+ List<TStream<R>> results = new ArrayList<>(width);
+ for (int ch = 0; ch < width; ch++) {
+ results.add(pipeline.apply(channels.get(ch), ch).tag("parallel-ch"+ch));
+ }
+
+ // Add the Union
+ TStream<R> result = results.get(0).union(new HashSet<>(results)).tag("parallel.union");
+
+ // Add the isolate - keep channel threads to just their pipeline processing
+ return isolate(result, width);
+ }
+
+ /**
+ * A round-robin splitter ToIntFunction
+ * <P>
+ * The splitter function cycles among the {@code width} channels
+ * on successive calls to {@code roundRobinSplitter.applyAsInt()},
+ * returning {@code 0, 1, ..., width-1, 0, 1, ..., width-1}.
+ * </P>
+ * @see TStream#split(int, ToIntFunction) TStream.split
+ * @see PlumbingStreams#parallel(TStream, int, ToIntFunction, BiFunction) parallel
+ */
+ public static ToIntFunction<Double> roundRobinSplitter(int width) {
+ AtomicInteger cnt = new AtomicInteger();
+ return tuple -> cnt.getAndIncrement() % width;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/1a52f120/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index 856f7ea..92885de 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -35,8 +35,10 @@ import org.junit.Test;
import com.google.gson.JsonObject;
+import quarks.function.BiFunction;
import quarks.function.Function;
import quarks.function.Functions;
+import quarks.function.ToIntFunction;
import quarks.topology.TStream;
import quarks.topology.Topology;
import quarks.topology.plumbing.PlumbingStreams;
@@ -350,25 +352,22 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
Condition<Long> count = top.getTester().tupleCount(result, 3);
Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
+
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
-
long expMinSerialDuration = resultTuples.length * mappers.size() * 100;
- long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+ long expMinDuration = resultTuples.length * 100;
- System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
- // a gross level performance check
+ // a gross level performance check w/concurrent channels
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
-
- // a tighter performance check
- assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration,
- actDuration <= expMaxDuration);
}
@Test
@@ -405,25 +404,201 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
Condition<Long> count = top.getTester().tupleCount(result, 3);
Condition<List<Integer>> contents = top.getTester().streamContents(result, resultTuples );
+
long begin = System.currentTimeMillis();
complete(top, count);
long end = System.currentTimeMillis();
+
assertTrue(contents.getResult().toString(), contents.valid());
long actDuration = end - begin;
-
long expMinSerialDuration = resultTuples.length * pipelines.size() * 100;
- long expMaxDuration = resultTuples.length * (100 + 75/*slop and more tuple overhead*/);
+ long expMinDuration = resultTuples.length * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/concurrent channels
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ private BiFunction<Integer,Integer,JsonObject> fakeParallelAnalytic(long period, TimeUnit unit) {
+ return (value,channel) -> {
+ try {
+ Thread.sleep(unit.toMillis(period)); // simulate work for this period
+ JsonObject jo = new JsonObject();
+ jo.addProperty("channel", channel);
+ jo.addProperty("result", value);
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ private BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> fakeParallelPipeline(long period, TimeUnit unit) {
+ return (stream,channel) -> stream
+ .map(value -> fakeParallelAnalytic(period, unit).apply(value,channel))
+ .filter(t->true)
+ .tag("pipeline-ch"+channel);
+ }
+
+ private Function<JsonObject,JsonObject> fakeJsonAnalytic(int channel, long period, TimeUnit unit) {
+ return jo -> {
+ try {
+ Thread.sleep(unit.toMillis(period)); // simulate work for this period
+ return jo;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("channel="+channel+" interrupted", e);
+ }
+ };
+ }
+
+ @SuppressWarnings("unused")
+ private BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> fakeParallelPipelineTiming(long period, TimeUnit unit) {
+ return (stream,channel) -> stream
+ .map(jo -> { jo.addProperty("startPipelineMsec", System.currentTimeMillis());
+ return jo; })
+ .map(fakeJsonAnalytic(channel, period, unit))
+ .filter(t->true)
+ .map(jo -> { jo.addProperty("endPipelineMsec", System.currentTimeMillis());
+ return jo; })
+ .tag("pipeline-ch"+channel);
+ }
+
+ @Test
+ public void testParallelMap() throws Exception {
+ Topology top = newTopology("testParallelMap");
+
+ BiFunction<Integer,Integer,JsonObject> mapper =
+ fakeParallelAnalytic(100, TimeUnit.MILLISECONDS);
+
+ int width = 5;
+ ToIntFunction<Integer> splitter = tuple -> tuple % width;
+
+ Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+ TStream<Integer> values = top.of(resultTuples);
+
+ TStream<JsonObject> result = PlumbingStreams.parallelMap(values, width, splitter, mapper).tag("result");
+ TStream<Integer> result2 = result.map(jo -> {
+ int r = jo.getAsJsonPrimitive("result").getAsInt();
+ assertEquals(splitter.applyAsInt(r), jo.getAsJsonPrimitive("channel").getAsInt());
+ return r;
+ });
+
+ Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+ Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
- System.out.println("expMaxDuration="+expMaxDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * 100;
+ long expMinDuration = (resultTuples.length / width) * 100;
- // a gross level performance check
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/parallel channels
assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
actDuration < 0.5 * expMinSerialDuration);
+ }
+
+ @Test
+ public void testParallel() throws Exception {
+ Topology top = newTopology("testParallel");
- // a tighter performance check
- assertTrue("expMaxDuration="+expMaxDuration+" actDuration="+actDuration,
- actDuration <= expMaxDuration);
+ BiFunction<TStream<Integer>,Integer,TStream<JsonObject>> pipeline =
+ fakeParallelPipeline(100, TimeUnit.MILLISECONDS);
+
+ int width = 5;
+ ToIntFunction<Integer> splitter = tuple -> tuple % width;
+
+ Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+ TStream<Integer> values = top.of(resultTuples);
+
+ TStream<JsonObject> result = PlumbingStreams.parallel(values, width, splitter, pipeline).tag("result");
+ TStream<Integer> result2 = result.map(jo -> {
+ int r = jo.getAsJsonPrimitive("result").getAsInt();
+ assertEquals(splitter.applyAsInt(r), jo.getAsJsonPrimitive("channel").getAsInt());
+ return r;
+ });
+
+ Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+ Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+
+ long begin = System.currentTimeMillis();
+ complete(top, count);
+ long end = System.currentTimeMillis();
+
+ assertTrue(contents.getResult().toString(), contents.valid());
+
+ long actDuration = end - begin;
+ long expMinSerialDuration = resultTuples.length * 100;
+ long expMinDuration = (resultTuples.length / width) * 100;
+
+ System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+
+ // a gross level performance check w/parallel channels
+ assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+ actDuration < 0.5 * expMinSerialDuration);
}
+
+// @Test
+// public void testParallelTiming() throws Exception {
+// Topology top = newTopology("testParallelTiming");
+//
+// BiFunction<TStream<JsonObject>,Integer,TStream<JsonObject>> pipeline =
+// fakeParallelPipelineTiming(100, TimeUnit.MILLISECONDS);
+//
+// int width = 5;
+// // ToIntFunction<Integer> splitter = tuple -> tuple % width;
+// ToIntFunction<JsonObject> splitter = jo -> jo.getAsJsonPrimitive("result").getAsInt() % width;
+//
+// Integer[] resultTuples = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+// TStream<Integer> values = top.of(resultTuples);
+//
+// TStream<JsonObject> inStream = values.map(value -> {
+// JsonObject jo = new JsonObject();
+// jo.addProperty("result", value);
+// jo.addProperty("channel", splitter.applyAsInt(jo));
+// jo.addProperty("enterParallelMsec", System.currentTimeMillis());
+// return jo;
+// });
+// TStream<JsonObject> result = PlumbingStreams.parallel(inStream, width, splitter, pipeline).tag("result");
+// TStream<Integer> result2 = result.map(jo -> {
+// jo.addProperty("exitParallelMsec", System.currentTimeMillis());
+// System.out.println("ch="+jo.getAsJsonPrimitive("channel").getAsInt()
+// +" endPipeline-startPipeline="
+// +(jo.getAsJsonPrimitive("endPipelineMsec").getAsLong()
+// - jo.getAsJsonPrimitive("startPipelineMsec").getAsLong())
+// +" exitParallel-startPipeline="
+// +(jo.getAsJsonPrimitive("exitParallelMsec").getAsLong()
+// - jo.getAsJsonPrimitive("startPipelineMsec").getAsLong()));
+// int r = jo.getAsJsonPrimitive("result").getAsInt();
+// assertEquals(splitter.applyAsInt(jo), jo.getAsJsonPrimitive("channel").getAsInt());
+// return r;
+// });
+//
+// Condition<Long> count = top.getTester().tupleCount(result, resultTuples.length);
+// Condition<List<Integer>> contents = top.getTester().contentsUnordered(result2, resultTuples);
+// long begin = System.currentTimeMillis();
+// complete(top, count);
+// long end = System.currentTimeMillis();
+// assertTrue(contents.getResult().toString(), contents.valid());
+//
+// long actDuration = end - begin;
+//
+// long expMinSerialDuration = resultTuples.length * 100;
+// long expMinDuration = (resultTuples.length / width) * 100;
+//
+// System.out.println(top.getName()+" expMinDuration="+expMinDuration+" actDuration="+actDuration+" expMinSerialDuration="+expMinSerialDuration);
+//
+// // a gross level performance check w/parallel channels
+// assertTrue("expMinSerialDuration="+expMinSerialDuration+" actDuration="+actDuration,
+// actDuration < 0.5 * expMinSerialDuration);
+// }
}
[2/2] incubator-quarks git commit: rebase, tidy up
Posted by dl...@apache.org.
rebase, tidy up
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/6ab46289
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/6ab46289
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/6ab46289
Branch: refs/heads/master
Commit: 6ab46289f97bac6d1cc02f2f03e8f4305619ce9a
Parents: 1a52f12
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Tue May 3 12:58:28 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Tue May 3 12:58:28 2016 -0400
----------------------------------------------------------------------
.../main/java/quarks/topology/plumbing/PlumbingStreams.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/6ab46289/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index f23799c..f33651b 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -441,6 +441,8 @@ public class PlumbingStreams {
* @param mapper analytic function
* @param width number of channels
* @return the unordered result stream
+ * @see #roundRobinSplitter(int) roundRobinSplitter
+ * @see #concurrentMap(TStream, List, Function) concurrentMap
*/
public static <T,U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T,Integer,U> mapper) {
BiFunction<TStream<T>,Integer,TStream<U>> pipeline = (s,ch) -> s.map(t -> mapper.apply(t, ch));
@@ -461,7 +463,7 @@ public class PlumbingStreams {
* {@code pipeline} is not required to yield a result for each input
* tuple.
* </P><P>
- * A common splitter function is a {@link quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter}.
+ * A common splitter function is a {@link #roundRobinSplitter(int) roundRobinSplitter}.
* </P><P>
* The generated graph looks like this:
* <pre>{@code
@@ -483,8 +485,8 @@ public class PlumbingStreams {
* {@code pipeline.apply(inputStream,channel)}
* is called to generate the pipeline for each channel.
* @return the isolated unordered result from each parallel channel
+ * @see #roundRobinSplitter(int) roundRobinSplitter
* @see #concurrent(TStream, List, Function) concurrent
- * @see quarks.function.Functions#roundRobinSplitter(width) roundRobinSplitter
*/
public static <T,R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>,Integer,TStream<R>> pipeline) {
Objects.requireNonNull(stream, "stream");