You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/12 01:42:40 UTC
[06/18] incubator-beam git commit: [BEAM-151] Move a large portion of
the Dataflow runner to separate maven module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
deleted file mode 100644
index 2cf77a0..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerHooks.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.api.services.dataflow.model.Environment;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-
-/**
- * An instance of this class can be passed to the
- * {@link DataflowPipelineRunner} to add user-defined hooks to be
- * invoked at various times during pipeline execution.
- */
-@Experimental
-public class DataflowPipelineRunnerHooks {
- /**
- * Allows the user to modify the environment of their job before their job is submitted
- * to the service for execution. The default implementation does nothing.
- *
- * @param environment The environment of the job. Users can make changes to this instance in order
- * to change the environment with which their job executes on the service.
- */
- public void modifyEnvironmentBeforeSubmission(Environment environment) {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
deleted file mode 100644
index 5bff46c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ /dev/null
@@ -1,1105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.byteArrayToJsonString;
-import static com.google.cloud.dataflow.sdk.util.StringUtils.jsonStringToByteArray;
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.getString;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.dataflow.model.AutoscalingSettings;
-import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.dataflow.model.Disk;
-import com.google.api.services.dataflow.model.Environment;
-import com.google.api.services.dataflow.model.Job;
-import com.google.api.services.dataflow.model.Step;
-import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.DoFnInfo;
-import com.google.cloud.dataflow.sdk.util.OutputReference;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects
- * into Cloud Dataflow Service API {@link Job}s.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
- // NOTE(review): "Must be kept in sync with their internal counterparts." -- no constants follow; likely stale.
- private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- /**
- * A map from {@link PTransform} subclass to the corresponding
- * {@link TransformTranslator} to use to translate that transform.
- *
- * <p>A static map that contains system-wide defaults.
- */
- private static Map<Class, TransformTranslator> transformTranslators =
- new HashMap<>();
-
- /** Provided configuration options. */
- private final DataflowPipelineOptions options;
-
- /**
- * Constructs a translator from the provided options.
- *
- * @param options Properties that configure the translator.
- *
- * @return The newly created translator.
- */
- public static DataflowPipelineTranslator fromOptions(
- DataflowPipelineOptions options) {
- return new DataflowPipelineTranslator(options);
- }
-
- private DataflowPipelineTranslator(DataflowPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Translates a {@link Pipeline} into a {@code JobSpecification}.
- */
- public JobSpecification translate(
- Pipeline pipeline,
- DataflowPipelineRunner runner,
- List<DataflowPackage> packages) {
-
- Translator translator = new Translator(pipeline, runner);
- Job result = translator.translate(packages);
- return new JobSpecification(result, Collections.unmodifiableMap(translator.stepNames));
- }
-
- /**
- * The result of a job translation.
- *
- * <p>Used to pass the result {@link Job} and any state that was used to construct the job that
- * may be of use to other classes (e.g. the {@link PTransform} to StepName mapping).
- */
- public static class JobSpecification {
- private final Job job;
- private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
-
- public JobSpecification(Job job, Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
- this.job = job;
- this.stepNames = stepNames;
- }
-
- public Job getJob() {
- return job;
- }
-
- /**
- * Returns the mapping of {@link AppliedPTransform AppliedPTransforms} to the internal step
- * name for that {@code AppliedPTransform}.
- */
- public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
- return stepNames;
- }
- }
-
- /**
- * Renders a {@link Job} as a string.
- */
- public static String jobToString(Job job) {
- try {
- return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(job);
- } catch (JsonProcessingException exc) {
- throw new IllegalStateException("Failed to render Job as String.", exc);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Records that instances of the specified PTransform class should be translated by default by
- * the corresponding {@link TransformTranslator}. Throws {@link IllegalArgumentException} if one
- * was already registered; note the map entry has already been overwritten by then.
- */
- public static <TransformT extends PTransform> void registerTransformTranslator(
- Class<TransformT> transformClass,
- TransformTranslator<? extends TransformT> transformTranslator) {
- if (transformTranslators.put(transformClass, transformTranslator) != null) {
- throw new IllegalArgumentException(
- "defining multiple translators for " + transformClass);
- }
- }
-
- /**
- * Returns the {@link TransformTranslator} registered for exactly the specified PTransform
- * class (the lookup is a plain map get on the class itself), or null if none is registered.
- */
- public <TransformT extends PTransform>
- TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
- return transformTranslators.get(transformClass);
- }
-
- /**
- * A {@link TransformTranslator} knows how to translate
- * a particular subclass of {@link PTransform} for the
- * Cloud Dataflow service. It does so by
- * mutating the {@link TranslationContext}.
- */
- public interface TransformTranslator<TransformT extends PTransform> {
- public void translate(TransformT transform,
- TranslationContext context);
- }
-
- /**
- * The interface provided to registered callbacks for interacting
- * with the {@link DataflowPipelineRunner}, including reading and writing the
- * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
- */
- public interface TranslationContext {
- /**
- * Returns the configured pipeline options.
- */
- DataflowPipelineOptions getPipelineOptions();
-
- /**
- * Returns the input of the transform that is currently being translated.
- */
- <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
- /**
- * Returns the output of the transform that is currently being translated.
- */
- <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
- /**
- * Returns the full name of the transform that is currently being translated.
- */
- String getFullName(PTransform<?, ?> transform);
-
- /**
- * Adds a step to the Dataflow workflow for the given transform, with
- * the given Dataflow step type.
- * This step becomes "current" for the purpose of {@link #addInput} and
- * {@link #addOutput}.
- */
- public void addStep(PTransform<?, ?> transform, String type);
-
- /**
- * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
- * consistent with the Step, in terms of input, output and coder types.
- *
- * <p>This is a low-level operation; when using this method it is up to
- * the caller to ensure that names do not collide.
- */
- public void addStep(PTransform<?, ? extends PValue> transform, Step step);
-
- /**
- * Sets the encoding for the current Dataflow step.
- */
- public void addEncodingInput(Coder<?> value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- public void addInput(String name, Boolean value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- public void addInput(String name, String value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- public void addInput(String name, Long value);
-
- /**
- * Adds an input with the given name to the previously added Dataflow
- * step, coming from the specified input PValue.
- */
- public void addInput(String name, PInput value);
-
- /**
- * Adds an input that is a dictionary of strings to objects.
- */
- public void addInput(String name, Map<String, Object> elements);
-
- /**
- * Adds an input that is a list of objects.
- */
- public void addInput(String name, List<? extends Map<String, Object>> elements);
-
- /**
- * Adds an output with the given name to the previously added
- * Dataflow step, producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code WindowedValueCoder}.
- */
- public void addOutput(String name, PValue value);
-
- /**
- * Adds an output with the given name to the previously added
- * Dataflow step, producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code ValueOnlyCoder}.
- */
- public void addValueOnlyOutput(String name, PValue value);
-
- /**
- * Adds an output with the given name to the previously added
- * CollectionToSingleton Dataflow step, consuming the specified
- * input {@code PValue} and producing the specified output
- * {@code PValue}. This step requires special treatment for its
- * output encoding.
- */
- public void addCollectionToSingletonOutput(String name,
- PValue inputValue,
- PValue outputValue);
-
- /**
- * Encode a PValue reference as an output reference.
- */
- public OutputReference asOutputReference(PValue value);
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Translates a Pipeline into the Dataflow representation.
- */
- class Translator implements PipelineVisitor, TranslationContext {
- /** The Pipeline to translate. */
- private final Pipeline pipeline;
-
- /** The runner which will execute the pipeline. */
- private final DataflowPipelineRunner runner;
-
- /** The Cloud Dataflow Job representation. */
- private final Job job = new Job();
-
- /**
- * Translator is stateful, as addInput/addOutput calls refer to the current step.
- */
- private Step currentStep;
-
- /**
- * A Map from AppliedPTransform to their unique Dataflow step names.
- */
- private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-
- /**
- * A Map from PValues to their output names used by their producer
- * Dataflow steps.
- */
- private final Map<POutput, String> outputNames = new HashMap<>();
-
- /**
- * A Map from PValues to the Coders used for them.
- */
- private final Map<POutput, Coder<?>> outputCoders = new HashMap<>();
-
- /**
- * The transform currently being applied.
- */
- private AppliedPTransform<?, ?, ?> currentTransform;
-
- /**
- * Constructs a Translator that will translate the specified
- * Pipeline into Dataflow objects.
- */
- public Translator(Pipeline pipeline, DataflowPipelineRunner runner) {
- this.pipeline = pipeline;
- this.runner = runner;
- }
-
- /**
- * Translates this Translator's pipeline into a Dataflow {@link Job}.
- * @return a Job definition filled in with the type of job, the environment,
- * and the job steps.
- */
- public Job translate(List<DataflowPackage> packages) {
- job.setName(options.getJobName().toLowerCase()); // NOTE(review): default locale; consider Locale.ROOT
-
- Environment environment = new Environment();
- job.setEnvironment(environment);
-
- try {
- environment.setSdkPipelineOptions(
- MAPPER.readValue(MAPPER.writeValueAsBytes(options), Map.class));
- } catch (IOException e) {
- throw new IllegalArgumentException(
- "PipelineOptions specified failed to serialize to JSON.", e);
- }
-
- WorkerPool workerPool = new WorkerPool();
-
- if (options.getTeardownPolicy() != null) {
- workerPool.setTeardownPolicy(options.getTeardownPolicy().getTeardownPolicyName());
- }
-
- if (options.isStreaming()) {
- job.setType("JOB_TYPE_STREAMING");
- } else {
- job.setType("JOB_TYPE_BATCH");
- workerPool.setDiskType(options.getWorkerDiskType());
- }
-
- if (options.getWorkerMachineType() != null) {
- workerPool.setMachineType(options.getWorkerMachineType());
- }
-
- workerPool.setPackages(packages);
- workerPool.setNumWorkers(options.getNumWorkers());
-
- if (options.isStreaming()) {
- // Use separate data disk for streaming.
- Disk disk = new Disk();
- disk.setDiskType(options.getWorkerDiskType());
- workerPool.setDataDisks(Collections.singletonList(disk));
- }
- if (!Strings.isNullOrEmpty(options.getZone())) {
- workerPool.setZone(options.getZone());
- }
- if (!Strings.isNullOrEmpty(options.getNetwork())) {
- workerPool.setNetwork(options.getNetwork());
- }
- if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
- workerPool.setSubnetwork(options.getSubnetwork());
- }
- if (options.getDiskSizeGb() > 0) {
- workerPool.setDiskSizeGb(options.getDiskSizeGb());
- }
- AutoscalingSettings settings = new AutoscalingSettings();
- if (options.getAutoscalingAlgorithm() != null) {
- settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
- }
- settings.setMaxNumWorkers(options.getMaxNumWorkers());
- workerPool.setAutoscalingSettings(settings);
-
- List<WorkerPool> workerPools = new LinkedList<>();
-
- workerPools.add(workerPool);
- environment.setWorkerPools(workerPools);
-
- pipeline.traverseTopologically(this);
- return job;
- }
-
- @Override
- public DataflowPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
- return (InputT) getCurrentTransform(transform).getInput();
- }
-
- @Override
- public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
- return (OutputT) getCurrentTransform(transform).getOutput();
- }
-
- @Override
- public String getFullName(PTransform<?, ?> transform) {
- return getCurrentTransform(transform).getFullName();
- }
-
- private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
- checkArgument(
- currentTransform != null && currentTransform.getTransform() == transform,
- "can only be called with current transform");
- return currentTransform;
- }
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- }
-
- @Override
- public void visitTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
- TransformTranslator translator =
- getTransformTranslator(transform.getClass());
- if (translator == null) {
- throw new IllegalStateException(
- "no translator registered for " + transform);
- }
- LOG.debug("Translating {}", transform);
- currentTransform = AppliedPTransform.of(
- node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
- translator.translate(transform, this);
- currentTransform = null;
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.debug("Checking translation of {}", value);
- if (value.getProducingTransformInternal() == null) {
- throw new RuntimeException(
- "internal error: expecting a PValue "
- + "to have a producingTransform");
- }
- if (!producer.isCompositeNode()) {
- // Primitive transforms are the only ones assigned step names.
- asOutputReference(value);
- }
- }
-
- @Override
- public void addStep(PTransform<?, ?> transform, String type) {
- String stepName = genStepName();
- if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
- throw new IllegalArgumentException(
- transform + " already has a name specified");
- }
- // Start the next "steps" list item.
- List<Step> steps = job.getSteps();
- if (steps == null) {
- steps = new LinkedList<>();
- job.setSteps(steps);
- }
-
- currentStep = new Step();
- currentStep.setName(stepName);
- currentStep.setKind(type);
- steps.add(currentStep);
- addInput(PropertyNames.USER_NAME, getFullName(transform));
- addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
- }
-
- @Override
- public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
- Step step = original.clone();
- String stepName = step.getName();
- if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
- throw new IllegalArgumentException(transform + " already has a name specified");
- }
-
- Map<String, Object> properties = step.getProperties();
- if (properties != null) {
- @Nullable List<Map<String, Object>> outputInfoList = null;
- try {
- // TODO: This should be done via a Structs accessor.
- @Nullable List<Map<String, Object>> list =
- (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
- outputInfoList = list;
- } catch (Exception e) {
- throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
- }
- if (outputInfoList != null && outputInfoList.size() > 0) {
- Map<String, Object> firstOutputPort = outputInfoList.get(0);
- @Nullable String name;
- try {
- name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME);
- } catch (Exception e) {
- name = null;
- }
- if (name != null) {
- registerOutputName(getOutput(transform), name);
- }
- }
- }
-
- List<Step> steps = job.getSteps();
- if (steps == null) {
- steps = new LinkedList<>();
- job.setSteps(steps);
- }
- currentStep = step;
- steps.add(step);
- }
-
- @Override
- public void addEncodingInput(Coder<?> coder) {
- CloudObject encoding = SerializableUtils.ensureSerializable(coder);
- addObject(getProperties(), PropertyNames.ENCODING, encoding);
- }
-
- @Override
- public void addInput(String name, Boolean value) {
- addBoolean(getProperties(), name, value);
- }
-
- @Override
- public void addInput(String name, String value) {
- addString(getProperties(), name, value);
- }
-
- @Override
- public void addInput(String name, Long value) {
- addLong(getProperties(), name, value);
- }
-
- @Override
- public void addInput(String name, Map<String, Object> elements) {
- addDictionary(getProperties(), name, elements);
- }
-
- @Override
- public void addInput(String name, List<? extends Map<String, Object>> elements) {
- addList(getProperties(), name, elements);
- }
-
- @Override
- public void addInput(String name, PInput value) {
- if (value instanceof PValue) {
- addInput(name, asOutputReference((PValue) value));
- } else {
- throw new IllegalStateException("Input must be a PValue");
- }
- }
-
- @Override
- public void addOutput(String name, PValue value) {
- Coder<?> coder;
- if (value instanceof TypedPValue) {
- coder = ((TypedPValue<?>) value).getCoder();
- if (value instanceof PCollection) {
- // Wrap the PCollection element Coder inside a WindowedValueCoder.
- coder = WindowedValue.getFullCoder(
- coder,
- ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder());
- }
- } else {
- // No output coder to encode.
- coder = null;
- }
- addOutput(name, value, coder);
- }
-
- @Override
- public void addValueOnlyOutput(String name, PValue value) {
- Coder<?> coder;
- if (value instanceof TypedPValue) {
- coder = ((TypedPValue<?>) value).getCoder();
- if (value instanceof PCollection) {
- // Wrap the PCollection element Coder inside a ValueOnly
- // WindowedValueCoder.
- coder = WindowedValue.getValueOnlyCoder(coder);
- }
- } else {
- // No output coder to encode.
- coder = null;
- }
- addOutput(name, value, coder);
- }
-
- @Override
- public void addCollectionToSingletonOutput(String name,
- PValue inputValue,
- PValue outputValue) {
- Coder<?> inputValueCoder =
- Preconditions.checkNotNull(outputCoders.get(inputValue));
- // The inputValueCoder for the input PCollection should be some
- // WindowedValueCoder of the input PCollection's element
- // coder.
- Preconditions.checkState(
- inputValueCoder instanceof WindowedValue.WindowedValueCoder);
- // The outputValueCoder for the output should be an
- // IterableCoder of the inputValueCoder. This is a property
- // of the backend "CollectionToSingleton" step.
- Coder<?> outputValueCoder = IterableCoder.of(inputValueCoder);
- addOutput(name, outputValue, outputValueCoder);
- }
-
- /**
- * Adds an output with the given name to the previously added
- * Dataflow step, producing the specified output {@code PValue}
- * with the given {@code Coder} (if not {@code null}).
- */
- private void addOutput(String name, PValue value, Coder<?> valueCoder) {
- registerOutputName(value, name);
-
- Map<String, Object> properties = getProperties();
- @Nullable List<Map<String, Object>> outputInfoList = null;
- try {
- // TODO: This should be done via a Structs accessor.
- outputInfoList = (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO);
- } catch (Exception e) {
- throw new RuntimeException("Inconsistent dataflow pipeline translation", e);
- }
- if (outputInfoList == null) {
- outputInfoList = new ArrayList<>();
- // TODO: This should be done via a Structs accessor.
- properties.put(PropertyNames.OUTPUT_INFO, outputInfoList);
- }
-
- Map<String, Object> outputInfo = new HashMap<>();
- addString(outputInfo, PropertyNames.OUTPUT_NAME, name);
- addString(outputInfo, PropertyNames.USER_NAME, value.getName());
- if (value instanceof PCollection
- && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
- addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
- }
- if (valueCoder != null) {
- // Verify that encoding can be decoded, in order to catch serialization
- // failures as early as possible.
- CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
- addObject(outputInfo, PropertyNames.ENCODING, encoding);
- outputCoders.put(value, valueCoder);
- }
-
- outputInfoList.add(outputInfo);
- }
-
- private void addDisplayData(String name, DisplayData displayData) {
- List<Map<String, Object>> serializedItems = Lists.newArrayList();
- for (DisplayData.Item item : displayData.items()) {
- serializedItems.add(MAPPER.convertValue(item, Map.class));
- }
-
- addList(getProperties(), name, serializedItems);
- }
-
- @Override
- public OutputReference asOutputReference(PValue value) {
- AppliedPTransform<?, ?, ?> transform =
- value.getProducingTransformInternal();
- String stepName = stepNames.get(transform);
- if (stepName == null) {
- throw new IllegalArgumentException(transform + " doesn't have a name specified");
- }
-
- String outputName = outputNames.get(value);
- if (outputName == null) {
- throw new IllegalArgumentException(
- "output " + value + " doesn't have a name specified");
- }
-
- return new OutputReference(stepName, outputName);
- }
-
- private Map<String, Object> getProperties() {
- Map<String, Object> properties = currentStep.getProperties();
- if (properties == null) {
- properties = new HashMap<>();
- currentStep.setProperties(properties);
- }
- return properties;
- }
-
- /**
- * Returns a fresh Dataflow step name: "s" followed by one more than the number of named steps.
- */
- private String genStepName() {
- return "s" + (stepNames.size() + 1);
- }
-
- /**
- * Records the name of the given output PValue within its producing transform,
- * throwing {@link IllegalArgumentException} if the value already has a name recorded.
- */
- private void registerOutputName(POutput value, String name) {
- if (outputNames.put(value, name) != null) {
- throw new IllegalArgumentException(
- "output " + value + " already has a name specified");
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- @Override
- public String toString() {
- return "DataflowPipelineTranslator#" + hashCode();
- }
-
-
- ///////////////////////////////////////////////////////////////////////////
-
- static {
- registerTransformTranslator(
- View.CreatePCollectionView.class,
- new TransformTranslator<View.CreatePCollectionView>() {
- @Override
- public void translate(
- View.CreatePCollectionView transform,
- TranslationContext context) {
- translateTyped(transform, context);
- }
-
- private <ElemT, ViewT> void translateTyped(
- View.CreatePCollectionView<ElemT, ViewT> transform,
- TranslationContext context) {
- context.addStep(transform, "CollectionToSingleton");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addCollectionToSingletonOutput(
- PropertyNames.OUTPUT,
- context.getInput(transform),
- context.getOutput(transform));
- }
- });
-
- DataflowPipelineTranslator.registerTransformTranslator(
- Combine.GroupedValues.class,
- new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
- @Override
- public void translate(
- Combine.GroupedValues transform,
- DataflowPipelineTranslator.TranslationContext context) {
- translateHelper(transform, context);
- }
-
- private <K, InputT, OutputT> void translateHelper(
- final Combine.GroupedValues<K, InputT, OutputT> transform,
- DataflowPipelineTranslator.TranslationContext context) {
- context.addStep(transform, "CombineValues");
- translateInputs(context.getInput(transform), transform.getSideInputs(), context);
-
- AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
- transform.getAppliedFn(
- context.getInput(transform).getPipeline().getCoderRegistry(),
- context.getInput(transform).getCoder(),
- context.getInput(transform).getWindowingStrategy());
-
- context.addEncodingInput(fn.getAccumulatorCoder());
- context.addInput(
- PropertyNames.SERIALIZED_FN,
- byteArrayToJsonString(serializeToByteArray(fn)));
- context.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- }
- });
-
- // Create.Values is translated into a "CreateCollection" step whose ELEMENT input lists
- // every element, encoded with the output coder and rendered as a JSON string.
- registerTransformTranslator(
-     Create.Values.class,
-     new TransformTranslator<Create.Values>() {
-       @Override
-       public void translate(Create.Values transform, TranslationContext context) {
-         addCreateCollectionStep(transform, context);
-       }
-
-       /**
-        * Encodes each element of {@code create} and attaches the results to the step.
-        *
-        * @throws IllegalArgumentException if the coder fails to encode an element
-        */
-       private <T> void addCreateCollectionStep(
-           Create.Values<T> create, TranslationContext ctx) {
-         ctx.addStep(create, "CreateCollection");
-
-         Coder<T> elementCoder = ctx.getOutput(create).getCoder();
-         List<CloudObject> encodedElements = new LinkedList<>();
-         for (T element : create.getElements()) {
-           byte[] bytes;
-           try {
-             bytes = encodeToByteArray(elementCoder, element);
-           } catch (CoderException exn) {
-             // TODO: Put in better element printing:
-             // truncate if too long.
-             throw new IllegalArgumentException(
-                 "Unable to encode element '" + element + "' of transform '" + create
-                     + "' using coder '" + elementCoder + "'.",
-                 exn);
-           }
-           String json = byteArrayToJsonString(bytes);
-           // Sanity check (active only with -ea): the JSON form decodes to the same bytes.
-           assert Arrays.equals(bytes, jsonStringToByteArray(json));
-           encodedElements.add(CloudObject.forString(json));
-         }
-         ctx.addInput(PropertyNames.ELEMENT, encodedElements);
-         ctx.addValueOnlyOutput(PropertyNames.OUTPUT, ctx.getOutput(create));
-       }
-     });
-
- // Flatten.FlattenPCollectionList is translated into a "Flatten" step whose INPUTS is the
- // list of output references of every collection being flattened.
- registerTransformTranslator(
-     Flatten.FlattenPCollectionList.class,
-     new TransformTranslator<Flatten.FlattenPCollectionList>() {
-       @Override
-       public void translate(
-           Flatten.FlattenPCollectionList transform, TranslationContext context) {
-         addFlattenStep(transform, context);
-       }
-
-       /** Generic counterpart of {@code translate}; binds the element type. */
-       private <T> void addFlattenStep(
-           Flatten.FlattenPCollectionList<T> flatten, TranslationContext ctx) {
-         ctx.addStep(flatten, "Flatten");
-
-         List<OutputReference> inputRefs = new LinkedList<>();
-         for (PCollection<T> collection : ctx.getInput(flatten).getAll()) {
-           inputRefs.add(ctx.asOutputReference(collection));
-         }
-         ctx.addInput(PropertyNames.INPUTS, inputRefs);
-         ctx.addOutput(PropertyNames.OUTPUT, ctx.getOutput(flatten));
-       }
-     });
-
- // GroupByKeyAndSortValuesOnly reuses the "GroupByKey" step kind, with SORT_VALUES set and
- // combiner lifting always disallowed.
- registerTransformTranslator(
-     GroupByKeyAndSortValuesOnly.class,
-     new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
-       @Override
-       public void translate(
-           GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
-         addSortingGroupByKeyStep(transform, context);
-       }
-
-       /** Generic counterpart of {@code translate}; binds the key and value types. */
-       private <K1, K2, V> void addSortingGroupByKeyStep(
-           GroupByKeyAndSortValuesOnly<K1, K2, V> gbk, TranslationContext ctx) {
-         ctx.addStep(gbk, "GroupByKey");
-         ctx.addInput(PropertyNames.PARALLEL_INPUT, ctx.getInput(gbk));
-         ctx.addOutput(PropertyNames.OUTPUT, ctx.getOutput(gbk));
-         ctx.addInput(PropertyNames.SORT_VALUES, true);
-
-         // TODO: Add support for combiner lifting once the need arises.
-         ctx.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
-       }
-     });
-
- // GroupByKey is translated into a "GroupByKey" step that also carries the serialized
- // windowing strategy, whether its WindowFn merges, and whether combiner lifting is
- // disallowed.
- registerTransformTranslator(
-     GroupByKey.class,
-     new TransformTranslator<GroupByKey>() {
-       @Override
-       public void translate(GroupByKey transform, TranslationContext context) {
-         addGroupByKeyStep(transform, context);
-       }
-
-       /** Generic counterpart of {@code translate}; binds the key and value types. */
-       private <K, V> void addGroupByKeyStep(
-           GroupByKey<K, V> gbk, TranslationContext ctx) {
-         ctx.addStep(gbk, "GroupByKey");
-         ctx.addInput(PropertyNames.PARALLEL_INPUT, ctx.getInput(gbk));
-         ctx.addOutput(PropertyNames.OUTPUT, ctx.getOutput(gbk));
-
-         WindowingStrategy<?, ?> windowingStrategy = ctx.getInput(gbk).getWindowingStrategy();
-         boolean isStreaming =
-             ctx.getPipelineOptions().as(StreamingOptions.class).isStreaming();
-
-         // Lifting is allowed only when all three hold: the WindowFn is non-merging, the
-         // pipeline is either not streaming or the transform reports few keys, and the
-         // trigger is the default one.
-         // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
-         boolean liftingAllowed =
-             windowingStrategy.getWindowFn().isNonMerging()
-                 && (!isStreaming || gbk.fewKeys())
-                 && (windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
-         ctx.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !liftingAllowed);
-
-         byte[] serializedStrategy = serializeToByteArray(windowingStrategy);
-         ctx.addInput(PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializedStrategy));
-         ctx.addInput(
-             PropertyNames.IS_MERGING_WINDOW_FN,
-             !windowingStrategy.getWindowFn().isNonMerging());
-       }
-     });
-
- // ParDo.BoundMulti is translated into a "ParallelDo" step with one output per entry of
- // the transform's output tuple.
- registerTransformTranslator(
-     ParDo.BoundMulti.class,
-     new TransformTranslator<ParDo.BoundMulti>() {
-       @Override
-       public void translate(ParDo.BoundMulti transform, TranslationContext context) {
-         addMultiOutputParDoStep(transform, context);
-       }
-
-       /** Adds the step, then its inputs, its fn, and finally its tagged outputs. */
-       private <InputT, OutputT> void addMultiOutputParDoStep(
-           ParDo.BoundMulti<InputT, OutputT> parDo, TranslationContext ctx) {
-         ctx.addStep(parDo, "ParallelDo");
-         translateInputs(ctx.getInput(parDo), parDo.getSideInputs(), ctx);
-         translateFn(
-             parDo.getFn(),
-             ctx.getInput(parDo).getWindowingStrategy(),
-             parDo.getSideInputs(),
-             ctx.getInput(parDo).getCoder(),
-             ctx);
-         translateOutputs(ctx.getOutput(parDo), ctx);
-       }
-     });
-
- // ParDo.Bound is translated into a "ParallelDo" step with a single OUTPUT.
- registerTransformTranslator(
-     ParDo.Bound.class,
-     new TransformTranslator<ParDo.Bound>() {
-       @Override
-       public void translate(ParDo.Bound transform, TranslationContext context) {
-         addSingleOutputParDoStep(transform, context);
-       }
-
-       /** Adds the step, then its inputs, its fn, and finally its single output. */
-       private <InputT, OutputT> void addSingleOutputParDoStep(
-           ParDo.Bound<InputT, OutputT> parDo, TranslationContext ctx) {
-         ctx.addStep(parDo, "ParallelDo");
-         translateInputs(ctx.getInput(parDo), parDo.getSideInputs(), ctx);
-         translateFn(
-             parDo.getFn(),
-             ctx.getInput(parDo).getWindowingStrategy(),
-             parDo.getSideInputs(),
-             ctx.getInput(parDo).getCoder(),
-             ctx);
-         ctx.addOutput(PropertyNames.OUTPUT, ctx.getOutput(parDo));
-       }
-     });
-
- // Window.Bound is translated into a "Bucket" step that carries the output's serialized
- // windowing strategy.
- registerTransformTranslator(
-     Window.Bound.class,
-     new TransformTranslator<Window.Bound>() {
-       @Override
-       public void translate(Window.Bound transform, TranslationContext context) {
-         addBucketStep(transform, context);
-       }
-
-       /** Generic counterpart of {@code translate}; binds the element type. */
-       private <T> void addBucketStep(Window.Bound<T> window, TranslationContext ctx) {
-         ctx.addStep(window, "Bucket");
-         ctx.addInput(PropertyNames.PARALLEL_INPUT, ctx.getInput(window));
-         ctx.addOutput(PropertyNames.OUTPUT, ctx.getOutput(window));
-
-         WindowingStrategy<?, ?> outputStrategy = ctx.getOutput(window).getWindowingStrategy();
-         byte[] strategyBytes = serializeToByteArray(outputStrategy);
-         String strategyJson = byteArrayToJsonString(strategyBytes);
-         // Sanity check (active only with -ea): the JSON form decodes to the same bytes.
-         assert Arrays.equals(strategyBytes, jsonStringToByteArray(strategyJson));
-         ctx.addInput(PropertyNames.SERIALIZED_FN, strategyJson);
-       }
-     });
-
- ///////////////////////////////////////////////////////////////////////////
- // IO Translation.
- //
- // Unlike the registrations above, these translators are named classes rather than
- // anonymous ones; each call below only instantiates and registers one of them.
-
- // BigQuery: separate translators for the bound read and the bound write transforms.
- registerTransformTranslator(
- BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
- registerTransformTranslator(
- BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());
-
- // Pubsub: the write side is registered for DataflowPipelineRunner.StreamingPubsubIOWrite,
- // not for a PubsubIO.Write type.
- registerTransformTranslator(
- PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
- registerTransformTranslator(
- DataflowPipelineRunner.StreamingPubsubIOWrite.class,
- new PubsubIOTranslator.WriteTranslator());
-
- // Bounded reads.
- registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
- }
-
- /**
-  * Adds {@code input} to the current step under {@code PARALLEL_INPUT}, then hands the
-  * side inputs to {@code translateSideInputs}.
-  */
- private static void translateInputs(
-     PCollection<?> input, List<PCollectionView<?>> sideInputs, TranslationContext context) {
-   context.addInput(PropertyNames.PARALLEL_INPUT, input);
-   translateSideInputs(sideInputs, context);
- }
-
- /**
-  * Adds the side-input views to the current step as one {@code NON_PARALLEL_INPUTS} map,
-  * keyed by each view's internal tag id and valued by the view's output reference.
-  *
-  * <p>Reached through {@code translateInputs}, so it serves the ParDo translators as well
-  * as the Combine.GroupedValues translator.
-  */
- private static void translateSideInputs(
-     List<PCollectionView<?>> sideInputs, TranslationContext context) {
-   Map<String, Object> viewRefsByTagId = new HashMap<>();
-   for (PCollectionView<?> sideInput : sideInputs) {
-     String tagId = sideInput.getTagInternal().getId();
-     viewRefsByTagId.put(tagId, context.asOutputReference(sideInput));
-   }
-   context.addInput(PropertyNames.NON_PARALLEL_INPUTS, viewRefsByTagId);
- }
-
- /**
-  * Records the user's {@code DoFn} on the current step: its class name under
-  * {@code USER_FN}, and under {@code SERIALIZED_FN} the JSON-string form of a serialized
-  * {@code DoFnInfo} built from the fn, windowing strategy, side inputs and input coder.
-  */
- private static void translateFn(
-     DoFn fn,
-     WindowingStrategy windowingStrategy,
-     Iterable<PCollectionView<?>> sideInputs,
-     Coder inputCoder,
-     TranslationContext context) {
-   String fnClassName = fn.getClass().getName();
-   context.addInput(PropertyNames.USER_FN, fnClassName);
-
-   // The parameters are raw types, so the DoFnInfo is constructed unparameterized too.
-   DoFnInfo fnInfo = new DoFnInfo(fn, windowingStrategy, sideInputs, inputCoder);
-   byte[] serializedFnInfo = serializeToByteArray(fnInfo);
-   context.addInput(PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializedFnInfo));
- }
-
- /**
-  * Adds one output to the current step for every entry of {@code outputs}, named after the
-  * id of the entry's tag.
-  */
- private static void translateOutputs(PCollectionTuple outputs, TranslationContext context) {
-   for (Map.Entry<TupleTag<?>, PCollection<?>> tagged : outputs.getAll().entrySet()) {
-     context.addOutput(tagged.getKey().getId(), tagged.getValue());
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
deleted file mode 100644
index 8b9f3f4..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowServiceException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import javax.annotation.Nullable;
-
-/**
- * Thrown when information about a job could not be retrieved from the Cloud Dataflow Service.
- *
- * <p>Both constructors are package-private; construction ends in the
- * {@link DataflowJobException} constructor taking a job, a message and a cause.
- */
-public class DataflowServiceException extends DataflowJobException {
-  /** Creates an exception for {@code job} with {@code message} and an optional {@code cause}. */
-  DataflowServiceException(DataflowPipelineJob job, String message, @Nullable Throwable cause) {
-    super(job, message, cause);
-  }
-
-  /** Creates an exception for {@code job} with {@code message} and no cause. */
-  DataflowServiceException(DataflowPipelineJob job, String message) {
-    this(job, message, null);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
deleted file mode 100644
index 974c3a9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AssignWindows.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
- * {@link PTransform}.
- *
- * For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies
- * a primitive {@link PTransform} in the Dataflow service.
- *
- * For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn},
- * applies an identity {@link ParDo} and sets the windowing strategy of the output
- * {@link PCollection}.
- *
- * For internal use only.
- *
- * @param <T> the type of input element
- */
-public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
- private final Window.Bound<T> transform;
-
- /**
- * Builds an instance of this class from the overriden transform.
- */
- @SuppressWarnings("unused") // Used via reflection
- public AssignWindows(Window.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- transform.getOutputStrategyInternal(input.getWindowingStrategy());
- if (transform.getWindowFn() != null) {
- // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), outputStrategy, input.isBounded());
- } else {
- // If the windowFn didn't change, we just run a pass-through transform and then set the
- // new windowing strategy.
- return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() {
- @Override
- public void processElement(DoFn<T, T>.ProcessContext c) throws Exception {
- c.output(c.element());
- }
- })).setWindowingStrategyInternal(outputStrategy);
- }
- }
-
- @Override
- public void validate(PCollection<T> input) {
- transform.validate(input);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- @Override
- protected String getKindString() {
- return "Window.Into()";
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
deleted file mode 100644
index 203ce4f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.api.client.json.JsonFactory;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * BigQuery transform support code for the Dataflow backend.
- *
- * <p>Holds the two translators that turn {@code BigQueryIO} read and write transforms into
- * "ParallelRead" and "ParallelWrite" steps.
- */
-public class BigQueryIOTranslator {
-  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class);
-
-  /**
-   * Implements BigQueryIO Read translation for the Dataflow backend.
-   */
-  public static class ReadTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Read.Bound> {
-
-    /**
-     * Adds a "ParallelRead" step in bigquery format with Avro export, sourced either from the
-     * transform's query or, when it has none, from its table.
-     */
-    @Override
-    public void translate(
-        BigQueryIO.Read.Bound transform, DataflowPipelineTranslator.TranslationContext context) {
-      // Actual translation.
-      context.addStep(transform, "ParallelRead");
-      context.addInput(PropertyNames.FORMAT, "bigquery");
-      context.addInput(PropertyNames.BIGQUERY_EXPORT_FORMAT, "FORMAT_AVRO");
-
-      if (transform.getQuery() == null) {
-        addTableInputs(transform, context);
-      } else {
-        addQueryInputs(transform, context);
-      }
-      context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-    }
-
-    /** Adds the query text and the flatten-results setting to the step. */
-    private static void addQueryInputs(
-        BigQueryIO.Read.Bound read, DataflowPipelineTranslator.TranslationContext ctx) {
-      ctx.addInput(PropertyNames.BIGQUERY_QUERY, read.getQuery());
-      ctx.addInput(PropertyNames.BIGQUERY_FLATTEN_RESULTS, read.getFlattenResults());
-    }
-
-    /**
-     * Adds the table, dataset and (when known) project of the table being read, filling in a
-     * missing project from the pipeline options first.
-     */
-    private static void addTableInputs(
-        BigQueryIO.Read.Bound read, DataflowPipelineTranslator.TranslationContext ctx) {
-      TableReference tableRef = read.getTable();
-      if (tableRef.getProjectId() == null) {
-        // If user does not specify a project we assume the table to be located in the project
-        // that owns the Dataflow job.
-        String defaultProject = ctx.getPipelineOptions().getProject();
-        // NOTE(review): getDatasetId() is passed twice below. If the format string has only
-        // three placeholders (dataset, table, project) this logs the wrong values; confirm
-        // against BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING before changing.
-        LOG.warn(String.format(BigQueryIO.SET_PROJECT_FROM_OPTIONS_WARNING,
-            tableRef.getDatasetId(), tableRef.getDatasetId(), tableRef.getTableId(),
-            defaultProject));
-        // This mutates the object returned by getTable().
-        tableRef.setProjectId(defaultProject);
-      }
-
-      ctx.addInput(PropertyNames.BIGQUERY_TABLE, tableRef.getTableId());
-      ctx.addInput(PropertyNames.BIGQUERY_DATASET, tableRef.getDatasetId());
-      // Still guarded: the project taken from the options may itself be null.
-      if (tableRef.getProjectId() != null) {
-        ctx.addInput(PropertyNames.BIGQUERY_PROJECT, tableRef.getProjectId());
-      }
-    }
-  }
-
-  /**
-   * Implements BigQueryIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Write.Bound> {
-
-    /**
-     * Adds a "ParallelWrite" step in bigquery format for the transform's table, with its
-     * optional schema, its create and write dispositions, and a value-only
-     * {@code TableRowJsonCoder} encoding.
-     *
-     * @throws AssertionError if the pipeline options report streaming
-     * @throws IllegalArgumentException if the schema cannot be rendered as JSON
-     */
-    @Override
-    public void translate(BigQueryIO.Write.Bound transform,
-        DataflowPipelineTranslator.TranslationContext context) {
-      if (context.getPipelineOptions().isStreaming()) {
-        // Streaming is handled by the streaming runner.
-        throw new AssertionError(
-            "BigQueryIO is specified to use streaming write in batch mode.");
-      }
-
-      TableReference tableRef = transform.getTable();
-
-      // Actual translation.
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "bigquery");
-      context.addInput(PropertyNames.BIGQUERY_TABLE, tableRef.getTableId());
-      context.addInput(PropertyNames.BIGQUERY_DATASET, tableRef.getDatasetId());
-      if (tableRef.getProjectId() != null) {
-        context.addInput(PropertyNames.BIGQUERY_PROJECT, tableRef.getProjectId());
-      }
-
-      if (transform.getSchema() != null) {
-        try {
-          String schemaJson = JSON_FACTORY.toString(transform.getSchema());
-          context.addInput(PropertyNames.BIGQUERY_SCHEMA, schemaJson);
-        } catch (IOException exn) {
-          throw new IllegalArgumentException("Invalid table schema.", exn);
-        }
-      }
-
-      String createDisposition = transform.getCreateDisposition().name();
-      context.addInput(PropertyNames.BIGQUERY_CREATE_DISPOSITION, createDisposition);
-      String writeDisposition = transform.getWriteDisposition().name();
-      context.addInput(PropertyNames.BIGQUERY_WRITE_DISPOSITION, writeDisposition);
-
-      // Set sink encoding to TableRowJsonCoder.
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
deleted file mode 100644
index 3473c41..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/CustomSources.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.api.client.util.Base64.encodeBase64String;
-import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * A helper class for supporting sources defined as {@code Source}.
- *
- * <p>Provides a bridge between the high-level {@code Source} API and the
- * low-level {@code CloudSource} class.
- */
-public class CustomSources {
- private static final String SERIALIZED_SOURCE = "serialized_source";
- @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
- /**
- * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which
- * includes the initial splits, is 20 MB.
- */
- public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_BYTES = 20 * (1 << 20);
-
- private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
-
- private static final ByteString firstSplitKey = ByteString.copyFromUtf8("0000000000000001");
-
- public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) {
- return splitKey.equals(firstSplitKey);
- }
-
- private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
- if (options.getMaxNumWorkers() > 0) {
- return options.getMaxNumWorkers();
- } else if (options.getNumWorkers() > 0) {
- return options.getNumWorkers() * 3;
- } else {
- return 20;
- }
- }
-
- public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
- Source<?> source, PipelineOptions options) throws Exception {
- com.google.api.services.dataflow.model.Source cloudSource =
- new com.google.api.services.dataflow.model.Source();
- // We ourselves act as the SourceFormat.
- cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
- addString(
- cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
-
- SourceMetadata metadata = new SourceMetadata();
- if (source instanceof BoundedSource) {
- BoundedSource<?> boundedSource = (BoundedSource<?>) source;
- try {
- metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options));
- } catch (Exception e) {
- LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
- }
-
- // Size estimation is best effort so we continue even if it fails here.
- try {
- metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options));
- } catch (Exception e) {
- LOG.warn("Size estimation of the source failed: " + source, e);
- }
- } else if (source instanceof UnboundedSource) {
- UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) source;
- metadata.setInfinite(true);
- List<String> encodedSplits = new ArrayList<>();
- int desiredNumSplits =
- getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
- for (UnboundedSource<?, ?> split :
- unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
- encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
- }
- checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
- addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
- } else {
- throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
- }
-
- cloudSource.setMetadata(metadata);
- return cloudSource;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
deleted file mode 100644
index 5c1dd95..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- *
- * <p>It also indexes applied transforms by their transform and by their step name, so step
- * names can be looked up from an aggregator and applied transforms from a step name.
- */
-public class DataflowAggregatorTransforms {
-  /** For each aggregator, the transforms it is used in. Held by reference, not copied. */
-  private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
-  /** For each transform, every application of it that has a step name. */
-  private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
-  /** Two-way mapping between applied transforms and their step names. */
-  private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
-  /**
-   * @param aggregatorTransforms for each aggregator, the transforms it is used in
-   * @param transformStepNames the step name assigned to each applied transform; because it
-   *     is copied into a {@link BiMap}, the step names must be unique
-   */
-  public DataflowAggregatorTransforms(
-      Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
-      Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
-    this.aggregatorTransforms = aggregatorTransforms;
-    appliedStepNames = HashBiMap.create(transformStepNames);
-
-    transformAppliedTransforms = HashMultimap.create();
-    for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
-      transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
-    }
-  }
-
-  /**
-   * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
-   */
-  public boolean contains(Aggregator<?, ?> aggregator) {
-    return aggregatorTransforms.containsKey(aggregator);
-  }
-
-  /**
-   * Gets the step names in which the {@link Aggregator} is used.
-   *
-   * @return a new collection of step names; empty when the aggregator is not known to this
-   *     mapping (see {@link #contains})
-   */
-  public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
-    Collection<String> names = new HashSet<>();
-    Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
-    if (transforms == null) {
-      // Map.get returns null for an unknown aggregator; report "no steps" rather than
-      // failing with a NullPointerException in the loop below.
-      return names;
-    }
-    for (PTransform<?, ?> transform : transforms) {
-      for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
-        names.add(appliedStepNames.get(applied));
-      }
-    }
-    return names;
-  }
-
-  /**
-   * Gets the {@link PTransform} that was assigned the provided step name.
-   *
-   * @return the applied transform, or {@code null} if no transform has that step name
-   */
-  public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
-    return appliedStepNames.inverse().get(stepName);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
deleted file mode 100644
index 8de1038..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
- private static final String STEP_NAME_CONTEXT_KEY = "step";
- private static final String IS_TENTATIVE_KEY = "tentative";
-
- private DataflowMetricUpdateExtractor() {
- // Do not instantiate.
- }
-
- /**
- * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
- * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
- * MetricUpdate MetricUpdates}.
- */
- public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
- DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
- Map<String, OutputT> results = new HashMap<>();
- if (metricUpdates == null) {
- return results;
- }
-
- String aggregatorName = aggregator.getName();
- Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
- for (MetricUpdate metricUpdate : metricUpdates) {
- MetricStructuredName metricStructuredName = metricUpdate.getName();
- Map<String, String> context = metricStructuredName.getContext();
- if (metricStructuredName.getName().equals(aggregatorName) && context != null
- && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
- AppliedPTransform<?, ?, ?> transform =
- aggregatorTransforms.getAppliedTransformForStepName(
- context.get(STEP_NAME_CONTEXT_KEY));
- String fullName = transform.getFullName();
- // Prefer the tentative (fresher) value if it exists.
- if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
- results.put(fullName, toValue(aggregator, metricUpdate));
- }
- }
- }
-
- return results;
-
- }
-
- private static <OutputT> OutputT toValue(
- Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
- CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
- Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
- if (outputType.equals(Long.class)) {
- @SuppressWarnings("unchecked")
- OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
- return asLong;
- }
- if (outputType.equals(Integer.class)) {
- @SuppressWarnings("unchecked")
- OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
- return asInt;
- }
- if (outputType.equals(Double.class)) {
- @SuppressWarnings("unchecked")
- OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
- return asDouble;
- }
- throw new UnsupportedOperationException(
- "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
- }
-
- private static Number toNumber(MetricUpdate update) {
- if (update.getScalar() instanceof Number) {
- return (Number) update.getScalar();
- }
- throw new IllegalArgumentException(
- "Metric Update " + update + " does not have a numeric scalar");
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
deleted file mode 100644
index b54d5c6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/PubsubIOTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * Pubsub transform support code for the Dataflow backend.
- */
-public class PubsubIOTranslator {
-
- /**
- * Implements PubsubIO Read translation for the Dataflow backend.
- */
- public static class ReadTranslator<T> implements TransformTranslator<PubsubIO.Read.Bound<T>> {
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void translate(
- PubsubIO.Read.Bound transform,
- TranslationContext context) {
- translateReadHelper(transform, context);
- }
-
- private <T> void translateReadHelper(
- PubsubIO.Read.Bound<T> transform,
- TranslationContext context) {
- if (!context.getPipelineOptions().isStreaming()) {
- throw new IllegalArgumentException(
- "PubsubIO.Read can only be used with the Dataflow streaming runner.");
- }
-
- context.addStep(transform, "ParallelRead");
- context.addInput(PropertyNames.FORMAT, "pubsub");
- if (transform.getTopic() != null) {
- context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
- }
- if (transform.getSubscription() != null) {
- context.addInput(
- PropertyNames.PUBSUB_SUBSCRIPTION, transform.getSubscription().asV1Beta1Path());
- }
- if (transform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
- }
- if (transform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
- }
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- }
- }
-
- /**
- * Implements PubsubIO Write translation for the Dataflow backend.
- */
- public static class WriteTranslator<T>
- implements TransformTranslator<DataflowPipelineRunner.StreamingPubsubIOWrite<T>> {
-
- @Override
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void translate(
- DataflowPipelineRunner.StreamingPubsubIOWrite transform,
- TranslationContext context) {
- translateWriteHelper(transform, context);
- }
-
- private <T> void translateWriteHelper(
- DataflowPipelineRunner.StreamingPubsubIOWrite<T> customTransform,
- TranslationContext context) {
- if (!context.getPipelineOptions().isStreaming()) {
- throw new IllegalArgumentException(
- "PubsubIO.Write is non-primitive for the Dataflow batch runner.");
- }
-
- PubsubIO.Write.Bound<T> transform = customTransform.getOverriddenTransform();
-
- context.addStep(customTransform, "ParallelWrite");
- context.addInput(PropertyNames.FORMAT, "pubsub");
- context.addInput(PropertyNames.PUBSUB_TOPIC, transform.getTopic().asV1Beta1Path());
- if (transform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL, transform.getTimestampLabel());
- }
- if (transform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, transform.getIdLabel());
- }
- context.addEncodingInput(WindowedValue.getValueOnlyCoder(transform.getCoder()));
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(customTransform));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
deleted file mode 100644
index 4998af3..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/ReadTranslator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-import static com.google.cloud.dataflow.sdk.util.Structs.addDictionary;
-import static com.google.cloud.dataflow.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import com.google.cloud.dataflow.sdk.io.FileBasedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.Source;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
- @Override
- public void translate(Read.Bounded<?> transform, TranslationContext context) {
- translateReadHelper(transform.getSource(), transform, context);
- }
-
- public static <T> void translateReadHelper(Source<T> source,
- PTransform<?, ? extends PValue> transform,
- DataflowPipelineTranslator.TranslationContext context) {
- try {
- // TODO: Move this validation out of translation once IOChannelUtils is portable
- // and can be reconstructed on the worker.
- if (source instanceof FileBasedSource) {
- String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
- context.getPipelineOptions()
- .getPathValidator()
- .validateInputFilePatternSupported(filePatternOrSpec);
- }
-
- context.addStep(transform, "ParallelRead");
- context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
- context.addInput(
- PropertyNames.SOURCE_STEP_INPUT,
- cloudSourceToDictionary(
- CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
- context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
- // property of CloudWorkflowStep.input.
- private static Map<String, Object> cloudSourceToDictionary(
- com.google.api.services.dataflow.model.Source source) {
- // Do not translate encoding - the source's encoding is translated elsewhere
- // to the step's output info.
- Map<String, Object> res = new HashMap<>();
- addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
- if (source.getMetadata() != null) {
- addDictionary(res, PropertyNames.SOURCE_METADATA,
- cloudSourceMetadataToDictionary(source.getMetadata()));
- }
- if (source.getDoesNotNeedSplitting() != null) {
- addBoolean(
- res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
- }
- return res;
- }
-
- private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
- Map<String, Object> res = new HashMap<>();
- if (metadata.getProducesSortedKeys() != null) {
- addBoolean(res, PropertyNames.SOURCE_PRODUCES_SORTED_KEYS, metadata.getProducesSortedKeys());
- }
- if (metadata.getEstimatedSizeBytes() != null) {
- addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
- }
- if (metadata.getInfinite() != null) {
- addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
deleted file mode 100644
index d1484ce..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Implementation of the {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
- */
-package com.google.cloud.dataflow.sdk.runners.dataflow;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b4857cc/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
deleted file mode 100644
index 2f920a6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineOptions.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.testing;
-
-import com.google.cloud.dataflow.sdk.options.BlockingDataflowPipelineOptions;
-
-/**
- * A set of options used to configure the {@link TestPipeline}.
- */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
-
-}