You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 04:28:38 UTC

[1/2] beam git commit: Use URNs, not Java classes, in immutability enforcements

Repository: beam
Updated Branches:
  refs/heads/master eeb043299 -> 0e89df3f6


Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/311547aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/311547aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/311547aa

Branch: refs/heads/master
Commit: 311547aa561bb314a8fe743b6f4677a2eaaaca50
Parents: 9f904dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 15:31:40 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 21 ++++++++------------
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
 2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+        return CONTAINS_UDF.contains(
+            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+    private static final Set<String> CONTAINS_UDF =
         ImmutableSet.of(
-            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       return bundleFactory;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+    private static Map<String, Collection<ModelEnforcementFactory>>
         defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
-      ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-          enforcements = ImmutableMap.builder();
+      ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements =
+          ImmutableMap.builder();
       ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
           ImmutableList.builder();
       if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
         enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
       }
       Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
-      enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
-      enforcements.put(MultiOutput.class, parDoEnforcements);
+      enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
       return enforcements.build();
     }
-
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +77,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final DirectGraph graph;
   private final RootProviderRegistry rootProviderRegistry;
   private final TransformEvaluatorRegistry registry;
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-      transformEnforcements;
+  private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
 
   private final EvaluationContext evaluationContext;
 
@@ -112,9 +110,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-              transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         targetParallelism,
@@ -130,8 +126,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
@@ -237,7 +232,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
     Collection<ModelEnforcementFactory> enforcements =
         MoreObjects.firstNonNull(
-            transformEnforcements.get(transform.getTransform().getClass()),
+            transformEnforcements.get(
+                PTransformTranslation.urnForTransform(transform.getTransform())),
             Collections.<ModelEnforcementFactory>emptyList());
 
     TransformExecutor<T> callable =


[2/2] beam git commit: This closes #3536: Use URNs, not Java classes, in immutability enforcements

Posted by ke...@apache.org.
This closes #3536: Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e89df3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e89df3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e89df3f

Branch: refs/heads/master
Commit: 0e89df3f6db93f99c3e51a2e9d255fa57f3e0aa5
Parents: eeb0432 311547a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 10 21:13:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 21:13:37 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 21 ++++++++------------
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
 2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------