You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dl...@apache.org on 2016/05/03 17:27:52 UTC
[5/9] incubator-quarks git commit: tidy up
tidy up
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/cde26788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/cde26788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/cde26788
Branch: refs/heads/master
Commit: cde26788a61a707bc9eea845cf909d75c5c9905f
Parents: 734f146
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Fri Apr 29 15:47:35 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Fri Apr 29 15:47:35 2016 -0400
----------------------------------------------------------------------
.../topology/plumbing/PlumbingStreams.java | 72 ++++----------------
.../java/quarks/test/topology/PlumbingTest.java | 30 ++++----
2 files changed, 29 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
index 554fd39..ac75103 100644
--- a/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
+++ b/api/topology/src/main/java/quarks/topology/plumbing/PlumbingStreams.java
@@ -25,12 +25,10 @@ import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import quarks.function.Function;
-import quarks.function.UnaryOperator;
import quarks.oplet.plumbing.Barrier;
import quarks.oplet.plumbing.Isolate;
import quarks.oplet.plumbing.PressureReliever;
@@ -445,32 +443,18 @@ public class PlumbingStreams {
Objects.requireNonNull(pipelines, "pipelines");
Objects.requireNonNull(combiner, "combiner");
- // INITIAL IMPL TO GET STARTED - validate interface and test
- // explore an impl with no new oplets
- //
- // A few impl options exist. Some with feedback loop to control stepping,
- // some without. Feedback is OK for single JVM case, less so for
- // multi-JVM/distributed case.
- //
- // Some impls with special oplets some that avoid them.
- //
// Summary of what's below:
- // feedback loop and no new oplets:
- // |-> isolate -> p1 -> map.toPair |
- // stream -> map.gate =>|-> isolate -> p2 -> map.toPair |-> union -> map.Collector -> combiner
- // |-> isolate -> p3 -> map.toPair |
- // . . .
- //
-
- // Add a gate. This keeps all pipelines working lock-step.
- // It also provides the guarantee needed by gatedBarrier below.
- Semaphore gateSemaphore = new Semaphore(1);
- stream = gate(stream, gateSemaphore).tag("concurrent.gate");
+ // |-> isolate(1) -> p1 -> |
+ // stream -> |-> isolate(1) -> p2 -> |-> barrier(10) -> combiner
+ // |-> isolate(1) -> p3 -> |
+ // . . .
+
+ int barrierQueueCapacity = 10; // don't preclude pipelines from getting ahead some.
- // Add parallel fanout - with the gate the queue size really doesn't matter
- List<TStream<T>> fanouts = parallelFanout(stream, pipelines.size(), 1);
- for (int i = 0; i < fanouts.size(); i++)
- fanouts.get(i).tag("concurrent.isolated-ch"+i);
+ // Add concurrent (isolated) fanouts
+ List<TStream<T>> fanouts = new ArrayList<>(pipelines.size());
+ for (int i = 0; i < pipelines.size(); i++)
+ fanouts.add(isolate(stream, 1).tag("concurrent.isolated-ch"+i));
// Add pipelines
List<TStream<U>> results = new ArrayList<>(pipelines.size());
@@ -481,43 +465,11 @@ public class PlumbingStreams {
}
// Add the barrier
- // TStream<List<U>> barrier = gatedBarrier(results).tag("concurrent.barrier");
- TStream<List<U>> barrier = barrier(results).tag("concurrent.barrier");
-
- // barrier = barrier.peek(tuple -> System.out.println("concurrent.barrier List<U> "+tuple));
-
- // Add peek() to signal ok to begin next tuple
- barrier = barrier.peek(tuple -> { gateSemaphore.release(); }).tag("concurrent.gateRelease");
+ TStream<List<U>> barrier = barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
- // Add the combiner to the topology
+ // Add the combiner
return combiner.apply(barrier);
}
-
- private static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
- return stream.map(tuple -> {
- try {
- semaphore.acquire();
- return tuple;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("interrupted", e);
- }});
- }
-
- private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, int queueCapacity) {
- // TODO want an ExecutorService to enable nThreads for mPipelines.
- // i.e., not use "isolate", or need a way to create a collection of
- // related "isolate" that use an executor
- return parallelFanout(stream, numFanouts, inStream -> PlumbingStreams.isolate(inStream, queueCapacity));
- }
-
- private static <T> List<TStream<T>> parallelFanout(TStream<T> stream, int numFanouts, UnaryOperator<TStream<T>> isolator) {
- List<TStream<T>> fanouts = new ArrayList<>(numFanouts);
- for (int i = 0; i < numFanouts; i++) {
- fanouts.add(isolator.apply(stream));
- }
- return fanouts;
- }
/**
* A tuple synchronization barrier.
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/cde26788/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
----------------------------------------------------------------------
diff --git a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
index f147685..677467a 100644
--- a/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
+++ b/api/topology/src/test/java/quarks/test/topology/PlumbingTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -317,13 +316,18 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
public void testConcurrentMap() throws Exception {
Topology top = newTopology("testConcurrentMap");
- Function<Integer,JsonObject> a1 = fakeAnalytic(0, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a2 = fakeAnalytic(1, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a3 = fakeAnalytic(2, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a4 = fakeAnalytic(3, 100, TimeUnit.MILLISECONDS);
- Function<Integer,JsonObject> a5 = fakeAnalytic(4, 100, TimeUnit.MILLISECONDS);
+ int ch = 0;
+ List<Function<Integer,JsonObject>> mappers = new ArrayList<>();
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 100, TimeUnit.MILLISECONDS));
+ // a couple of much faster ones, just in case something's amiss with the queues
+ mappers.add(fakeAnalytic(ch++, 3, TimeUnit.MILLISECONDS));
+ mappers.add(fakeAnalytic(ch++, 13, TimeUnit.MILLISECONDS));
- List<Function<Integer,JsonObject>> mappers = new ArrayList<>(Arrays.asList(a1, a2, a3, a4, a5));
Function<List<JsonObject>,Integer> combiner = list -> {
int sum = 0;
int cnt = 0;
@@ -372,13 +376,13 @@ public abstract class PlumbingTest extends TopologyAbstractTest {
Topology top = newTopology("testConcurrent");
int ch = 0;
- Function<TStream<Integer>,TStream<JsonObject>> p1 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p2 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p3 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p4 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
- Function<TStream<Integer>,TStream<JsonObject>> p5 = fakePipeline(ch++, 100, TimeUnit.MILLISECONDS);
+ List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>();
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
+ pipelines.add(fakePipeline(ch++, 100, TimeUnit.MILLISECONDS));
- List<Function<TStream<Integer>,TStream<JsonObject>>> pipelines = new ArrayList<>(Arrays.asList(p1, p2, p3, p4, p5));
Function<List<JsonObject>,Integer> tupleCombiner = list -> {
int sum = 0;
int cnt = 0;