You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/20 17:31:10 UTC
[1/2] incubator-beam git commit: Make TransformEvaluatorFactory reuse Explicit
Repository: incubator-beam
Updated Branches:
refs/heads/master c4ad11832 -> 6d7efe3df
Make TransformEvaluatorFactory reuse Explicit
Transform Evaluator Factories must be reused for the entire execution of
a Pipeline and must not be reused across pipelines.
Remove EvaluatorKey and key the cached source evaluators explicitly by the transform application.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34467f92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34467f92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34467f92
Branch: refs/heads/master
Commit: 34467f92d5a31b47f95b734c737fde0a8277311b
Parents: 6952471
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 15 10:51:24 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jul 15 10:52:32 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 11 ++--
.../beam/runners/direct/EvaluatorKey.java | 55 --------------------
.../direct/TransformEvaluatorFactory.java | 4 ++
.../direct/UnboundedReadEvaluatorFactory.java | 11 ++--
4 files changed, 13 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index e550f54..9ba8b61 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -46,7 +46,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
* retriggered.
*/
- private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
+ private final ConcurrentMap<AppliedPTransform<?, ?, ?>, Queue<? extends BoundedReadEvaluator<?>>>
sourceEvaluators = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -79,12 +79,11 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
final EvaluationContext evaluationContext) {
// Key by the application and the context the evaluation is occurring in (which call to
// Pipeline#run).
- EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
- (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
- if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+ if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
BoundedSource<OutputT> source = transform.getTransform().getSource();
@@ -93,7 +92,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
evaluatorQueue.offer(evaluator);
} else {
// otherwise return the existing Queue that arrived before us
- evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+ evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
}
}
return evaluatorQueue;
@@ -132,7 +131,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
@Override
public TransformResult finishBundle() throws IOException {
try (final BoundedReader<OutputT> reader =
- source.createReader(evaluationContext.getPipelineOptions());) {
+ source.createReader(evaluationContext.getPipelineOptions())) {
boolean contentsRemaining = reader.start();
UncommittedBundle<OutputT> output =
evaluationContext.createRootBundle(transform.getOutput());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
deleted file mode 100644
index 164e05a..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-import java.util.Objects;
-
-/**
- * A (Transform, Pipeline Execution) key for stateful evaluators.
- *
- * Source evaluators are stateful to ensure data is not read multiple times. Evaluators are cached
- * to ensure that the reader is not restarted if the evaluator is retriggered. An
- * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing
- * the same evaluators.
- */
-final class EvaluatorKey {
- private final AppliedPTransform<?, ?, ?> transform;
- private final EvaluationContext context;
-
- public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, EvaluationContext context) {
- this.transform = transform;
- this.context = context;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(transform, context);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof EvaluatorKey)) {
- return false;
- }
- EvaluatorKey that = (EvaluatorKey) other;
- return Objects.equals(this.transform, that.transform)
- && Objects.equals(this.context, that.context);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index 1973a2f..7fac1e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
@@ -28,6 +29,9 @@ import javax.annotation.Nullable;
/**
* A factory for creating instances of {@link TransformEvaluator} for the application of a
* {@link PTransform}.
+ *
+ * <p>{@link TransformEvaluatorFactory TransformEvaluatorFactories} will be reused within a single
+ * execution of a {@link Pipeline} but will not be reused across executions.
*/
public interface TransformEvaluatorFactory {
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index b226a2a..674be5e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -62,7 +62,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
* an arbitrary Queue implementation does not, so the concrete type is used explicitly.
*/
private final ConcurrentMap<
- EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
+ AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
sourceEvaluators = new ConcurrentHashMap<>();
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -91,16 +91,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
final EvaluationContext evaluationContext) {
- // Key by the application and the context the evaluation is occurring in (which call to
- // Pipeline#run).
- EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
@SuppressWarnings("unchecked")
ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
(ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
- sourceEvaluators.get(key);
+ sourceEvaluators.get(transform);
if (evaluatorQueue == null) {
evaluatorQueue = new ConcurrentLinkedQueue<>();
- if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+ if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
// If no queue existed in the evaluators, add an evaluator to initialize the evaluator
// factory for this transform
UnboundedSource<OutputT, CheckpointMarkT> source =
@@ -119,7 +116,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// otherwise return the existing Queue that arrived before us
evaluatorQueue =
(ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
- sourceEvaluators.get(key);
+ sourceEvaluators.get(transform);
}
}
return evaluatorQueue;
[2/2] incubator-beam git commit: This closes #666
Posted by ke...@apache.org.
This closes #666
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d7efe3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d7efe3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d7efe3d
Branch: refs/heads/master
Commit: 6d7efe3df8cde590722d94441eca3922a3a67734
Parents: c4ad118 34467f9
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 20 10:30:55 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jul 20 10:30:55 2016 -0700
----------------------------------------------------------------------
.../direct/BoundedReadEvaluatorFactory.java | 11 ++--
.../beam/runners/direct/EvaluatorKey.java | 55 --------------------
.../direct/TransformEvaluatorFactory.java | 4 ++
.../direct/UnboundedReadEvaluatorFactory.java | 11 ++--
4 files changed, 13 insertions(+), 68 deletions(-)
----------------------------------------------------------------------