You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/29 22:04:39 UTC
[beam] branch master updated: Revert "Merge pull request #7316
[BEAM-6269] Cross-SDK transform expansion protocol."
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a65f1c4 Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol."
new d9a1bac Merge pull request #7663: [BEAM-6539] Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol"
a65f1c4 is described below
commit a65f1c43b7fbedfec3e7396d9e024bf0e762c33d
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Tue Jan 29 11:01:02 2019 -0800
Revert "Merge pull request #7316 [BEAM-6269] Cross-SDK transform expansion protocol."
This reverts commit 26bd104f122986429ba5a8abd583b04a44a8edee, reversing
changes made to dcda09a8725d30467c4eb549985e3fe979208759.
---
model/job-management/build.gradle | 5 +-
.../src/main/proto/beam_expansion_api.proto | 68 -------
.../pipeline/src/main/proto/beam_runner_api.proto | 3 -
runners/core-construction-java/build.gradle | 18 --
.../core/construction/ExpansionService.java | 205 --------------------
.../core/construction/ModelCoderRegistrar.java | 3 -
.../runners/core/construction/ModelCoders.java | 2 -
.../core/construction/RehydratedComponents.java | 11 --
.../runners/core/construction/SdkComponents.java | 57 +-----
.../core/construction/CoderTranslationTest.java | 1 -
.../core/construction/ExpansionServiceTest.java | 103 ----------
.../core/construction/TestExpansionService.java | 52 ------
.../fnexecution/control/RemoteExecutionTest.java | 32 +++-
.../beam/sdk/runners/TransformHierarchy.java | 28 +--
sdks/python/apache_beam/coders/coders.py | 36 ----
sdks/python/apache_beam/pipeline.py | 30 ++-
sdks/python/apache_beam/pvalue.py | 12 +-
.../python/apache_beam/runners/pipeline_context.py | 34 +---
.../apache_beam/runners/pipeline_context_test.py | 8 -
.../runners/portability/expansion_service.py | 118 ------------
sdks/python/apache_beam/transforms/__init__.py | 1 -
sdks/python/apache_beam/transforms/external.py | 208 ---------------------
.../python/apache_beam/transforms/external_test.py | 205 --------------------
sdks/python/apache_beam/transforms/ptransform.py | 46 +----
sdks/python/build.gradle | 12 --
25 files changed, 65 insertions(+), 1233 deletions(-)
diff --git a/model/job-management/build.gradle b/model/job-management/build.gradle
index 4f81152..4c50782 100644
--- a/model/job-management/build.gradle
+++ b/model/job-management/build.gradle
@@ -17,10 +17,7 @@
*/
apply plugin: org.apache.beam.gradle.BeamModulePlugin
-applyPortabilityNature(shadowJarValidationExcludes:[
- "org/apache/beam/model/expansion/v1/**",
- "org/apache/beam/model/jobmanagement/v1/**",
-])
+applyPortabilityNature(shadowJarValidationExcludes: ["org/apache/beam/model/jobmanagement/v1/**"])
description = "Apache Beam :: Model :: Job Management"
ext.summary = "Portable definitions for submitting pipelines."
diff --git a/model/job-management/src/main/proto/beam_expansion_api.proto b/model/job-management/src/main/proto/beam_expansion_api.proto
deleted file mode 100644
index 92b0dd2..0000000
--- a/model/job-management/src/main/proto/beam_expansion_api.proto
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Protocol Buffers describing the Expansion API, an api for expanding
- * transforms in a remote SDK.
- */
-
-syntax = "proto3";
-
-package org.apache.beam.model.expansion.v1;
-
-option go_package = "construction_v1";
-option java_package = "org.apache.beam.model.expansion.v1";
-option java_outer_classname = "ExpansionApi";
-
-import "beam_runner_api.proto";
-
-message ExpansionRequest {
- // Set of components needed to interpret the transform, or which
- // may be useful for its expansion. This includes the input
- // PCollections (if any) to the to-be-expanded transform, along
- // with their coders and windowing strategies.
- org.apache.beam.model.pipeline.v1.Components components = 1;
-
- // The actual PTransform to be expanded according to its spec.
- // Its input should be set, but its subtransforms and outputs
- // should not be.
- org.apache.beam.model.pipeline.v1.PTransform transform = 2;
-
- // A namespace (prefix) to use for the id of any newly created
- // components.
- string namespace = 3;
-}
-
-message ExpansionResponse {
- // Set of components needed to execute the expanded transform,
- // including the (original) inputs, outputs, and subtransforms.
- org.apache.beam.model.pipeline.v1.Components components = 1;
-
- // The expanded transform itself, with references to its outputs
- // and subtransforms.
- org.apache.beam.model.pipeline.v1.PTransform transform = 2;
-
- // (Optional) A string representation of any error encountered while
- // attempting to expand this transform.
- string error = 10;
-}
-
-// Job Service for constructing pipelines
-service ExpansionService {
- rpc Expand (ExpansionRequest) returns (ExpansionResponse);
-}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e081f07..42a1970 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -544,9 +544,6 @@ message StandardCoders {
// Components: None
BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
- // Components: None
- STRING_UTF8 = 10 [(beam_urn) = "beam:coder:string_utf8:v1"];
-
// Components: The key and value coder, in that order.
KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle
index 7ac38e0..365373c 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -50,21 +50,3 @@ dependencies {
shadowTest library.java.jackson_dataformat_yaml
shadowTest project(path: ":beam-model-fn-execution", configuration: "shadow")
}
-
-task runExpansionService (type: JavaExec) {
- main = "org.apache.beam.runners.core.construction.ExpansionService"
- classpath = sourceSets.main.runtimeClasspath
- args = [project.findProperty("constructionService.port") ?: "8097"]
-}
-
-task testExpansionService(type: Jar) {
- dependsOn = [shadowJar, shadowTestJar]
- manifest {
- attributes(
- 'Main-Class': 'org.apache.beam.runners.core.construction.TestExpansionService'
- )
- }
- from { configurations.testRuntime.collect { it.isDirectory() ? it : zipTree(it) }}
- from sourceSets.main.output
- from sourceSets.test.output
-}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java
deleted file mode 100644
index a76bc48..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExpansionService.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.beam.model.expansion.v1.ExpansionApi;
-import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** A service that allows a pipeline to expand transforms from a remote SDK. */
-public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
-
- /**
- * A registrar that creates {@link TransformProvider} instances from {@link
- * RunnerApi.FunctionSpec}s.
- *
- * <p>Transform authors have the ability to provide a registrar by creating a {@link
- * ServiceLoader} entry and a concrete implementation of this interface.
- *
- * <p>It is optional but recommended to use one of the many build time tools such as {@link
- * AutoService} to generate the necessary META-INF files automatically.
- */
- public interface ExpansionServiceRegistrar {
- Map<String, TransformProvider> knownTransforms();
- }
-
- /**
- * Provides a mapping of {@link RunnerApi.FunctionSpec} to a {@link PTransform}, together with
- * mappings of its inputs and outputs to maps of PCollections.
- *
- * @param <InputT> input {@link PValue} type of the transform
- * @param <OutputT> output {@link PValue} type of the transform
- */
- public interface TransformProvider<InputT extends PValue, OutputT extends PValue> {
-
- default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
- if (inputs.size() == 0) {
- return (InputT) p.begin();
- }
- if (inputs.size() == 1) {
- return (InputT) Iterables.getOnlyElement(inputs.values());
- } else {
- PCollectionTuple inputTuple = PCollectionTuple.empty(p);
- for (Map.Entry<String, PCollection<?>> entry : inputs.entrySet()) {
- inputTuple = inputTuple.and(new TupleTag(entry.getKey()), entry.getValue());
- }
- return (InputT) inputTuple;
- }
- }
-
- PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);
-
- default Map<String, PCollection<?>> extractOutputs(OutputT output) {
- if (output instanceof PDone) {
- return Collections.emptyMap();
- } else if (output instanceof PCollection) {
- return ImmutableMap.of("output", (PCollection<?>) output);
- } else if (output instanceof PCollectionTuple) {
- return ((PCollectionTuple) output)
- .getAll().entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue));
- } else if (output instanceof PCollectionList<?>) {
- PCollectionList<?> listOutput = (PCollectionList<?>) output;
- return IntStream.range(0, listOutput.size())
- .boxed()
- .collect(Collectors.toMap(index -> "output_" + index, listOutput::get));
- } else {
- throw new UnsupportedOperationException("Unknown output type: " + output.getClass());
- }
- }
-
- default Map<String, PCollection<?>> apply(
- Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) {
- return extractOutputs(
- Pipeline.applyTransform(name, createInput(p, inputs), getTransform(spec)));
- }
- }
-
- private Map<String, TransformProvider> registeredTransforms = loadRegisteredTransforms();
-
- private Map<String, TransformProvider> loadRegisteredTransforms() {
- ImmutableMap.Builder<String, TransformProvider> registeredTransforms = ImmutableMap.builder();
- for (ExpansionServiceRegistrar registrar :
- ServiceLoader.load(ExpansionServiceRegistrar.class)) {
- registeredTransforms.putAll(registrar.knownTransforms());
- }
- return registeredTransforms.build();
- }
-
- @VisibleForTesting
- /*package*/ ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
- LOG.info(
- "Expanding '{}' with URN '{}'",
- request.getTransform().getUniqueName(),
- request.getTransform().getSpec().getUrn());
- LOG.debug("Full transform: {}", request.getTransform());
- Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
- Pipeline pipeline = Pipeline.create();
- RehydratedComponents rehydratedComponents =
- RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline);
-
- Map<String, PCollection<?>> inputs =
- request.getTransform().getInputsMap().entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- input -> {
- try {
- return rehydratedComponents.getPCollection(input.getValue());
- } catch (IOException exn) {
- throw new RuntimeException(exn);
- }
- }));
- if (!registeredTransforms.containsKey(request.getTransform().getSpec().getUrn())) {
- throw new UnsupportedOperationException(
- "Unknown urn: " + request.getTransform().getSpec().getUrn());
- }
- registeredTransforms
- .get(request.getTransform().getSpec().getUrn())
- .apply(
- pipeline,
- request.getTransform().getUniqueName(),
- request.getTransform().getSpec(),
- inputs);
-
- // Needed to find which transform was new...
- SdkComponents sdkComponents =
- rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
- sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
- RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
- String expandedTransformId =
- Iterables.getOnlyElement(
- pipelineProto.getRootTransformIdsList().stream()
- .filter(id -> !existingTransformIds.contains(id))
- .collect(Collectors.toList()));
- RunnerApi.Components components = pipelineProto.getComponents();
- LOG.debug("Expanded to {}", components.getTransformsOrThrow(expandedTransformId));
-
- return ExpansionApi.ExpansionResponse.newBuilder()
- .setComponents(components.toBuilder().removeTransforms(expandedTransformId))
- .setTransform(components.getTransformsOrThrow(expandedTransformId))
- .build();
- }
-
- @Override
- public void expand(
- ExpansionApi.ExpansionRequest request,
- StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
- try {
- responseObserver.onNext(expand(request));
- responseObserver.onCompleted();
- } catch (RuntimeException exn) {
- responseObserver.onError(exn);
- throw exn;
- }
- }
-
- public static void main(String[] args) throws Exception {
- int port = Integer.parseInt(args[0]);
- System.out.println("Starting expansion service at localhost:" + port);
- Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build();
- server.start();
- server.awaitTermination();
- }
-}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index d55f5d4..8843125 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
@@ -47,7 +46,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
static final BiMap<Class<? extends Coder>, String> BEAM_MODEL_CODER_URNS =
ImmutableBiMap.<Class<? extends Coder>, String>builder()
.put(ByteArrayCoder.class, ModelCoders.BYTES_CODER_URN)
- .put(StringUtf8Coder.class, ModelCoders.STRING_UTF8_CODER_URN)
.put(KvCoder.class, ModelCoders.KV_CODER_URN)
.put(VarLongCoder.class, ModelCoders.INT64_CODER_URN)
.put(IntervalWindowCoder.class, ModelCoders.INTERVAL_WINDOW_CODER_URN)
@@ -64,7 +62,6 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
static final Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> BEAM_MODEL_CODERS =
ImmutableMap.<Class<? extends Coder>, CoderTranslator<? extends Coder>>builder()
.put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class))
- .put(StringUtf8Coder.class, CoderTranslators.atomic(StringUtf8Coder.class))
.put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
.put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class))
.put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class))
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index b79ee35..3c6dfba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -33,7 +33,6 @@ public class ModelCoders {
private ModelCoders() {}
public static final String BYTES_CODER_URN = getUrn(StandardCoders.Enum.BYTES);
- public static final String STRING_UTF8_CODER_URN = getUrn(StandardCoders.Enum.STRING_UTF8);
// Where is this required explicitly, instead of implicit within WindowedValue and LengthPrefix
// coders?
public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT);
@@ -55,7 +54,6 @@ public class ModelCoders {
private static final Set<String> MODEL_CODER_URNS =
ImmutableSet.of(
BYTES_CODER_URN,
- STRING_UTF8_CODER_URN,
INT64_CODER_URN,
ITERABLE_CODER_URN,
TIMER_CODER_URN,
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index 6265894..68a793a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import java.io.IOException;
-import java.util.Collections;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -174,14 +173,4 @@ public class RehydratedComponents {
public Components getComponents() {
return components;
}
-
- public SdkComponents getSdkComponents() {
- return SdkComponents.create(
- components,
- Collections.emptyMap(),
- pCollections.asMap(),
- windowingStrategies.asMap(),
- coders.asMap(),
- Collections.emptyMap());
- }
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 2a1b335..e44d724 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -23,7 +23,6 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
@@ -43,20 +42,22 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
/** SDK objects that will be represented at some later point within a {@link Components} object. */
public class SdkComponents {
- private final String newIdPrefix;
private final RunnerApi.Components.Builder componentsBuilder = RunnerApi.Components.newBuilder();
private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds = HashBiMap.create();
private final BiMap<PCollection<?>, String> pCollectionIds = HashBiMap.create();
private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = HashBiMap.create();
+
+ /** A map of Coder to IDs. Coders are stored here with identity equivalence. */
private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
+
private final BiMap<Environment, String> environmentIds = HashBiMap.create();
private final Set<String> reservedIds = new HashSet<>();
/** Create a new {@link SdkComponents} with no components. */
public static SdkComponents create() {
- return new SdkComponents("");
+ return new SdkComponents();
}
/**
@@ -65,27 +66,11 @@ public class SdkComponents {
* <p>WARNING: This action might cause some of duplicate items created.
*/
public static SdkComponents create(RunnerApi.Components components) {
- return new SdkComponents(components, "");
- }
-
- /*package*/ static SdkComponents create(
- RunnerApi.Components components,
- Map<String, AppliedPTransform<?, ?, ?>> transforms,
- Map<String, PCollection<?>> pCollections,
- Map<String, WindowingStrategy<?, ?>> windowingStrategies,
- Map<String, Coder<?>> coders,
- Map<String, Environment> environments) {
- SdkComponents sdkComponents = SdkComponents.create(components);
- sdkComponents.transformIds.inverse().putAll(transforms);
- sdkComponents.pCollectionIds.inverse().putAll(pCollections);
- sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies);
- sdkComponents.coderIds.inverse().putAll(coders);
- sdkComponents.environmentIds.inverse().putAll(environments);
- return sdkComponents;
+ return new SdkComponents(components);
}
public static SdkComponents create(PipelineOptions options) {
- SdkComponents sdkComponents = new SdkComponents("");
+ SdkComponents sdkComponents = new SdkComponents();
PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class);
sdkComponents.registerEnvironment(
Environments.createOrGetDefaultEnvironment(
@@ -94,13 +79,9 @@ public class SdkComponents {
return sdkComponents;
}
- private SdkComponents(String newIdPrefix) {
- this.newIdPrefix = newIdPrefix;
- }
-
- private SdkComponents(RunnerApi.Components components, String newIdPrefix) {
- this.newIdPrefix = newIdPrefix;
+ private SdkComponents() {}
+ private SdkComponents(RunnerApi.Components components) {
if (components == null) {
return;
}
@@ -111,28 +92,10 @@ public class SdkComponents {
reservedIds.addAll(components.getCodersMap().keySet());
reservedIds.addAll(components.getEnvironmentsMap().keySet());
- environmentIds.inverse().putAll(components.getEnvironmentsMap());
-
componentsBuilder.mergeFrom(components);
}
/**
- * Returns an SdkComponents like this one, but which will prefix all newly generated ids with the
- * given string.
- *
- * <p>Useful for ensuring independently-constructed components have non-overlapping ids.
- */
- public SdkComponents withNewIdPrefix(String newIdPrefix) {
- SdkComponents sdkComponents = new SdkComponents(componentsBuilder.build(), newIdPrefix);
- sdkComponents.transformIds.putAll(transformIds);
- sdkComponents.pCollectionIds.putAll(pCollectionIds);
- sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds);
- sdkComponents.coderIds.putAll(coderIds);
- sdkComponents.environmentIds.putAll(environmentIds);
- return sdkComponents;
- }
-
- /**
* Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
* unique ID for the {@link AppliedPTransform}. Multiple registrations of the same {@link
* AppliedPTransform} will return the same unique ID.
@@ -274,10 +237,10 @@ public class SdkComponents {
}
private String uniqify(String baseName, Set<String> existing) {
- String name = newIdPrefix + baseName;
+ String name = baseName;
int increment = 1;
while (existing.contains(name) || reservedIds.contains(name)) {
- name = newIdPrefix + baseName + Integer.toString(increment);
+ name = baseName + Integer.toString(increment);
increment++;
}
return name;
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index 593f9a8..f96c977 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -60,7 +60,6 @@ public class CoderTranslationTest {
private static final Set<StructuredCoder<?>> KNOWN_CODERS =
ImmutableSet.<StructuredCoder<?>>builder()
.add(ByteArrayCoder.of())
- .add(StringUtf8Coder.of())
.add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
.add(VarLongCoder.of())
.add(IntervalWindowCoder.of())
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java
deleted file mode 100644
index 93af15d..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExpansionServiceTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.auto.service.AutoService;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.model.expansion.v1.ExpansionApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Impulse;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.junit.Test;
-
-/** Tests for {@link ExpansionService}. */
-public class ExpansionServiceTest {
-
- private static final String TEST_URN = "test:beam:transforms:count";
-
- private static final String TEST_NAME = "TestName";
-
- private static final String TEST_NAMESPACE = "namespace";
-
- private ExpansionService expansionService = new ExpansionService();
-
- /** Registers a single test transformation. */
- @AutoService(ExpansionService.ExpansionServiceRegistrar.class)
- public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar {
- @Override
- public Map<String, ExpansionService.TransformProvider> knownTransforms() {
- return ImmutableMap.of(TEST_URN, spec -> Count.perElement());
- }
- }
-
- @Test
- public void testConstruct() {
- Pipeline p = Pipeline.create();
- p.apply(Impulse.create());
- RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
- String inputPcollId =
- Iterables.getOnlyElement(
- Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values())
- .getOutputsMap()
- .values());
- ExpansionApi.ExpansionRequest request =
- ExpansionApi.ExpansionRequest.newBuilder()
- .setComponents(pipelineProto.getComponents())
- .setTransform(
- RunnerApi.PTransform.newBuilder()
- .setUniqueName(TEST_NAME)
- .setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_URN))
- .putInputs("input", inputPcollId))
- .setNamespace(TEST_NAMESPACE)
- .build();
- ExpansionApi.ExpansionResponse response = expansionService.expand(request);
- RunnerApi.PTransform expandedTransform = response.getTransform();
- assertEquals(TEST_NAME, expandedTransform.getUniqueName());
- // Verify it has the right input.
- assertEquals(inputPcollId, Iterables.getOnlyElement(expandedTransform.getInputsMap().values()));
- // Loose check that it's composite, and its children are represented.
- assertNotEquals(expandedTransform.getSubtransformsCount(), 0);
- for (String subtransform : expandedTransform.getSubtransformsList()) {
- assertTrue(response.getComponents().containsTransforms(subtransform));
- }
- // Check that any newly generated components are properly namespaced.
- Set<String> originalIds = allIds(request.getComponents());
- for (String id : allIds(response.getComponents())) {
- assertTrue(id, id.startsWith(TEST_NAMESPACE) || originalIds.contains(id));
- }
- }
-
- public Set<String> allIds(RunnerApi.Components components) {
- Set<String> all = new HashSet<>();
- all.addAll(components.getTransformsMap().keySet());
- all.addAll(components.getPcollectionsMap().keySet());
- all.addAll(components.getCodersMap().keySet());
- all.addAll(components.getWindowingStrategiesMap().keySet());
- all.addAll(components.getEnvironmentsMap().keySet());
- return all;
- }
-}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java
deleted file mode 100644
index 983aa02..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestExpansionService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
-
-/** An {@link ExpansionService} useful for tests. */
-public class TestExpansionService {
-
- private static final String TEST_COUNT_URN = "pytest:beam:transforms:count";
- private static final String TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than";
-
- /** Registers a single test transformation. */
- @AutoService(ExpansionService.ExpansionServiceRegistrar.class)
- public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar {
- @Override
- public Map<String, ExpansionService.TransformProvider> knownTransforms() {
- return ImmutableMap.of(
- TEST_COUNT_URN, spec -> Count.perElement(),
- TEST_FILTER_URN, spec -> Filter.lessThanEq(spec.getPayload().toStringUtf8()));
- }
- }
-
- public static void main(String[] args) throws Exception {
- int port = Integer.parseInt(args[0]);
- System.out.println("Starting expansion service at localhost:" + port);
- Server server = ServerBuilder.forPort(port).addService(new ExpansionService()).build();
- server.start();
- server.awaitTermination();
- }
-}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 80e5a56..09cf59d 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -442,7 +442,11 @@ public class RemoteExecutionTest implements Serializable {
RemoteOutputReceiver.of(targetCoder.getValue(), outputContents::add));
}
- Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+ Iterable<byte[]> sideInputData =
+ Arrays.asList(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
StateRequestHandler stateRequestHandler =
StateRequestHandlers.forSideInputHandlerFactory(
descriptor.getSideInputSpecs(),
@@ -472,9 +476,13 @@ public class RemoteExecutionTest implements Serializable {
try (ActiveBundle bundle =
processor.newBundle(outputReceivers, stateRequestHandler, progressHandler)) {
Iterables.getOnlyElement(bundle.getInputReceivers().values())
- .accept(WindowedValue.valueInGlobalWindow("X"));
+ .accept(
+ WindowedValue.valueInGlobalWindow(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
Iterables.getOnlyElement(bundle.getInputReceivers().values())
- .accept(WindowedValue.valueInGlobalWindow("Y"));
+ .accept(
+ WindowedValue.valueInGlobalWindow(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
}
for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
assertThat(
@@ -1070,18 +1078,22 @@ public class RemoteExecutionTest implements Serializable {
WindowedValue.valueInGlobalWindow(kvBytes("stream2X", ""))));
}
- private KV<String, byte[]> kvBytes(String key, long value) throws CoderException {
- return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
+ private KV<byte[], byte[]> kvBytes(String key, long value) throws CoderException {
+ return KV.of(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
+ CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
}
- private KV<String, String> kvBytes(String key, String value) throws CoderException {
- return KV.of(key, value);
+ private KV<byte[], byte[]> kvBytes(String key, String value) throws CoderException {
+ return KV.of(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value));
}
- private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
+ private KV<byte[], org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
String key, long timestampOffset) throws CoderException {
return KV.of(
- key,
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
org.apache.beam.runners.core.construction.Timer.of(
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED)));
@@ -1090,7 +1102,7 @@ public class RemoteExecutionTest implements Serializable {
private Object timerStructuralValue(Object timer) {
return WindowedValue.FullWindowedValueCoder.of(
KvCoder.of(
- StringUtf8Coder.of(),
+ ByteArrayCoder.of(),
org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())),
GlobalWindow.Coder.INSTANCE)
.structuralValue(timer);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 5468537..a31eced 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -185,11 +185,9 @@ public class TransformHierarchy {
public void finishSpecifyingInput() {
// Inputs must be completely specified before they are consumed by a transform.
for (PValue inputValue : current.getInputs().values()) {
+ Node producerNode = getProducer(inputValue);
PInput input = producerInput.remove(inputValue);
- Node producerNode = maybeGetProducer(inputValue);
- if (producerNode != null) {
- inputValue.finishSpecifying(input, producerNode.getTransform());
- }
+ inputValue.finishSpecifying(input, producerNode.getTransform());
}
}
@@ -237,12 +235,8 @@ public class TransformHierarchy {
checkState(current != null, "Can't pop the root node of a TransformHierarchy");
}
- Node maybeGetProducer(PValue produced) {
- return producers.get(produced);
- }
-
Node getProducer(PValue produced) {
- return checkNotNull(maybeGetProducer(produced), "No producer found for %s", produced);
+ return checkNotNull(producers.get(produced), "No producer found for %s", produced);
}
public Set<PValue> visit(PipelineVisitor visitor) {
@@ -635,15 +629,13 @@ public class TransformHierarchy {
if (!isRootNode()) {
// Visit inputs.
for (PValue inputValue : inputs.values()) {
- Node valueProducer = maybeGetProducer(inputValue);
- if (valueProducer != null) {
- if (!visitedNodes.contains(valueProducer)) {
- valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
- }
- if (visitedValues.add(inputValue)) {
- LOG.debug("Visiting input value {}", inputValue);
- visitor.visitValue(inputValue, valueProducer);
- }
+ Node valueProducer = getProducer(inputValue);
+ if (!visitedNodes.contains(valueProducer)) {
+ valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
+ }
+ if (visitedValues.add(inputValue)) {
+ LOG.debug("Visiting input value {}", inputValue);
+ visitor.visitValue(inputValue, valueProducer);
}
}
}
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 2164161..baa3cf5 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -27,13 +27,11 @@ from builtins import object
import google.protobuf.wrappers_pb2
from future.moves import pickle
-from past.builtins import unicode
from apache_beam.coders import coder_impl
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.typehints import typehints
from apache_beam.utils import proto_utils
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -164,9 +162,6 @@ class Coder(object):
return d
return self.__dict__
- def to_type_hint(self):
- raise NotImplementedError('BEAM-2717')
-
@classmethod
def from_type_hint(cls, unused_typehint, unused_registry):
# If not overridden, just construct the coder without arguments.
@@ -326,13 +321,6 @@ class StrUtf8Coder(Coder):
def is_deterministic(self):
return True
- def to_type_hint(self):
- return unicode
-
-
-Coder.register_structured_urn(
- common_urns.coders.STRING_UTF8.urn, StrUtf8Coder)
-
class ToStringCoder(Coder):
"""A default string coder used if no sink coder is specified."""
@@ -388,9 +376,6 @@ class BytesCoder(FastCoder):
def is_deterministic(self):
return True
- def to_type_hint(self):
- return bytes
-
def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:bytes',
@@ -415,9 +400,6 @@ class VarIntCoder(FastCoder):
def is_deterministic(self):
return True
- def to_type_hint(self):
- return int
-
def as_cloud_object(self, coders_context=None):
return {
'@type': 'kind:varint',
@@ -442,9 +424,6 @@ class FloatCoder(FastCoder):
def is_deterministic(self):
return True
- def to_type_hint(self):
- return float
-
def __eq__(self, other):
return type(self) == type(other)
@@ -589,9 +568,6 @@ class PickleCoder(_PickleCoderBase):
def as_deterministic_coder(self, step_label, error_message=None):
return DeterministicFastPrimitivesCoder(self, step_label)
- def to_type_hint(self):
- return typehints.Any
-
class DillCoder(_PickleCoderBase):
"""Coder using dill's pickle functionality."""
@@ -623,9 +599,6 @@ class DeterministicFastPrimitivesCoder(FastCoder):
def value_coder(self):
return self
- def to_type_hint(self):
- return typehints.Any
-
class FastPrimitivesCoder(FastCoder):
"""Encodes simple primitives (e.g. str, int) efficiently.
@@ -648,9 +621,6 @@ class FastPrimitivesCoder(FastCoder):
else:
return DeterministicFastPrimitivesCoder(self, step_label)
- def to_type_hint(self):
- return typehints.Any
-
def as_cloud_object(self, coders_context=None, is_pair_like=True):
value = super(FastCoder, self).as_cloud_object(coders_context)
# We currently use this coder in places where we cannot infer the coder to
@@ -776,9 +746,6 @@ class TupleCoder(FastCoder):
return TupleCoder([c.as_deterministic_coder(step_label, error_message)
for c in self._coders])
- def to_type_hint(self):
- return typehints.Tuple[tuple(c.to_type_hint() for c in self._coders)]
-
@staticmethod
def from_type_hint(typehint, registry):
return TupleCoder([registry.get_coder(t) for t in typehint.tuple_types])
@@ -911,9 +878,6 @@ class IterableCoder(FastCoder):
def value_coder(self):
return self._elem_coder
- def to_type_hint(self):
- return typehints.Iterable[self._elem_coder.to_type_hint()]
-
@staticmethod
def from_type_hint(typehint, registry):
return IterableCoder(registry.get_coder(typehint.inner_type))
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8f0fa10..20ac5f0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -72,7 +72,6 @@ from apache_beam.pvalue import PDone
from apache_beam.runners import PipelineRunner
from apache_beam.runners import create_runner
from apache_beam.transforms import ptransform
-#from apache_beam.transforms import external
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import typehints
from apache_beam.utils.annotations import deprecated
@@ -791,23 +790,23 @@ class AppliedPTransform(object):
for pval in self.inputs:
if pval not in visited and not isinstance(pval, pvalue.PBegin):
- if pval.producer is not None:
- pval.producer.visit(visitor, pipeline, visited)
- # The value should be visited now since we visit outputs too.
- assert pval in visited, pval
+ assert pval.producer is not None
+ pval.producer.visit(visitor, pipeline, visited)
+ # The value should be visited now since we visit outputs too.
+ assert pval in visited, pval
# Visit side inputs.
for pval in self.side_inputs:
if isinstance(pval, pvalue.AsSideInput) and pval.pvalue not in visited:
pval = pval.pvalue # Unpack marker-object-wrapped pvalue.
- if pval.producer is not None:
- pval.producer.visit(visitor, pipeline, visited)
- # The value should be visited now since we visit outputs too.
- assert pval in visited
- # TODO(silviuc): Is there a way to signal that we are visiting a side
- # value? The issue is that the same PValue can be reachable through
- # multiple paths and therefore it is not guaranteed that the value
- # will be visited as a side value.
+ assert pval.producer is not None
+ pval.producer.visit(visitor, pipeline, visited)
+ # The value should be visited now since we visit outputs too.
+ assert pval in visited
+ # TODO(silviuc): Is there a way to signal that we are visiting a side
+ # value? The issue is that the same PValue can be reachable through
+ # multiple paths and therefore it is not guaranteed that the value
+ # will be visited as a side value.
# Visit a composite or primitive transform.
if self.is_composite():
@@ -848,11 +847,6 @@ class AppliedPTransform(object):
if isinstance(output, pvalue.PCollection)}
def to_runner_api(self, context):
- # External tranforms require more splicing than just setting the spec.
- from apache_beam.transforms import external
- if isinstance(self.transform, external.ExternalTransform):
- return self.transform.to_runner_api_transform(context, self.full_label)
-
from apache_beam.portability.api import beam_runner_api_pb2
def transform_to_runner_api(transform, context):
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 34cafd0..6a5e42a 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -145,26 +145,20 @@ class PCollection(PValue, typing.Generic[typing.TypeVar('T')]):
def to_runner_api(self, context):
return beam_runner_api_pb2.PCollection(
- unique_name=self._unique_name(),
+ unique_name='%d%s.%s' % (
+ len(self.producer.full_label), self.producer.full_label, self.tag),
coder_id=context.coder_id_from_element_type(self.element_type),
is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED,
windowing_strategy_id=context.windowing_strategies.get_id(
self.windowing))
- def _unique_name(self):
- if self.producer:
- return '%d%s.%s' % (
- len(self.producer.full_label), self.producer.full_label, self.tag)
- else:
- return 'PCollection%s' % id(self)
-
@staticmethod
def from_runner_api(proto, context):
# Producer and tag will be filled in later, the key point is that the
# same object is returned for the same pcollection id.
return PCollection(
None,
- element_type=context.element_type_from_coder_id(proto.coder_id),
+ element_type=pickler.loads(proto.coder_id),
windowing=context.windowing_strategies.get_by_id(
proto.windowing_strategy_id))
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index e6685d9..74156a1 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -55,10 +55,9 @@ class _PipelineContextMap(object):
Under the hood it encodes and decodes these objects into runner API
representations.
"""
- def __init__(self, context, obj_type, namespace, proto_map=None):
+ def __init__(self, context, obj_type, proto_map=None):
self._pipeline_context = context
self._obj_type = obj_type
- self._namespace = namespace
self._obj_to_id = {}
self._id_to_obj = {}
self._id_to_proto = dict(proto_map) if proto_map else {}
@@ -66,11 +65,8 @@ class _PipelineContextMap(object):
def _unique_ref(self, obj=None, label=None):
self._counter += 1
- return "%s_%s_%s_%d" % (
- self._namespace,
- self._obj_type.__name__,
- label or type(obj).__name__,
- self._counter)
+ return "ref_%s_%s_%s" % (
+ self._obj_type.__name__, label or type(obj).__name__, self._counter)
def populate_map(self, proto_map):
for id, proto in self._id_to_proto.items():
@@ -93,19 +89,6 @@ class _PipelineContextMap(object):
self._id_to_proto[id], self._pipeline_context)
return self._id_to_obj[id]
- def get_by_proto(self, maybe_new_proto, label=None, deduplicate=False):
- if deduplicate:
- for id, proto in self._id_to_proto.items():
- if proto == maybe_new_proto:
- return id
- return self.put_proto(self._unique_ref(label), maybe_new_proto)
-
- def put_proto(self, id, proto):
- if id in self._id_to_proto:
- raise ValueError("Id '%s' is already taken." % id)
- self._id_to_proto[id] = proto
- return id
-
def __getitem__(self, id):
return self.get_by_id(id)
@@ -129,8 +112,7 @@ class PipelineContext(object):
def __init__(
self, proto=None, default_environment=None, use_fake_coders=False,
- iterable_state_read=None, iterable_state_write=None,
- namespace='ref'):
+ iterable_state_read=None, iterable_state_write=None):
if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
proto = beam_runner_api_pb2.Components(
coders=dict(proto.coders.items()),
@@ -139,7 +121,7 @@ class PipelineContext(object):
for name, cls in self._COMPONENT_TYPES.items():
setattr(
self, name, _PipelineContextMap(
- self, cls, namespace, getattr(proto, name, None)))
+ self, cls, getattr(proto, name, None)))
if default_environment:
self._default_environment_id = self.environments.get_id(
Environment(default_environment), label='default_environment')
@@ -159,12 +141,6 @@ class PipelineContext(object):
else:
return self.coders.get_id(coders.registry.get_coder(element_type))
- def element_type_from_coder_id(self, coder_id):
- if self.use_fake_coders or coder_id not in self.coders:
- return pickler.loads(coder_id)
- else:
- return self.coders[coder_id].to_type_hint()
-
@staticmethod
def from_runner_api(proto):
return PipelineContext(proto)
diff --git a/sdks/python/apache_beam/runners/pipeline_context_test.py b/sdks/python/apache_beam/runners/pipeline_context_test.py
index 6f1ec74..1e9456a 100644
--- a/sdks/python/apache_beam/runners/pipeline_context_test.py
+++ b/sdks/python/apache_beam/runners/pipeline_context_test.py
@@ -33,14 +33,6 @@ class PipelineContextTest(unittest.TestCase):
bytes_coder_ref2 = context.coders.get_id(coders.BytesCoder())
self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
- def test_deduplication_by_proto(self):
- context = pipeline_context.PipelineContext()
- bytes_coder_proto = coders.BytesCoder().to_runner_api(None)
- bytes_coder_ref = context.coders.get_by_proto(bytes_coder_proto)
- bytes_coder_ref2 = context.coders.get_by_proto(
- bytes_coder_proto, deduplicate=True)
- self.assertEqual(bytes_coder_ref, bytes_coder_ref2)
-
def test_serialization(self):
context = pipeline_context.PipelineContext()
float_coder_ref = context.coders.get_id(coders.FloatCoder())
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py
deleted file mode 100644
index e407892..0000000
--- a/sdks/python/apache_beam/runners/portability/expansion_service.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A PipelineExpansion service.
-"""
-from __future__ import absolute_import
-from __future__ import print_function
-
-import argparse
-import logging
-import sys
-import time
-import traceback
-
-from apache_beam import pipeline as beam_pipeline
-from apache_beam.portability import python_urns
-from apache_beam.portability.api import beam_expansion_api_pb2
-from apache_beam.portability.api import beam_expansion_api_pb2_grpc
-from apache_beam.runners import pipeline_context
-from apache_beam.runners.portability import portable_runner
-from apache_beam.transforms import external
-from apache_beam.transforms import ptransform
-
-
-class ExpansionServiceServicer(
- beam_expansion_api_pb2_grpc.ExpansionServiceServicer):
-
- def __init__(self, options=None):
- self._options = options or beam_pipeline.PipelineOptions(
- environment_type=python_urns.EMBEDDED_PYTHON)
-
- def Expand(self, request):
- try:
- pipeline = beam_pipeline.Pipeline(options=self._options)
-
- def with_pipeline(component, pcoll_id=None):
- component.pipeline = pipeline
- if pcoll_id:
- component.producer, component.tag = producers[pcoll_id]
- # We need the lookup to resolve back to this id.
- context.pcollections._obj_to_id[component] = pcoll_id
- return component
-
- context = pipeline_context.PipelineContext(
- request.components,
- default_environment=
- portable_runner.PortableRunner._create_environment(
- self._options),
- namespace=request.namespace)
- producers = {
- pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
- for t_id, t_proto in request.components.transforms.items()
- for pcoll_tag, pcoll_id in t_proto.outputs.items()
- }
- transform = with_pipeline(
- ptransform.PTransform.from_runner_api(
- request.transform.spec, context))
- inputs = transform._pvaluish_from_dict({
- tag: with_pipeline(context.pcollections.get_by_id(pcoll_id), pcoll_id)
- for tag, pcoll_id in request.transform.inputs.items()
- })
- if not inputs:
- inputs = pipeline
- with external.ExternalTransform.outer_namespace(request.namespace):
- result = pipeline.apply(
- transform, inputs, request.transform.unique_name)
- expanded_transform = pipeline._root_transform().parts[-1]
- # TODO(BEAM-1833): Use named outputs internally.
- if isinstance(result, dict):
- expanded_transform.outputs = result
- pipeline_proto = pipeline.to_runner_api(context=context)
- # TODO(BEAM-1833): Use named inputs internally.
- expanded_transform_id = context.transforms.get_id(expanded_transform)
- expanded_transform_proto = pipeline_proto.components.transforms.pop(
- expanded_transform_id)
- expanded_transform_proto.inputs.clear()
- expanded_transform_proto.inputs.update(request.transform.inputs)
- for transform_id in pipeline_proto.root_transform_ids:
- del pipeline_proto.components.transforms[transform_id]
- return beam_expansion_api_pb2.ExpansionResponse(
- components=pipeline_proto.components,
- transform=expanded_transform_proto)
-
- except Exception: # pylint: disable=broad-except
- return beam_expansion_api_pb2.ExpansionResponse(
- error=traceback.format_exc())
-
-
-def main(unused_argv):
- parser = argparse.ArgumentParser()
- parser.add_argument('-p', '--port',
- type=int,
- help='port on which to serve the job api')
- options = parser.parse_args()
- expansion_servicer = ExpansionServiceServicer()
- port = expansion_servicer.start_grpc_server(options.port)
- while True:
- logging.info('Listening for expansion requests at %d', port)
- time.sleep(300)
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- main(sys.argv)
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 41cfcf6..a207009 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -22,7 +22,6 @@ from __future__ import absolute_import
from apache_beam.transforms import combiners
from apache_beam.transforms.core import *
-from apache_beam.transforms.external import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.util import *
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
deleted file mode 100644
index c9f8ba4..0000000
--- a/sdks/python/apache_beam/transforms/external.py
+++ /dev/null
@@ -1,208 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Defines Transform whose expansion is implemented elsewhere.
-"""
-from __future__ import absolute_import
-from __future__ import print_function
-
-import contextlib
-import copy
-import threading
-
-import grpc
-
-from apache_beam import pvalue
-from apache_beam.portability import common_urns
-from apache_beam.portability.api import beam_expansion_api_pb2
-from apache_beam.portability.api import beam_expansion_api_pb2_grpc
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.runners import pipeline_context
-from apache_beam.transforms import ptransform
-
-
-class ExternalTransform(ptransform.PTransform):
-
- _namespace_counter = 0
- _namespace = threading.local()
- _namespace.value = 'external'
-
- _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root'
- _IMPULSE_PREFIX = 'impulse'
-
- def __init__(self, urn, payload, endpoint):
- # TODO: Start an endpoint given an environment?
- self._urn = urn
- self._payload = payload
- self._endpoint = endpoint
- self._namespace = self._fresh_namespace()
-
- def default_label(self):
- return '%s(%s)' % (self.__class__.__name__, self._urn)
-
- @classmethod
- @contextlib.contextmanager
- def outer_namespace(cls, namespace):
- prev = cls._namespace.value
- cls._namespace.value = namespace
- yield
- cls._namespace.value = prev
-
- @classmethod
- def _fresh_namespace(cls):
- ExternalTransform._namespace_counter += 1
- return '%s_%d' % (cls._namespace.value, cls._namespace_counter)
-
- def expand(self, pvalueish):
- if isinstance(pvalueish, pvalue.PBegin):
- self._inputs = {}
- elif isinstance(pvalueish, (list, tuple)):
- self._inputs = {str(ix): pvalue for ix, pvalue in enumerate(pvalueish)}
- elif isinstance(pvalueish, dict):
- self._inputs = pvalueish
- else:
- self._inputs = {'input': pvalueish}
- pipeline = (
- next(iter(self._inputs.values())).pipeline
- if self._inputs
- else pvalueish.pipeline)
- context = pipeline_context.PipelineContext()
- transform_proto = beam_runner_api_pb2.PTransform(
- unique_name=self._EXPANDED_TRANSFORM_UNIQUE_NAME,
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=self._urn, payload=self._payload))
- for tag, pcoll in self._inputs.items():
- transform_proto.inputs[tag] = context.pcollections.get_id(pcoll)
- # Conversion to/from proto assumes producers.
- # TODO: Possibly loosen this.
- context.transforms.put_proto(
- '%s_%s' % (self._IMPULSE_PREFIX, tag),
- beam_runner_api_pb2.PTransform(
- unique_name='%s_%s' % (self._IMPULSE_PREFIX, tag),
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=common_urns.primitives.IMPULSE.urn),
- outputs={'out': transform_proto.inputs[tag]}))
- components = context.to_runner_api()
- request = beam_expansion_api_pb2.ExpansionRequest(
- components=components,
- namespace=self._namespace,
- transform=transform_proto)
-
- if isinstance(self._endpoint, str):
- with grpc.insecure_channel(self._endpoint) as channel:
- response = beam_expansion_api_pb2_grpc.ExpansionServiceStub(
- channel).Expand(request)
- else:
- response = self._endpoint.Expand(request)
-
- if response.error:
- raise RuntimeError(response.error)
- self._expanded_components = response.components
- self._expanded_transform = response.transform
- result_context = pipeline_context.PipelineContext(response.components)
-
- def fix_output(pcoll, tag):
- pcoll.pipeline = pipeline
- pcoll.tag = tag
- return pcoll
- self._outputs = {
- tag: fix_output(result_context.pcollections.get_by_id(pcoll_id), tag)
- for tag, pcoll_id in self._expanded_transform.outputs.items()
- }
-
- return self._output_to_pvalueish(self._outputs)
-
- def _output_to_pvalueish(self, output_dict):
- if len(output_dict) == 1:
- return next(iter(output_dict.values()))
- else:
- return output_dict
-
- def to_runner_api_transform(self, context, full_label):
- pcoll_renames = {}
- renamed_tag_seen = False
- for tag, pcoll in self._inputs.items():
- if tag not in self._expanded_transform.inputs:
- if renamed_tag_seen:
- raise RuntimeError(
- 'Ambiguity due to non-preserved tags: %s vs %s' % (
- sorted(self._expanded_transform.inputs.keys()),
- sorted(self._inputs.keys())))
- else:
- renamed_tag_seen = True
- tag, = self._expanded_transform.inputs.keys()
- pcoll_renames[self._expanded_transform.inputs[tag]] = (
- context.pcollections.get_id(pcoll))
- for tag, pcoll in self._outputs.items():
- pcoll_renames[self._expanded_transform.outputs[tag]] = (
- context.pcollections.get_id(pcoll))
-
- def _equivalent(coder1, coder2):
- return coder1 == coder2 or _normalize(coder1) == _normalize(coder2)
-
- def _normalize(coder_proto):
- normalized = copy.copy(coder_proto)
- normalized.spec.environment_id = ''
- # TODO(robertwb): Normalize components as well.
- return normalized
-
- for id, proto in self._expanded_components.coders.items():
- if id.startswith(self._namespace):
- context.coders.put_proto(id, proto)
- elif id in context.coders:
- if not _equivalent(context.coders._id_to_proto[id], proto):
- raise RuntimeError('Re-used coder id: %s\n%s\n%s' % (
- id, context.coders._id_to_proto[id], proto))
- else:
- context.coders.put_proto(id, proto)
- for id, proto in self._expanded_components.windowing_strategies.items():
- if id.startswith(self._namespace):
- context.windowing_strategies.put_proto(id, proto)
- for id, proto in self._expanded_components.environments.items():
- if id.startswith(self._namespace):
- context.environments.put_proto(id, proto)
- for id, proto in self._expanded_components.pcollections.items():
- if id not in pcoll_renames:
- context.pcollections.put_proto(id, proto)
-
- for id, proto in self._expanded_components.transforms.items():
- if id.startswith(self._IMPULSE_PREFIX):
- # Our fake inputs.
- continue
- assert id.startswith(self._namespace), (id, self._namespace)
- new_proto = beam_runner_api_pb2.PTransform(
- unique_name=full_label + proto.unique_name[
- len(self._EXPANDED_TRANSFORM_UNIQUE_NAME):],
- spec=proto.spec,
- subtransforms=proto.subtransforms,
- inputs={tag: pcoll_renames.get(pcoll, pcoll)
- for tag, pcoll in proto.inputs.items()},
- outputs={tag: pcoll_renames.get(pcoll, pcoll)
- for tag, pcoll in proto.outputs.items()})
- context.transforms.put_proto(id, new_proto)
-
- return self._expanded_transform
-
-
-def memoize(func):
- cache = {}
-
- def wrapper(*args):
- if args not in cache:
- cache[args] = func(*args)
- return cache[args]
- return wrapper
diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py
deleted file mode 100644
index e66cf21..0000000
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ /dev/null
@@ -1,205 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.util classes."""
-
-from __future__ import absolute_import
-
-import argparse
-import subprocess
-import sys
-import unittest
-
-import grpc
-from past.builtins import unicode
-
-import apache_beam as beam
-from apache_beam.portability import python_urns
-from apache_beam.runners.portability import expansion_service
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.transforms import ptransform
-
-
-class ExternalTransformTest(unittest.TestCase):
-
- # This will be overwritten if set via a flag.
- expansion_service_jar = None
-
- def test_simple(self):
-
- @ptransform.PTransform.register_urn('simple', None)
- class SimpleTransform(ptransform.PTransform):
- def expand(self, pcoll):
- return pcoll | beam.Map(lambda x: 'Simple(%s)' % x)
-
- def to_runner_api_parameter(self, unused_context):
- return 'simple', None
-
- @staticmethod
- def from_runner_api_parameter(unused_parameter, unused_context):
- return SimpleTransform()
-
- with beam.Pipeline() as p:
- res = (
- p
- | beam.Create(['a', 'b'])
- | beam.ExternalTransform(
- 'simple',
- None,
- expansion_service.ExpansionServiceServicer()))
- assert_that(res, equal_to(['Simple(a)', 'Simple(b)']))
-
- def test_multi(self):
-
- @ptransform.PTransform.register_urn('multi', None)
- class MutltiTransform(ptransform.PTransform):
- def expand(self, pcolls):
- return {
- 'main':
- (pcolls['main1'], pcolls['main2'])
- | beam.Flatten()
- | beam.Map(lambda x, s: x + s,
- beam.pvalue.AsSingleton(pcolls['side'])),
- 'side': pcolls['side'] | beam.Map(lambda x: x + x),
- }
-
- def to_runner_api_parameter(self, unused_context):
- return 'multi', None
-
- @staticmethod
- def from_runner_api_parameter(unused_parameter, unused_context):
- return MutltiTransform()
-
- with beam.Pipeline() as p:
- main1 = p | 'Main1' >> beam.Create(['a', 'bb'], reshuffle=False)
- main2 = p | 'Main2' >> beam.Create(['x', 'yy', 'zzz'], reshuffle=False)
- side = p | 'Side' >> beam.Create(['s'])
- res = dict(main1=main1, main2=main2, side=side) | beam.ExternalTransform(
- 'multi', None, expansion_service.ExpansionServiceServicer())
- assert_that(res['main'], equal_to(['as', 'bbs', 'xs', 'yys', 'zzzs']))
- assert_that(res['side'], equal_to(['ss']), label='CheckSide')
-
- def test_payload(self):
-
- @ptransform.PTransform.register_urn('payload', bytes)
- class PayloadTransform(ptransform.PTransform):
- def __init__(self, payload):
- self._payload = payload
-
- def expand(self, pcoll):
- return pcoll | beam.Map(lambda x, s: x + s, self._payload)
-
- def to_runner_api_parameter(self, unused_context):
- return b'payload', self._payload.encode('ascii')
-
- @staticmethod
- def from_runner_api_parameter(payload, unused_context):
- return PayloadTransform(payload.decode('ascii'))
-
- with beam.Pipeline() as p:
- res = (
- p
- | beam.Create(['a', 'bb'], reshuffle=False)
- | beam.ExternalTransform(
- 'payload', b's',
- expansion_service.ExpansionServiceServicer()))
- assert_that(res, equal_to(['as', 'bbs']))
-
- def test_nested(self):
- @ptransform.PTransform.register_urn('fib', bytes)
- class FibTransform(ptransform.PTransform):
- def __init__(self, level):
- self._level = level
-
- def expand(self, p):
- if self._level <= 2:
- return p | beam.Create([1])
- else:
- a = p | 'A' >> beam.ExternalTransform(
- 'fib', str(self._level - 1).encode('ascii'),
- expansion_service.ExpansionServiceServicer())
- b = p | 'B' >> beam.ExternalTransform(
- 'fib', str(self._level - 2).encode('ascii'),
- expansion_service.ExpansionServiceServicer())
- return (
- (a, b)
- | beam.Flatten()
- | beam.CombineGlobally(sum).without_defaults())
-
- def to_runner_api_parameter(self, unused_context):
- return 'fib', str(self._level).encode('ascii')
-
- @staticmethod
- def from_runner_api_parameter(level, unused_context):
- return FibTransform(int(level.decode('ascii')))
-
- with beam.Pipeline() as p:
- assert_that(p | FibTransform(6), equal_to([8]))
-
- def test_java_expansion(self):
- if not self.expansion_service_jar:
- raise unittest.SkipTest('No expansion service jar provided.')
-
- # The actual definitions of these transforms is in
- # org.apache.beam.runners.core.construction.TestExpansionService.
- TEST_COUNT_URN = "pytest:beam:transforms:count"
- TEST_FILTER_URN = "pytest:beam:transforms:filter_less_than"
-
- # Run as cheaply as possible on the portable runner.
- # TODO(robertwb): Support this directly in the direct runner.
- options = beam.options.pipeline_options.PipelineOptions(
- runner='PortableRunner',
- experiments=['beam_fn_api'],
- environment_type=python_urns.EMBEDDED_PYTHON,
- job_endpoint='embed')
-
- try:
- # Start the java server and wait for it to be ready.
- port = '8091'
- address = 'localhost:%s' % port
- server = subprocess.Popen(
- ['java', '-jar', self.expansion_service_jar, port])
- with grpc.insecure_channel(address) as channel:
- grpc.channel_ready_future(channel).result()
-
- # Run a simple count-filtered-letters pipeline.
- with beam.Pipeline(options=options) as p:
- res = (
- p
- | beam.Create(list('aaabccxyyzzz'))
- | beam.Map(unicode)
- | beam.ExternalTransform(TEST_FILTER_URN, 'middle', address)
- | beam.ExternalTransform(TEST_COUNT_URN, None, address)
- | beam.Map(lambda kv: '%s: %s' % kv))
-
- assert_that(res, equal_to(['a: 3', 'b: 1', 'c: 2']))
-
- finally:
- server.kill()
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--expansion_service_jar')
- known_args, sys.argv = parser.parse_known_args(sys.argv)
-
- if known_args.expansion_service_jar:
- ExternalTransformTest.expansion_service_jar = (
- known_args.expansion_service_jar)
-
- unittest.main()
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2ebc93b..c512d9f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -36,7 +36,6 @@ FlatMap processing functions.
from __future__ import absolute_import
-import contextlib
import copy
import itertools
import operator
@@ -231,23 +230,6 @@ def get_nested_pvalues(pvalueish):
return pvalues
-def get_nested_pvalues0(pvalueish):
- if isinstance(pvalueish, (tuple, list)):
- tagged_values = enumerate(pvalueish)
- if isinstance(pvalueish, dict):
- tagged_values = pvalueish.items()
- else:
- yield None, pvalueish
- return
-
- for tag, subvalue in tagged_values:
- for subtag, subsubvalue in get_nested_pvalues(subvalue):
- if subtag is None:
- yield tag, subsubvalue
- else:
- yield '%s.%s' % (tag, subsubvalue), subsubvalue
-
-
class _ZipPValues(object):
"""Pairs each PValue in a pvalueish with a value in a parallel out sibling.
@@ -544,37 +526,13 @@ class PTransform(WithTypeHints, HasDisplayData):
yield pvalueish
return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
- def _pvaluish_from_dict(self, input_dict):
- if len(input_dict) == 1:
- return next(iter(input_dict.values()))
- else:
- return input_dict
-
_known_urns = {}
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
def register(constructor):
- if isinstance(constructor, type):
- constructor.from_runner_api_parameter = register(
- constructor.from_runner_api_parameter)
- # pylint isn't smart enough to recognize when this is used
- # on a class or a method, and will emit a no-self-warning
- # in the latter case. Rather than suppressing this at each
- # use, we fool it here through some dynamic patching that
- # pylint will also not understand.
-
- @contextlib.contextmanager
- def fake_static_method():
- actual_static_method = staticmethod
- globals()['staticmethod'] = lambda x: x
- yield
- globals()['staticmethod'] = actual_static_method
- with fake_static_method():
- return staticmethod(constructor)
- else:
- cls._known_urns[urn] = parameter_type, constructor
- return staticmethod(constructor)
+ cls._known_urns[urn] = parameter_type, constructor
+ return staticmethod(constructor)
if constructor:
# Used as a statement.
register(constructor)
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 18bd161..fc2372e 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -430,15 +430,3 @@ project.task('createProcessWorker') {
}
}
}
-
-project.task('crossLanguagePythonJava') {
- dependsOn 'setupVirtualenv'
- dependsOn ':beam-sdks-java-container:docker'
- dependsOn ':beam-runners-core-construction-java:testExpansionService'
- doLast {
- exec {
- executable 'sh'
- args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.transforms.external_test --expansion_service_jar=${project(":beam-runners-core-construction-java:").testExpansionService.archivePath}"
- }
- }
-}