You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/09 23:18:54 UTC
[2/2] incubator-beam git commit: Explicitly use a
ConcurrentLinkedQueue in UnboundedReadEvaluatorFactory
Explicitly use a ConcurrentLinkedQueue in UnboundedReadEvaluatorFactory
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e261bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e261bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e261bb0
Branch: refs/heads/master
Commit: 0e261bb0d2b54cb30843f901ff80ac59d225e079
Parents: 272493e
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 14:48:20 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon May 9 16:18:19 2016 -0700
----------------------------------------------------------------------
.../direct/UnboundedReadEvaluatorFactory.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e261bb0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 7a95c9f..14fb8e2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -46,8 +46,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted.
* Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
* and any splits are honored.
+ *
+ * <p>The Queue storing available evaluators must enforce a happens-before relationship for
+ * elements being added to the queue to accesses after it, to ensure that updates performed to the
+ * state of an evaluator are properly visible. ConcurrentLinkedQueue provides this relation, but
+ * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
*/
- private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
+ private final ConcurrentMap<
+ EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?>>>
sourceEvaluators = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -83,8 +89,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// Pipeline#run).
EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@SuppressWarnings("unchecked")
- Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
- (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
+ (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
@@ -97,7 +103,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
- evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ evaluatorQueue =
+ (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
}
}
return evaluatorQueue;
@@ -117,7 +124,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
private static final int ARBITRARY_MAX_ELEMENTS = 10;
private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform;
private final InProcessEvaluationContext evaluationContext;
- private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
+ private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
/**
* The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same
* source as derived from {@link #transform} due to splitting.
@@ -129,7 +136,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
InProcessEvaluationContext evaluationContext,
UnboundedSource<OutputT, ?> source,
- Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
+ ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
this.transform = transform;
this.evaluationContext = evaluationContext;
this.evaluatorQueue = evaluatorQueue;