You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 04:28:38 UTC
[1/2] beam git commit: Use URNs, not Java classes,
in immutability enforcements
Repository: beam
Updated Branches:
refs/heads/master eeb043299 -> 0e89df3f6
Use URNs, not Java classes, in immutability enforcements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/311547aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/311547aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/311547aa
Branch: refs/heads/master
Commit: 311547aa561bb314a8fe743b6f4677a2eaaaca50
Parents: 9f904dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 15:31:40 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 21 ++++++++------------
.../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
IMMUTABILITY {
@Override
public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
- return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+ return CONTAINS_UDF.contains(
+ PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
}
};
/**
* The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
*/
- private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+ private static final Set<String> CONTAINS_UDF =
ImmutableSet.of(
- Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+ PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
return bundleFactory;
}
- @SuppressWarnings("rawtypes")
- private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ private static Map<String, Collection<ModelEnforcementFactory>>
defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
- ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- enforcements = ImmutableMap.builder();
+ ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements =
+ ImmutableMap.builder();
ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
ImmutableList.builder();
if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
}
Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
- enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
- enforcements.put(MultiOutput.class, parDoEnforcements);
+ enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
return enforcements.build();
}
-
}
////////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/311547aa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +77,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
private final DirectGraph graph;
private final RootProviderRegistry rootProviderRegistry;
private final TransformEvaluatorRegistry registry;
- @SuppressWarnings("rawtypes")
- private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- transformEnforcements;
+ private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
private final EvaluationContext evaluationContext;
@@ -112,9 +110,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
DirectGraph graph,
RootProviderRegistry rootProviderRegistry,
TransformEvaluatorRegistry registry,
- @SuppressWarnings("rawtypes")
- Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- transformEnforcements,
+ Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
EvaluationContext context) {
return new ExecutorServiceParallelExecutor(
targetParallelism,
@@ -130,8 +126,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
DirectGraph graph,
RootProviderRegistry rootProviderRegistry,
TransformEvaluatorRegistry registry,
- @SuppressWarnings("rawtypes")
- Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
+ Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
EvaluationContext context) {
this.targetParallelism = targetParallelism;
// Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
@@ -237,7 +232,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
Collection<ModelEnforcementFactory> enforcements =
MoreObjects.firstNonNull(
- transformEnforcements.get(transform.getTransform().getClass()),
+ transformEnforcements.get(
+ PTransformTranslation.urnForTransform(transform.getTransform())),
Collections.<ModelEnforcementFactory>emptyList());
TransformExecutor<T> callable =
[2/2] beam git commit: This closes #3536: Use URNs, not Java classes,
in immutability enforcements
Posted by ke...@apache.org.
This closes #3536: Use URNs, not Java classes, in immutability enforcements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e89df3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e89df3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e89df3f
Branch: refs/heads/master
Commit: 0e89df3f6db93f99c3e51a2e9d255fa57f3e0aa5
Parents: eeb0432 311547a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 10 21:13:37 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 21:13:37 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 21 ++++++++------------
.../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------