You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/09 23:18:54 UTC

[2/2] incubator-beam git commit: Explicitly use a ConcurrentLinkedQueue in UnboundedReadEvaluatorFactory

Explicitly use a ConcurrentLinkedQueue in UnboundedReadEvaluatorFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e261bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e261bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e261bb0

Branch: refs/heads/master
Commit: 0e261bb0d2b54cb30843f901ff80ac59d225e079
Parents: 272493e
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 14:48:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 9 16:18:19 2016 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java    | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e261bb0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 7a95c9f..14fb8e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -46,8 +46,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
    * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
    * and any splits are honored.
+   *
+   * <p>The Queue storing available evaluators must enforce a happens-before relationship for
+   * elements being added to the queue to accesses after it, to ensure that updates performed to the
+   * state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but
+   * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
    */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
+  private final ConcurrentMap<
+      EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>>
       sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -83,8 +89,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
     @SuppressWarnings("unchecked")
-    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
+        (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
     if (evaluatorQueue == null) {
       evaluatorQueue = new ConcurrentLinkedQueue<>();
       if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
@@ -97,7 +103,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+        evaluatorQueue =
+            (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
       }
     }
     return evaluatorQueue;
@@ -117,7 +124,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
     private final InProcessEvaluationContext evaluationContext;
-    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+    private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
     /**
      * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
      * source as derived from {@link #transform} due to splitting.
@@ -129,7 +136,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
         InProcessEvaluationContext evaluationContext,
         UnboundedSource<OutputT, ?> source,
-        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+        ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
       this.evaluatorQueue = evaluatorQueue;