You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/28 16:41:46 UTC

[1/2] incubator-beam git commit: Remove unnecessary method in ReadEvaluatorFactories

Repository: incubator-beam
Updated Branches:
  refs/heads/master a46081eb5 -> 9c447510a


Remove unnecessary method in ReadEvaluatorFactories

The only thing the getTransformEvaluator() method does is call poll() on the
result of getTransformEvaluatorQueue() (which is itself only ever called by
getTransformEvaluator()). Instead, move the queue construction into
getTransformEvaluator() and call poll() on the result there.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5637b654
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5637b654
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5637b654

Branch: refs/heads/master
Commit: 5637b654ea71233dc5a34832d4c44b5f003f36e9
Parents: a46081e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 22 15:34:03 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jul 28 11:40:10 2016 -0400

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     | 15 ++++----------
 .../direct/UnboundedReadEvaluatorFactory.java   | 21 ++++++--------------
 2 files changed, 10 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5637b654/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 9ba8b61..2f4f86c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -60,21 +60,14 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
-      final EvaluationContext evaluationContext) {
-    return getTransformEvaluatorQueue(transform, evaluationContext).poll();
-  }
-
   /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Bounded Read.Bounded}, initializing it if required.
+   * Get a {@link TransformEvaluator} that produces elements for the provided application of
+   * {@link Bounded Read.Bounded}, initializing the queue of evaluators if required.
    *
    * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
    * already done so.
    */
-  @SuppressWarnings("unchecked")
-  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
       final EvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
@@ -95,7 +88,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
       }
     }
-    return evaluatorQueue;
+    return evaluatorQueue.poll();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5637b654/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 674be5e..0e2745b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,7 +35,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.joda.time.Instant;
 
 import java.io.IOException;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -73,25 +72,17 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
-  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final EvaluationContext evaluationContext) {
-    return getTransformEvaluatorQueue(transform, evaluationContext).poll();
-  }
-
   /**
-   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
-   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
+   * Get a {@link TransformEvaluator} that produces elements for the provided application of
+   * {@link Unbounded Read.Unbounded}, initializing the queue of evaluators if required.
    *
    * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
    * already done so.
    */
-  @SuppressWarnings("unchecked")
   private <OutputT, CheckpointMarkT extends CheckpointMark>
-  Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
-      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
-      final EvaluationContext evaluationContext) {
-    @SuppressWarnings("unchecked")
+      TransformEvaluator<?> getTransformEvaluator(
+          final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
+          final EvaluationContext evaluationContext) {
     ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
         (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
             sourceEvaluators.get(transform);
@@ -119,7 +110,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
                 sourceEvaluators.get(transform);
       }
     }
-    return evaluatorQueue;
+    return evaluatorQueue.poll();
   }
 
   /**


[2/2] incubator-beam git commit: Remove unnecessary method in ReadEvaluatorFactories

Posted by lc...@apache.org.
Remove unnecessary method in ReadEvaluatorFactories

This closes #747


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c447510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c447510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c447510

Branch: refs/heads/master
Commit: 9c447510a68abbcdb342073eb7bd1fc179e0bcb7
Parents: a46081e 5637b65
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jul 28 11:40:40 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jul 28 11:40:40 2016 -0400

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     | 15 ++++----------
 .../direct/UnboundedReadEvaluatorFactory.java   | 21 ++++++--------------
 2 files changed, 10 insertions(+), 26 deletions(-)
----------------------------------------------------------------------