You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/06 20:07:56 UTC
[1/5] beam git commit: Add explicit translation builder for a Step
in Dataflow translator
Repository: beam
Updated Branches:
refs/heads/master b4d870272 -> a6caa82a6
Add explicit translation builder for a Step in Dataflow translator
Previously, there was always a "current" step that was the most recent
step created. This made it cumbersome or impossible to do things like
translate one primitive transform into a small subgraph of steps. Thus
we added hacks like CreatePCollectionView, which are not actually part
of the model at all - in fact, we should be able to add the needed
CollectionToSingleton steps simply by looking at the side inputs of a
ParDo node.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f04537cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f04537cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f04537cc
Branch: refs/heads/master
Commit: f04537ccbc2897ea4337941d5ca8121432daef43
Parents: b4d8702
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 14:34:27 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:24:49 2017 -0800
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 313 ++++++++++---------
.../beam/runners/dataflow/DataflowRunner.java | 60 ++--
.../dataflow/internal/ReadTranslator.java | 9 +-
.../runners/dataflow/DataflowRunnerTest.java | 5 +-
4 files changed, 196 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8d2b076..2385fa1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -213,14 +213,12 @@ public class DataflowPipelineTranslator {
}
/**
- * A {@link TransformTranslator} knows how to translate
- * a particular subclass of {@link PTransform} for the
- * Cloud Dataflow service. It does so by
- * mutating the {@link TranslationContext}.
+ * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
+ * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
+ * TranslationContext}.
*/
public interface TransformTranslator<TransformT extends PTransform> {
- void translate(TransformT transform,
- TranslationContext context);
+ void translate(TransformT transform, TranslationContext context);
}
/**
@@ -252,10 +250,8 @@ public class DataflowPipelineTranslator {
/**
* Adds a step to the Dataflow workflow for the given transform, with
* the given Dataflow step type.
- * This step becomes "current" for the purpose of {@link #addInput} and
- * {@link #addOutput}.
*/
- void addStep(PTransform<?, ?> transform, String type);
+ StepTranslationContext addStep(PTransform<?, ?> transform, String type);
/**
* Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
@@ -264,8 +260,14 @@ public class DataflowPipelineTranslator {
* <p>This is a low-level operation, when using this method it is up to
* the caller to ensure that names do not collide.
*/
- void addStep(PTransform<?, ? extends PValue> transform, Step step);
+ Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+ /**
+ * Encode a PValue reference as an output reference.
+ */
+ OutputReference asOutputReference(PValue value);
+ }
+ public interface StepTranslationContext {
/**
* Sets the encoding for the current Dataflow step.
*/
@@ -330,12 +332,7 @@ public class DataflowPipelineTranslator {
* output encoding. Returns a pipeline level unique id.
*/
long addCollectionToSingletonOutput(PValue inputValue,
- PValue outputValue);
-
- /**
- * Encode a PValue reference as an output reference.
- */
- OutputReference asOutputReference(PValue value);
+ PValue outputValue);
}
@@ -343,6 +340,8 @@ public class DataflowPipelineTranslator {
/**
* Translates a Pipeline into the Dataflow representation.
+ *
+ * <p>For internal use only.
*/
class Translator extends PipelineVisitor.Defaults implements TranslationContext {
/**
@@ -368,11 +367,6 @@ public class DataflowPipelineTranslator {
private final Job job = new Job();
/**
- * Translator is stateful, as addProperty calls refer to the current step.
- */
- private Step currentStep;
-
- /**
* A Map from AppliedPTransform to their unique Dataflow step names.
*/
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
@@ -546,7 +540,7 @@ public class DataflowPipelineTranslator {
}
@Override
- public void addStep(PTransform<?, ?> transform, String type) {
+ public StepTranslator addStep(PTransform<?, ?> transform, String type) {
String stepName = genStepName();
if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
throw new IllegalArgumentException(
@@ -559,16 +553,19 @@ public class DataflowPipelineTranslator {
job.setSteps(steps);
}
- currentStep = new Step();
- currentStep.setName(stepName);
- currentStep.setKind(type);
- steps.add(currentStep);
- addInput(PropertyNames.USER_NAME, getFullName(transform));
- addDisplayData(stepName, transform);
+ Step step = new Step();
+ step.setName(stepName);
+ step.setKind(type);
+ steps.add(step);
+
+ StepTranslator stepContext = new StepTranslator(this, step);
+ stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
+ stepContext.addDisplayData(step, stepName, transform);
+ return stepContext;
}
@Override
- public void addStep(PTransform<?, ? extends PValue> transform, Step original) {
+ public Step addStep(PTransform<?, ? extends PValue> transform, Step original) {
Step step = original.clone();
String stepName = step.getName();
if (stepNames.put(getCurrentTransform(transform), stepName) != null) {
@@ -605,8 +602,59 @@ public class DataflowPipelineTranslator {
steps = new LinkedList<>();
job.setSteps(steps);
}
- currentStep = step;
steps.add(step);
+ return step;
+ }
+
+ @Override
+ public OutputReference asOutputReference(PValue value) {
+ AppliedPTransform<?, ?, ?> transform =
+ value.getProducingTransformInternal();
+ String stepName = stepNames.get(transform);
+ if (stepName == null) {
+ throw new IllegalArgumentException(transform + " doesn't have a name specified");
+ }
+
+ String outputName = outputNames.get(value);
+ if (outputName == null) {
+ throw new IllegalArgumentException(
+ "output " + value + " doesn't have a name specified");
+ }
+
+ return new OutputReference(stepName, outputName);
+ }
+
+ /**
+ * Returns a fresh Dataflow step name.
+ */
+ private String genStepName() {
+ return "s" + (stepNames.size() + 1);
+ }
+
+ /**
+ * Records the name of the given output PValue,
+ * within its producing transform.
+ */
+ private void registerOutputName(POutput value, String name) {
+ if (outputNames.put(value, name) != null) {
+ throw new IllegalArgumentException(
+ "output " + value + " already has a name specified");
+ }
+ }
+ }
+
+ static class StepTranslator implements StepTranslationContext {
+
+ private final Translator translator;
+ private final Step step;
+
+ private StepTranslator(Translator translator, Step step) {
+ this.translator = translator;
+ this.step = step;
+ }
+
+ private Map<String, Object> getProperties() {
+ return DataflowPipelineTranslator.getProperties(step);
}
@Override
@@ -643,7 +691,7 @@ public class DataflowPipelineTranslator {
@Override
public void addInput(String name, PInput value) {
if (value instanceof PValue) {
- addInput(name, asOutputReference((PValue) value));
+ addInput(name, translator.asOutputReference((PValue) value));
} else {
throw new IllegalStateException("Input must be a PValue");
}
@@ -685,10 +733,10 @@ public class DataflowPipelineTranslator {
}
@Override
- public long addCollectionToSingletonOutput(PValue inputValue,
- PValue outputValue) {
+ public long addCollectionToSingletonOutput(
+ PValue inputValue, PValue outputValue) {
Coder<?> inputValueCoder =
- checkNotNull(outputCoders.get(inputValue));
+ checkNotNull(translator.outputCoders.get(inputValue));
// The inputValueCoder for the input PCollection should be some
// WindowedValueCoder of the input PCollection's element
// coder.
@@ -707,8 +755,8 @@ public class DataflowPipelineTranslator {
* with the given {@code Coder} (if not {@code null}).
*/
private long addOutput(PValue value, Coder<?> valueCoder) {
- long id = idGenerator.get();
- registerOutputName(value, Long.toString(id));
+ long id = translator.idGenerator.get();
+ translator.registerOutputName(value, Long.toString(id));
Map<String, Object> properties = getProperties();
@Nullable List<Map<String, Object>> outputInfoList = null;
@@ -728,7 +776,7 @@ public class DataflowPipelineTranslator {
addString(outputInfo, PropertyNames.OUTPUT_NAME, Long.toString(id));
addString(outputInfo, PropertyNames.USER_NAME, value.getName());
if (value instanceof PCollection
- && runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
+ && translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
}
if (valueCoder != null) {
@@ -736,63 +784,19 @@ public class DataflowPipelineTranslator {
// failures as early as possible.
CloudObject encoding = SerializableUtils.ensureSerializable(valueCoder);
addObject(outputInfo, PropertyNames.ENCODING, encoding);
- outputCoders.put(value, valueCoder);
+ translator.outputCoders.put(value, valueCoder);
}
outputInfoList.add(outputInfo);
return id;
}
- private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
+ private void addDisplayData(Step step, String stepName, HasDisplayData hasDisplayData) {
DisplayData displayData = DisplayData.from(hasDisplayData);
List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
}
- @Override
- public OutputReference asOutputReference(PValue value) {
- AppliedPTransform<?, ?, ?> transform =
- value.getProducingTransformInternal();
- String stepName = stepNames.get(transform);
- if (stepName == null) {
- throw new IllegalArgumentException(transform + " doesn't have a name specified");
- }
-
- String outputName = outputNames.get(value);
- if (outputName == null) {
- throw new IllegalArgumentException(
- "output " + value + " doesn't have a name specified");
- }
-
- return new OutputReference(stepName, outputName);
- }
-
- private Map<String, Object> getProperties() {
- Map<String, Object> properties = currentStep.getProperties();
- if (properties == null) {
- properties = new HashMap<>();
- currentStep.setProperties(properties);
- }
- return properties;
- }
-
- /**
- * Returns a fresh Dataflow step name.
- */
- private String genStepName() {
- return "s" + (stepNames.size() + 1);
- }
-
- /**
- * Records the name of the given output PValue,
- * within its producing transform.
- */
- private void registerOutputName(POutput value, String name) {
- if (outputNames.put(value, name) != null) {
- throw new IllegalArgumentException(
- "output " + value + " already has a name specified");
- }
- }
}
/////////////////////////////////////////////////////////////////////////////
@@ -802,6 +806,14 @@ public class DataflowPipelineTranslator {
return "DataflowPipelineTranslator#" + hashCode();
}
+ private static Map<String, Object> getProperties(Step step) {
+ Map<String, Object> properties = step.getProperties();
+ if (properties == null) {
+ properties = new HashMap<>();
+ step.setProperties(properties);
+ }
+ return properties;
+ }
///////////////////////////////////////////////////////////////////////////
@@ -810,20 +822,17 @@ public class DataflowPipelineTranslator {
View.CreatePCollectionView.class,
new TransformTranslator<View.CreatePCollectionView>() {
@Override
- public void translate(
- View.CreatePCollectionView transform,
- TranslationContext context) {
+ public void translate(View.CreatePCollectionView transform, TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
- View.CreatePCollectionView<ElemT, ViewT> transform,
- TranslationContext context) {
- context.addStep(transform, "CollectionToSingleton");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addCollectionToSingletonOutput(
- context.getInput(transform),
- context.getOutput(transform));
+ View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
+ StepTranslationContext stepContext =
+ context.addStep(transform, "CollectionToSingleton");
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ stepContext.addCollectionToSingletonOutput(
+ context.getInput(transform), context.getOutput(transform));
}
});
@@ -839,21 +848,21 @@ public class DataflowPipelineTranslator {
private <K, InputT, OutputT> void translateHelper(
final Combine.GroupedValues<K, InputT, OutputT> transform,
- DataflowPipelineTranslator.TranslationContext context) {
- context.addStep(transform, "CombineValues");
- translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+ TranslationContext context) {
+ StepTranslationContext stepContext = context.addStep(transform, "CombineValues");
+ translateInputs(
+ stepContext, context.getInput(transform), transform.getSideInputs(), context);
AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
transform.getAppliedFn(
context.getInput(transform).getPipeline().getCoderRegistry(),
- context.getInput(transform).getCoder(),
- context.getInput(transform).getWindowingStrategy());
+ context.getInput(transform).getCoder(),
+ context.getInput(transform).getWindowingStrategy());
- context.addEncodingInput(fn.getAccumulatorCoder());
- context.addInput(
- PropertyNames.SERIALIZED_FN,
- byteArrayToJsonString(serializeToByteArray(fn)));
- context.addOutput(context.getOutput(transform));
+ stepContext.addEncodingInput(fn.getAccumulatorCoder());
+ stepContext.addInput(
+ PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
+ stepContext.addOutput(context.getOutput(transform));
}
});
@@ -870,14 +879,14 @@ public class DataflowPipelineTranslator {
private <T> void flattenHelper(
Flatten.FlattenPCollectionList<T> transform,
TranslationContext context) {
- context.addStep(transform, "Flatten");
+ StepTranslationContext stepContext = context.addStep(transform, "Flatten");
List<OutputReference> inputs = new LinkedList<>();
for (PCollection<T> input : context.getInput(transform).getAll()) {
inputs.add(context.asOutputReference(input));
}
- context.addInput(PropertyNames.INPUTS, inputs);
- context.addOutput(context.getOutput(transform));
+ stepContext.addInput(PropertyNames.INPUTS, inputs);
+ stepContext.addOutput(context.getOutput(transform));
}
});
@@ -885,23 +894,19 @@ public class DataflowPipelineTranslator {
GroupByKeyAndSortValuesOnly.class,
new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
@Override
- public void translate(
- GroupByKeyAndSortValuesOnly transform,
- TranslationContext context) {
+ public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
groupByKeyAndSortValuesHelper(transform, context);
}
private <K1, K2, V> void groupByKeyAndSortValuesHelper(
- GroupByKeyAndSortValuesOnly<K1, K2, V> transform,
- TranslationContext context) {
- context.addStep(transform, "GroupByKey");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(context.getOutput(transform));
- context.addInput(PropertyNames.SORT_VALUES, true);
+ GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) {
+ StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ stepContext.addOutput(context.getOutput(transform));
+ stepContext.addInput(PropertyNames.SORT_VALUES, true);
// TODO: Add support for combiner lifting once the need arises.
- context.addInput(
- PropertyNames.DISALLOW_COMBINER_LIFTING, true);
+ stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
}
});
@@ -918,9 +923,9 @@ public class DataflowPipelineTranslator {
private <K, V> void groupByKeyHelper(
GroupByKey<K, V> transform,
TranslationContext context) {
- context.addStep(transform, "GroupByKey");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(context.getOutput(transform));
+ StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ stepContext.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy =
context.getInput(transform).getWindowingStrategy();
@@ -931,12 +936,12 @@ public class DataflowPipelineTranslator {
|| (isStreaming && !transform.fewKeys())
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
|| !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
- context.addInput(
+ stepContext.addInput(
PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
- context.addInput(
+ stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(serializeToByteArray(windowingStrategy)));
- context.addInput(
+ stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
@@ -946,22 +951,21 @@ public class DataflowPipelineTranslator {
ParDo.BoundMulti.class,
new TransformTranslator<ParDo.BoundMulti>() {
@Override
- public void translate(
- ParDo.BoundMulti transform,
- TranslationContext context) {
+ public void translate(ParDo.BoundMulti transform, TranslationContext context) {
translateMultiHelper(transform, context);
}
private <InputT, OutputT> void translateMultiHelper(
- ParDo.BoundMulti<InputT, OutputT> transform,
- TranslationContext context) {
- rejectStatefulDoFn(transform.getFn());
+ ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
+ DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn());
- context.addStep(transform, "ParallelDo");
- translateInputs(context.getInput(transform), transform.getSideInputs(), context);
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
+ translateInputs(
+ stepContext, context.getInput(transform), transform.getSideInputs(), context);
BiMap<Long, TupleTag<?>> outputMap =
- translateOutputs(context.getOutput(transform), context);
+ translateOutputs(context.getOutput(transform), stepContext);
translateFn(
+ stepContext,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
@@ -976,30 +980,28 @@ public class DataflowPipelineTranslator {
ParDo.Bound.class,
new TransformTranslator<ParDo.Bound>() {
@Override
- public void translate(
- ParDo.Bound transform,
- TranslationContext context) {
+ public void translate(ParDo.Bound transform, TranslationContext context) {
translateSingleHelper(transform, context);
}
private <InputT, OutputT> void translateSingleHelper(
- ParDo.Bound<InputT, OutputT> transform,
- TranslationContext context) {
+ ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
rejectStatefulDoFn(transform.getFn());
- context.addStep(transform, "ParallelDo");
- translateInputs(context.getInput(transform), transform.getSideInputs(), context);
- long mainOutput = context.addOutput(context.getOutput(transform));
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
+ translateInputs(
+ stepContext, context.getInput(transform), transform.getSideInputs(), context);
+ long mainOutput = stepContext.addOutput(context.getOutput(transform));
translateFn(
+ stepContext,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
context.getInput(transform).getCoder(),
context,
mainOutput,
- ImmutableMap.<Long, TupleTag<?>>of(mainOutput,
- new TupleTag<>(PropertyNames.OUTPUT)));
-
+ ImmutableMap.<Long, TupleTag<?>>of(
+ mainOutput, new TupleTag<>(PropertyNames.OUTPUT)));
}
});
@@ -1014,16 +1016,16 @@ public class DataflowPipelineTranslator {
private <T> void translateHelper(
Window.Bound<T> transform, TranslationContext context) {
- context.addStep(transform, "Bucket");
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
- context.addOutput(context.getOutput(transform));
+ StepTranslationContext stepContext = context.addStep(transform, "Bucket");
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ stepContext.addOutput(context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
byte[] serializedBytes = serializeToByteArray(strategy);
String serializedJson = byteArrayToJsonString(serializedBytes);
assert Arrays.equals(serializedBytes,
jsonStringToByteArray(serializedJson));
- context.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
+ stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
}
});
@@ -1046,15 +1048,17 @@ public class DataflowPipelineTranslator {
}
private static void translateInputs(
+ StepTranslationContext stepContext,
PCollection<?> input,
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
- context.addInput(PropertyNames.PARALLEL_INPUT, input);
- translateSideInputs(sideInputs, context);
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
+ translateSideInputs(stepContext, sideInputs, context);
}
// Used for ParDo
private static void translateSideInputs(
+ StepTranslationContext stepContext,
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
Map<String, Object> nonParInputs = new HashMap<>();
@@ -1065,10 +1069,11 @@ public class DataflowPipelineTranslator {
context.asOutputReference(view));
}
- context.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
+ stepContext.addInput(PropertyNames.NON_PARALLEL_INPUTS, nonParInputs);
}
private static void translateFn(
+ StepTranslationContext stepContext,
DoFn fn,
WindowingStrategy windowingStrategy,
Iterable<PCollectionView<?>> sideInputs,
@@ -1076,8 +1081,8 @@ public class DataflowPipelineTranslator {
TranslationContext context,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
- context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
- context.addInput(
+ stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
+ stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeToByteArray(
@@ -1087,13 +1092,13 @@ public class DataflowPipelineTranslator {
private static BiMap<Long, TupleTag<?>> translateOutputs(
PCollectionTuple outputs,
- TranslationContext context) {
+ StepTranslationContext stepContext) {
ImmutableBiMap.Builder<Long, TupleTag<?>> mapBuilder = ImmutableBiMap.builder();
for (Map.Entry<TupleTag<?>, PCollection<?>> entry
: outputs.getAll().entrySet()) {
TupleTag<?> tag = entry.getKey();
PCollection<?> output = entry.getValue();
- mapBuilder.put(context.addOutput(output), tag);
+ mapBuilder.put(stepContext.addOutput(output), tag);
}
return mapBuilder.build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 03e5dfc..d2c1e66 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,6 +33,7 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -72,6 +73,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.AssignWindows;
@@ -2116,50 +2118,46 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- /**
- * Rewrite {@link StreamingPubsubIORead} to the appropriate internal node.
- */
- private static class StreamingPubsubIOReadTranslator<T> implements
- TransformTranslator<StreamingPubsubIORead<T>> {
+ /** Rewrite {@link StreamingPubsubIORead} to the appropriate internal node. */
+ private static class StreamingPubsubIOReadTranslator<T>
+ implements TransformTranslator<StreamingPubsubIORead<T>> {
@Override
- public void translate(
- StreamingPubsubIORead<T> transform,
- TranslationContext context) {
- checkArgument(context.getPipelineOptions().isStreaming(),
- "StreamingPubsubIORead is only for streaming pipelines.");
+ public void translate(StreamingPubsubIORead<T> transform, TranslationContext context) {
+ checkArgument(
+ context.getPipelineOptions().isStreaming(),
+ "StreamingPubsubIORead is only for streaming pipelines.");
PubsubUnboundedSource<T> overriddenTransform = transform.getOverriddenTransform();
- context.addStep(transform, "ParallelRead");
- context.addInput(PropertyNames.FORMAT, "pubsub");
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider() != null) {
if (overriddenTransform.getTopicProvider().isAccessible()) {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
} else {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
}
if (overriddenTransform.getSubscriptionProvider() != null) {
if (overriddenTransform.getSubscriptionProvider().isAccessible()) {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION,
overriddenTransform.getSubscription().getV1Beta1Path());
} else {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_SUBSCRIPTION_OVERRIDE,
- ((NestedValueProvider) overriddenTransform.getSubscriptionProvider())
- .propertyName());
+ ((NestedValueProvider) overriddenTransform.getSubscriptionProvider()).propertyName());
}
}
if (overriddenTransform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
- overriddenTransform.getTimestampLabel());
+ stepContext.addInput(
+ PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
}
if (overriddenTransform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
- context.addValueOnlyOutput(context.getOutput(transform));
+ stepContext.addValueOnlyOutput(context.getOutput(transform));
}
}
@@ -2211,26 +2209,26 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
checkArgument(context.getPipelineOptions().isStreaming(),
"StreamingPubsubIOWrite is only for streaming pipelines.");
PubsubUnboundedSink<T> overriddenTransform = transform.getOverriddenTransform();
- context.addStep(transform, "ParallelWrite");
- context.addInput(PropertyNames.FORMAT, "pubsub");
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelWrite");
+ stepContext.addInput(PropertyNames.FORMAT, "pubsub");
if (overriddenTransform.getTopicProvider().isAccessible()) {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getV1Beta1Path());
} else {
- context.addInput(
+ stepContext.addInput(
PropertyNames.PUBSUB_TOPIC_OVERRIDE,
((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName());
}
if (overriddenTransform.getTimestampLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_TIMESTAMP_LABEL,
- overriddenTransform.getTimestampLabel());
+ stepContext.addInput(
+ PropertyNames.PUBSUB_TIMESTAMP_LABEL, overriddenTransform.getTimestampLabel());
}
if (overriddenTransform.getIdLabel() != null) {
- context.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
+ stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
}
- context.addEncodingInput(
+ stepContext.addEncodingInput(
WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
- context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+ stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 84950f7..1a5a9a5 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.SourceMetadata;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.sdk.io.FileBasedSource;
@@ -60,13 +61,13 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
}
}
- context.addStep(transform, "ParallelRead");
- context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
- context.addInput(
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+ stepContext.addInput(
PropertyNames.SOURCE_STEP_INPUT,
cloudSourceToDictionary(
CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
- context.addValueOnlyOutput(context.getOutput(transform));
+ stepContext.addValueOnlyOutput(context.getOutput(transform));
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f04537cc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 21d575a..a19fd8c 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,6 +61,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -998,8 +999,8 @@ public class DataflowRunnerTest {
// Note: This is about the minimum needed to fake out a
// translation. This obviously isn't a real translation.
- context.addStep(transform, "TestTranslate");
- context.addOutput(context.getOutput(transform));
+ StepTranslationContext stepContext = context.addStep(transform, "TestTranslate");
+ stepContext.addOutput(context.getOutput(transform));
}
});
[2/5] beam git commit: Move some pieces of Dataflow translator to top
level
Posted by ke...@apache.org.
Move some pieces of Dataflow translator to top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d2cb3e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d2cb3e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d2cb3e2
Branch: refs/heads/master
Commit: 5d2cb3e2310dbf7046785e9e8f6403b854b2dd03
Parents: f04537c
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 16:51:23 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 134 +------------------
.../beam/runners/dataflow/DataflowRunner.java | 8 +-
.../runners/dataflow/TransformTranslator.java | 123 +++++++++++++++++
.../dataflow/internal/ReadTranslator.java | 7 +-
.../DataflowPipelineTranslatorTest.java | 3 +-
.../runners/dataflow/DataflowRunnerTest.java | 5 +-
6 files changed, 137 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2385fa1..e9cf6f4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,8 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
+import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -80,6 +83,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.PropertyNames;
@@ -212,130 +216,6 @@ public class DataflowPipelineTranslator {
return transformTranslators.get(transformClass);
}
- /**
- * A {@link TransformTranslator} knows how to translate a particular subclass of {@link
- * PTransform} for the Cloud Dataflow service. It does so by mutating the {@link
- * TranslationContext}.
- */
- public interface TransformTranslator<TransformT extends PTransform> {
- void translate(TransformT transform, TranslationContext context);
- }
-
- /**
- * The interface provided to registered callbacks for interacting
- * with the {@link DataflowRunner}, including reading and writing the
- * values of {@link PCollection}s and side inputs ({@link PCollectionView}s).
- */
- public interface TranslationContext {
- /**
- * Returns the configured pipeline options.
- */
- DataflowPipelineOptions getPipelineOptions();
-
- /**
- * Returns the input of the currently being translated transform.
- */
- <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
- /**
- * Returns the output of the currently being translated transform.
- */
- <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
- /**
- * Returns the full name of the currently being translated transform.
- */
- String getFullName(PTransform<?, ?> transform);
-
- /**
- * Adds a step to the Dataflow workflow for the given transform, with
- * the given Dataflow step type.
- */
- StepTranslationContext addStep(PTransform<?, ?> transform, String type);
-
- /**
- * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be
- * consistent with the Step, in terms of input, output and coder types.
- *
- * <p>This is a low-level operation, when using this method it is up to
- * the caller to ensure that names do not collide.
- */
- Step addStep(PTransform<?, ? extends PValue> transform, Step step);
- /**
- * Encode a PValue reference as an output reference.
- */
- OutputReference asOutputReference(PValue value);
- }
-
- public interface StepTranslationContext {
- /**
- * Sets the encoding for the current Dataflow step.
- */
- void addEncodingInput(Coder<?> value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, Boolean value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, String value);
-
- /**
- * Adds an input with the given name and value to the current
- * Dataflow step.
- */
- void addInput(String name, Long value);
-
- /**
- * Adds an input with the given name to the previously added Dataflow
- * step, coming from the specified input PValue.
- */
- void addInput(String name, PInput value);
-
- /**
- * Adds an input that is a dictionary of strings to objects.
- */
- void addInput(String name, Map<String, Object> elements);
-
- /**
- * Adds an input that is a list of objects.
- */
- void addInput(String name, List<? extends Map<String, Object>> elements);
-
- /**
- * Adds an output to the previously added Dataflow step,
- * producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code WindowedValueCoder}. Returns a pipeline level unique id.
- */
- long addOutput(PValue value);
-
- /**
- * Adds an output to the previously added Dataflow step,
- * producing the specified output {@code PValue},
- * including its {@code Coder} if a {@code TypedPValue}. If the
- * {@code PValue} is a {@code PCollection}, wraps its coder inside
- * a {@code ValueOnlyCoder}. Returns a pipeline level unique id.
- */
- long addValueOnlyOutput(PValue value);
-
- /**
- * Adds an output to the previously added CollectionToSingleton Dataflow step,
- * consuming the specified input {@code PValue} and producing the specified output
- * {@code PValue}. This step requires special treatment for its
- * output encoding. Returns a pipeline level unique id.
- */
- long addCollectionToSingletonOutput(PValue inputValue,
- PValue outputValue);
- }
-
-
/////////////////////////////////////////////////////////////////////////////
/**
@@ -838,11 +718,11 @@ public class DataflowPipelineTranslator {
DataflowPipelineTranslator.registerTransformTranslator(
Combine.GroupedValues.class,
- new DataflowPipelineTranslator.TransformTranslator<Combine.GroupedValues>() {
+ new TransformTranslator<GroupedValues>() {
@Override
public void translate(
Combine.GroupedValues transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
translateHelper(transform, context);
}
@@ -1007,7 +887,7 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(
Window.Bound.class,
- new DataflowPipelineTranslator.TransformTranslator<Window.Bound>() {
+ new TransformTranslator<Bound>() {
@Override
public void translate(
Window.Bound transform, TranslationContext context) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d2c1e66..9da7d24 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -33,7 +33,6 @@ import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.api.services.dataflow.model.Step;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -73,9 +72,6 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.AssignWindows;
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
@@ -2315,10 +2311,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
private static class ReadWithIdsTranslator
- implements DataflowPipelineTranslator.TransformTranslator<ReadWithIds<?>> {
+ implements TransformTranslator<ReadWithIds<?>> {
@Override
public void translate(ReadWithIds<?> transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
ReadTranslator.translateReadHelper(transform.getSource(), transform, context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
new file mode 100644
index 0000000..2aa8327
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.Step;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.util.OutputReference;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
+ * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
+ */
+public interface TransformTranslator<TransformT extends PTransform> {
+ void translate(TransformT transform, TranslationContext context);
+
+ /**
+ * The interface provided to registered callbacks for interacting with the {@link DataflowRunner},
+ * including reading and writing the values of {@link PCollection}s and side inputs.
+ */
+ interface TranslationContext {
+ /** Returns the configured pipeline options. */
+ DataflowPipelineOptions getPipelineOptions();
+
+ /** Returns the input of the transform currently being translated. */
+ <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
+
+ /** Returns the output of the transform currently being translated. */
+ <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
+
+ /** Returns the full name of the transform currently being translated. */
+ String getFullName(PTransform<?, ?> transform);
+
+ /**
+ * Adds a step to the Dataflow workflow for the given transform, with the given Dataflow step
+ * type.
+ */
+ StepTranslationContext addStep(PTransform<?, ?> transform, String type);
+
+ /**
+ * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent
+ * with the Step, in terms of input, output and coder types.
+ *
+ * <p>This is a low-level operation; when using this method, it is up to the caller to ensure
+ * that names do not collide.
+ */
+ Step addStep(PTransform<?, ? extends PValue> transform, Step step);
+ /** Encode a PValue reference as an output reference. */
+ OutputReference asOutputReference(PValue value);
+ }
+
+ /** The interface for a {@link TransformTranslator} to build a Dataflow step. */
+ interface StepTranslationContext {
+ /** Sets the encoding for this Dataflow step. */
+ void addEncodingInput(Coder<?> value);
+
+ /** Adds an input with the given name and value to this Dataflow step. */
+ void addInput(String name, Boolean value);
+
+ /** Adds an input with the given name and value to this Dataflow step. */
+ void addInput(String name, String value);
+
+ /** Adds an input with the given name and value to this Dataflow step. */
+ void addInput(String name, Long value);
+
+ /**
+ * Adds an input with the given name to this Dataflow step, coming from the specified input
+ * PValue.
+ */
+ void addInput(String name, PInput value);
+
+ /** Adds an input that is a dictionary of strings to objects. */
+ void addInput(String name, Map<String, Object> elements);
+
+ /** Adds an input that is a list of objects. */
+ void addInput(String name, List<? extends Map<String, Object>> elements);
+
+ /**
+ * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+ * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+ * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level
+ * unique id.
+ */
+ long addOutput(PValue value);
+
+ /**
+ * Adds an output to this Dataflow step, producing the specified output {@code PValue},
+ * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
+ * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level
+ * unique id.
+ */
+ long addValueOnlyOutput(PValue value);
+
+ /**
+ * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
+ * input {@code PValue} and producing the specified output {@code PValue}. This step requires
+ * special treatment for its output encoding. Returns a pipeline level unique id.
+ */
+ long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 1a5a9a5..a15a2a3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -24,10 +24,7 @@ import static org.apache.beam.sdk.util.Structs.addLong;
import com.google.api.services.dataflow.model.SourceMetadata;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.runners.dataflow.TransformTranslator;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
@@ -47,7 +44,7 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
public static <T> void translateReadHelper(Source<T> source,
PTransform<?, ? extends PValue> transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
try {
// TODO: Move this validation out of translation once IOChannelUtils is portable
// and can be reconstructed on the worker.
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index ab82941..84b585a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -56,7 +56,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -566,7 +565,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
* {@link TranslationContext#addStep} and remaps the input port reference.
*/
private static class EmbeddedTranslator
- implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+ implements TransformTranslator<EmbeddedTransform> {
@Override public void translate(EmbeddedTransform transform, TranslationContext context) {
addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
context.asOutputReference(context.getInput(transform)));
http://git-wip-us.apache.org/repos/asf/beam/blob/5d2cb3e2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a19fd8c..4fff1c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -61,7 +61,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsList;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMap;
import org.apache.beam.runners.dataflow.DataflowRunner.BatchViewAsMultimap;
@@ -989,12 +988,12 @@ public class DataflowRunnerTest {
DataflowPipelineTranslator.registerTransformTranslator(
TestTransform.class,
- new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+ new TransformTranslator<TestTransform>() {
@SuppressWarnings("unchecked")
@Override
public void translate(
TestTransform transform,
- DataflowPipelineTranslator.TranslationContext context) {
+ TranslationContext context) {
transform.translated = true;
// Note: This is about the minimum needed to fake out a
[5/5] beam git commit: This closes #1678: Refactor Dataflow
translator to decouple steps
Posted by ke...@apache.org.
This closes #1678: Refactor Dataflow translator to decouple steps
Reduce visibility of many Dataflow runner internals
Move some pieces of Dataflow translator to top level
Add explicit translation builder for a Step to in Dataflow translator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6caa82a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6caa82a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6caa82a
Branch: refs/heads/master
Commit: a6caa82a621a0a9c1cd2cd92c26304e9f79546bb
Parents: b4d8702 33907f8
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 11:37:07 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:37:07 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/AssignWindows.java | 89 +++
.../dataflow/DataflowAggregatorTransforms.java | 79 +++
.../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++
.../runners/dataflow/DataflowPipelineJob.java | 2 -
.../dataflow/DataflowPipelineTranslator.java | 424 ++++++--------
.../beam/runners/dataflow/DataflowRunner.java | 68 +--
.../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
.../beam/runners/dataflow/ReadTranslator.java | 102 ++++
.../runners/dataflow/TransformTranslator.java | 123 +++++
.../dataflow/internal/AssignWindows.java | 89 ---
.../internal/DataflowAggregatorTransforms.java | 79 ---
.../internal/DataflowMetricUpdateExtractor.java | 109 ----
.../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
.../dataflow/internal/ReadTranslator.java | 104 ----
.../dataflow/DataflowPipelineJobTest.java | 1 -
.../DataflowPipelineTranslatorTest.java | 3 +-
.../runners/dataflow/DataflowRunnerTest.java | 8 +-
...aflowUnboundedReadFromBoundedSourceTest.java | 79 +++
...aflowUnboundedReadFromBoundedSourceTest.java | 79 ---
19 files changed, 1316 insertions(+), 1325 deletions(-)
----------------------------------------------------------------------
[4/5] beam git commit: Reduce visibility of many Dataflow runner
internals
Posted by ke...@apache.org.
Reduce visibility of many Dataflow runner internals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33907f89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33907f89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33907f89
Branch: refs/heads/master
Commit: 33907f8908238199b166070bc1e12796af32829a
Parents: 5d2cb3e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 17:15:52 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 11:36:51 2017 -0800
----------------------------------------------------------------------
.../beam/runners/dataflow/AssignWindows.java | 89 +++
.../dataflow/DataflowAggregatorTransforms.java | 79 +++
.../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++
.../runners/dataflow/DataflowPipelineJob.java | 2 -
.../dataflow/DataflowPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 4 -
.../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
.../beam/runners/dataflow/ReadTranslator.java | 102 ++++
.../runners/dataflow/TransformTranslator.java | 2 +-
.../dataflow/internal/AssignWindows.java | 89 ---
.../internal/DataflowAggregatorTransforms.java | 79 ---
.../internal/DataflowMetricUpdateExtractor.java | 109 ----
.../DataflowUnboundedReadFromBoundedSource.java | 547 -------------------
.../dataflow/internal/ReadTranslator.java | 102 ----
.../dataflow/DataflowPipelineJobTest.java | 1 -
...aflowUnboundedReadFromBoundedSourceTest.java | 79 +++
...aflowUnboundedReadFromBoundedSourceTest.java | 79 ---
17 files changed, 1007 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
new file mode 100644
index 0000000..880cd26
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
+ * {@link PTransform}.
+ *
+ * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies
+ * a primitive {@link PTransform} in the Dataflow service.
+ *
+ * <p>For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn},
+ * applies an identity {@link ParDo} and sets the windowing strategy of the output
+ * {@link PCollection}.
+ *
+ * <p>For internal use only.
+ *
+ * @param <T> the type of input element
+ */
+class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
+ private final Window.Bound<T> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // Used via reflection
+ public AssignWindows(Window.Bound<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ WindowingStrategy<?, ?> outputStrategy =
+ transform.getOutputStrategyInternal(input.getWindowingStrategy());
+ if (transform.getWindowFn() != null) {
+ // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
+ return PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(), outputStrategy, input.isBounded());
+ } else {
+ // If the windowFn didn't change, we just run a pass-through transform and then set the
+ // new windowing strategy.
+ return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element());
+ }
+ })).setWindowingStrategyInternal(outputStrategy);
+ }
+ }
+
+ @Override
+ public void validate(PCollection<T> input) {
+ transform.validate(input);
+ }
+
+ @Override
+ protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
+ return input.getCoder();
+ }
+
+ @Override
+ protected String getKindString() {
+ return "Window.Into()";
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
new file mode 100755
index 0000000..0198cca
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
+ */
+class DataflowAggregatorTransforms {
+ private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
+ private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
+ private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
+
+ /**
+ * Builds the lookup tables used to go from an {@link Aggregator} to step names and back.
+ *
+ * @param aggregatorTransforms the transforms in which each aggregator is used; stored as-is
+ * @param transformStepNames the step name assigned to each applied transform; copied into a
+ * bidirectional map so that step names can be resolved back to applied transforms
+ */
+ public DataflowAggregatorTransforms(
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
+ Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
+ this.aggregatorTransforms = aggregatorTransforms;
+ appliedStepNames = HashBiMap.create(transformStepNames);
+
+ // Index every applied transform by the PTransform it applies.
+ transformAppliedTransforms = HashMultimap.create();
+ for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
+ transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
+ }
+ }
+
+ /**
+ * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
+ */
+ public boolean contains(Aggregator<?, ?> aggregator) {
+ return aggregatorTransforms.containsKey(aggregator);
+ }
+
+ /**
+ * Gets the step names in which the {@link Aggregator} is used.
+ *
+ * @return a new, mutable collection of step names; empty when the aggregator is not known to
+ * this mapping
+ */
+ public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
+ Collection<String> names = new HashSet<>();
+ Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
+ if (transforms == null) {
+ // An aggregator that was never registered has no steps; report that as an empty result
+ // instead of failing with a NullPointerException in the loop below.
+ return names;
+ }
+ for (PTransform<?, ?> transform : transforms) {
+ for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
+ names.add(appliedStepNames.get(applied));
+ }
+ }
+ return names;
+ }
+
+ /**
+ * Gets the {@link AppliedPTransform} that was assigned the provided step name, or {@code null}
+ * if no applied transform has that step name.
+ */
+ public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
+ return appliedStepNames.inverse().get(stepName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
new file mode 100755
index 0000000..f725c46
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
+ * MetricUpdate MetricUpdates}.
+ */
+final class DataflowMetricUpdateExtractor {
+ private static final String STEP_NAME_CONTEXT_KEY = "step";
+ private static final String IS_TENTATIVE_KEY = "tentative";
+
+ private DataflowMetricUpdateExtractor() {
+ // Do not instantiate.
+ }
+
+ /**
+ * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
+ * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
+ * MetricUpdate MetricUpdates}.
+ */
+ public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
+ DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
+ Map<String, OutputT> results = new HashMap<>();
+ if (metricUpdates == null) {
+ return results;
+ }
+
+ String aggregatorName = aggregator.getName();
+ Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
+
+ for (MetricUpdate metricUpdate : metricUpdates) {
+ MetricStructuredName metricStructuredName = metricUpdate.getName();
+ Map<String, String> context = metricStructuredName.getContext();
+ if (metricStructuredName.getName().equals(aggregatorName) && context != null
+ && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
+ AppliedPTransform<?, ?, ?> transform =
+ aggregatorTransforms.getAppliedTransformForStepName(
+ context.get(STEP_NAME_CONTEXT_KEY));
+ String fullName = transform.getFullName();
+ // Prefer the tentative (fresher) value if it exists.
+ if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
+ results.put(fullName, toValue(aggregator, metricUpdate));
+ }
+ }
+ }
+
+ return results;
+
+ }
+
+ private static <OutputT> OutputT toValue(
+ Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
+ CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
+ Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
+
+ if (outputType.equals(Long.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
+ return asLong;
+ }
+ if (outputType.equals(Integer.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
+ return asInt;
+ }
+ if (outputType.equals(Double.class)) {
+ @SuppressWarnings("unchecked")
+ OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
+ return asDouble;
+ }
+ throw new UnsupportedOperationException(
+ "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
+ }
+
+ private static Number toNumber(MetricUpdate update) {
+ if (update.getScalar() instanceof Number) {
+ return (Number) update.getScalar();
+ }
+ throw new IllegalArgumentException(
+ "Metric Update " + update + " does not have a numeric scalar");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 00c88f9..0da7137 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -35,8 +35,6 @@ import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index e9cf6f4..8e5901e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -58,7 +58,6 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -106,7 +105,7 @@ import org.slf4j.LoggerFactory;
* into Cloud Dataflow Service API {@link Job}s.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class DataflowPipelineTranslator {
+class DataflowPipelineTranslator {
// Must be kept in sync with their internal counterparts.
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9da7d24..9ff856a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -72,14 +72,10 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.internal.AssignWindows;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
-import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
-import org.apache.beam.runners.dataflow.internal.ReadTranslator;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..cfb5ebc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} into an {@link UnboundedSource}.
+ *
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * and element timestamps are propagated. While any elements remain, the watermark is the beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ *
+ * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
+ * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
+ */
+@Deprecated
+class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
+
+ private final BoundedSource<T> source;
+
+ /**
+ * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+ */
+ public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
+ this.source = source;
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ return input.getPipeline().apply(
+ Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+ }
+
+ @Override
+ protected Coder<T> getDefaultOutputCoder() {
+ return source.getDefaultOutputCoder();
+ }
+
+ @Override
+ public String getKindString() {
+ return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ // We explicitly do not register base-class data, instead we use the delegate inner source.
+ builder
+ .add(DisplayData.item("source", source.getClass()))
+ .include("source", source);
+ }
+
+ /**
+ * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+ */
+ @VisibleForTesting
+ public static class BoundedToUnboundedSourceAdapter<T>
+ extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+ private BoundedSource<T> boundedSource;
+
+ public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+ this.boundedSource = boundedSource;
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ /**
+ * Splits the wrapped {@link BoundedSource} into bundles and wraps each bundle in its own
+ * adapter.
+ *
+ * <p>Splitting is best effort: if the size estimate is not positive, the source returns
+ * {@code null} splits, or any exception is thrown (including the {@link ArithmeticException}
+ * raised when {@code desiredNumSplits} is zero), a warning is logged and this adapter is
+ * returned as the only split.
+ *
+ * @return an immutable list of adapters, one per split
+ */
+ @Override
+ public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ try {
+ long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+ if (desiredBundleSize <= 0) {
+ LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+ boundedSource);
+ return ImmutableList.of(this);
+ }
+ List<? extends BoundedSource<T>> splits =
+ boundedSource.splitIntoBundles(desiredBundleSize, options);
+ if (splits == null) {
+ LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+ return ImmutableList.of(this);
+ }
+ // Build the result eagerly. A lazily transformed view would create a fresh adapter on
+ // every element access and cannot be serialized with a non-serializable function.
+ ImmutableList.Builder<BoundedToUnboundedSourceAdapter<T>> adapters =
+ ImmutableList.builder();
+ for (BoundedSource<T> split : splits) {
+ adapters.add(new BoundedToUnboundedSourceAdapter<>(split));
+ }
+ return adapters.build();
+ } catch (Exception e) {
+ LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+ return ImmutableList.of(this);
+ }
+ }
+
+ /**
+ * Creates a reader that resumes from {@code checkpoint}, or one that reads the whole wrapped
+ * source when {@code checkpoint} is {@code null}.
+ */
+ @Override
+ public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+ throws IOException {
+ if (checkpoint != null) {
+ return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+ }
+ // No checkpoint yet: start from the full bounded source with no buffered elements.
+ return new Reader(null /* residualElements */, boundedSource, options);
+ }
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return boundedSource.getDefaultOutputCoder();
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+ return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("source", boundedSource.getClass()));
+ builder.include("source", boundedSource);
+ }
+
+ /**
+ * Checkpoint mark of the adapter: the elements that were read but not yet emitted, plus the
+ * part of the bounded source that still has to be read. Either part may be {@code null}.
+ */
+ @VisibleForTesting
+ static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+ // Elements, with their timestamps, that remain to be emitted before reading the source.
+ private final @Nullable List<TimestampedValue<T>> residualElements;
+ // The remaining bounded source to read once the residual elements are exhausted.
+ private final @Nullable BoundedSource<T> residualSource;
+
+ /** Creates a checkpoint from the given residual elements and residual source. */
+ public Checkpoint(
+ @Nullable List<TimestampedValue<T>> residualElements,
+ @Nullable BoundedSource<T> residualSource) {
+ this.residualElements = residualElements;
+ this.residualSource = residualSource;
+ }
+
+ /** Does nothing; this checkpoint has no finalization work. */
+ @Override
+ public void finalizeCheckpoint() {}
+
+ /** Returns the residual elements, or {@code null} if there are none recorded. */
+ @VisibleForTesting
+ @Nullable List<TimestampedValue<T>> getResidualElements() {
+ return residualElements;
+ }
+
+ /** Returns the residual source, or {@code null} if there is none recorded. */
+ @VisibleForTesting
+ @Nullable BoundedSource<T> getResidualSource() {
+ return residualSource;
+ }
+ }
+
+ /**
+ * {@link Coder} for {@link Checkpoint}. The residual elements are written with a nullable list
+ * coder built from the element coder, followed by the residual source written with a nullable
+ * {@link SerializableCoder}.
+ */
+ @VisibleForTesting
+ static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+ /**
+ * Creates a coder from its component coders; exactly one component, the element coder, is
+ * expected.
+ */
+ @JsonCreator
+ public static CheckpointCoder<?> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<?>> components) {
+ checkArgument(components.size() == 1,
+ "Expecting 1 components, got %s", components.size());
+ return new CheckpointCoder<>(components.get(0));
+ }
+
+ // The coder for a list of residual elements and their timestamps
+ private final Coder<List<TimestampedValue<T>>> elemsCoder;
+ // The coder from the BoundedReader for coding each element
+ private final Coder<T> elemCoder;
+ // The nullable and serializable coder for the BoundedSource.
+ @SuppressWarnings("rawtypes")
+ private final Coder<BoundedSource> sourceCoder;
+
+ /** Builds the element-list coder and the source coder from {@code elemCoder}. */
+ CheckpointCoder(Coder<T> elemCoder) {
+ this.elemsCoder = NullableCoder.of(
+ ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+ this.elemCoder = elemCoder;
+ this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+ }
+
+ /**
+ * Writes the residual elements in a nested context, then the residual source in the given
+ * context. {@link #decode} reads them back in the same order.
+ */
+ @Override
+ public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ elemsCoder.encode(value.residualElements, outStream, context.nested());
+ sourceCoder.encode(value.residualSource, outStream, context);
+ }
+
+ /** Reads the residual elements and then the residual source, mirroring {@link #encode}. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Checkpoint<T> decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ return new Checkpoint<>(
+ elemsCoder.decode(inStream, context.nested()),
+ sourceCoder.decode(inStream, context));
+ }
+
+ /** Returns the element coder as the only coder argument. */
+ @Override
+ public List<Coder<?>> getCoderArguments() {
+ return Arrays.<Coder<?>>asList(elemCoder);
+ }
+
+ /** Always throws: the source part is encoded with Java serialization. */
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+ }
+ }
+
+ /**
+ * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+ * {@link ResidualElements} and {@link ResidualSource}.
+ *
+ * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
+ * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
+ * be split into {@link ResidualElements} and {@link ResidualSource}.
+ */
+ @VisibleForTesting
+ class Reader extends UnboundedReader<T> {
+ private ResidualElements residualElements;
+ private @Nullable ResidualSource residualSource;
+ private final PipelineOptions options;
+ private boolean done;
+
+ Reader(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ init(residualElementsList, residualSource, options);
+ this.options = checkNotNull(options, "options");
+ this.done = false;
+ }
+
+ private void init(
+ @Nullable List<TimestampedValue<T>> residualElementsList,
+ @Nullable BoundedSource<T> residualSource,
+ PipelineOptions options) {
+ this.residualElements = residualElementsList == null
+ ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+ : new ResidualElements(residualElementsList);
+ this.residualSource =
+ residualSource == null ? null : new ResidualSource(residualSource, options);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (residualElements.advance()) {
+ return true;
+ } else if (residualSource != null && residualSource.advance()) {
+ return true;
+ } else {
+ done = true;
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (residualSource != null) {
+ residualSource.close();
+ }
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrent();
+ } else if (residualSource != null) {
+ return residualSource.getCurrent();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (residualElements.hasCurrent()) {
+ return residualElements.getCurrentTimestamp();
+ } else if (residualSource != null) {
+ return residualSource.getCurrentTimestamp();
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>If only part of the {@link ResidualElements} is consumed, the new
+ * checkpoint will contain the remaining elements in {@link ResidualElements} and
+ * the {@link ResidualSource}.
+ *
+ * <p>If all {@link ResidualElements} and part of the
+ * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+ * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+ * {@link ResidualSource} is the source split from the current source,
+ * and {@link ResidualElements} contains rest elements from the current source after
+ * the splitting. For unsplittable source, it will put all remaining elements into
+ * the {@link ResidualElements}.
+ */
+ @Override
+ public Checkpoint<T> getCheckpointMark() {
+ Checkpoint<T> newCheckpoint;
+ if (!residualElements.done()) {
+ // Part of residualElements are consumed.
+ // Checkpoints the remaining elements and residualSource.
+ newCheckpoint = new Checkpoint<>(
+ residualElements.getRestElements(),
+ residualSource == null ? null : residualSource.getSource());
+ } else if (residualSource != null) {
+ newCheckpoint = residualSource.getCheckpointMark();
+ } else {
+ newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+ }
+ // Re-initialize since the residualElements and the residualSource might be
+ // consumed or split by checkpointing.
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+ return newCheckpoint;
+ }
+
+ @Override
+ public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+ return BoundedToUnboundedSourceAdapter.this;
+ }
+ }
+
+ /**
+ * Cursor over a list of buffered elements and their timestamps. Iteration starts lazily on
+ * the first call to {@link #advance()}.
+ */
+ private class ResidualElements {
+ private final List<TimestampedValue<T>> elementsList;
+ // Null until the first advance(); created from elementsList on demand.
+ private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+ private @Nullable TimestampedValue<T> currentT;
+ // True while currentT holds the element produced by the last successful advance().
+ private boolean hasCurrent;
+ // Set once advance() has found the iterator exhausted.
+ private boolean done;
+
+ /** Creates a cursor over {@code residualElementsList}, which must not be null. */
+ ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+ this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+ this.elementsIterator = null;
+ this.currentT = null;
+ this.hasCurrent = false;
+ this.done = false;
+ }
+
+ /**
+ * Moves to the next element. Returns false, marks this cursor done and clears the current
+ * element flag when no element remains.
+ */
+ public boolean advance() {
+ if (elementsIterator == null) {
+ elementsIterator = elementsList.iterator();
+ }
+ if (elementsIterator.hasNext()) {
+ currentT = elementsIterator.next();
+ hasCurrent = true;
+ return true;
+ } else {
+ done = true;
+ hasCurrent = false;
+ return false;
+ }
+ }
+
+ /** Returns whether the last call to {@link #advance()} produced an element. */
+ boolean hasCurrent() {
+ return hasCurrent;
+ }
+
+ /** Returns whether {@link #advance()} has already run past the last element. */
+ boolean done() {
+ return done;
+ }
+
+ /**
+ * Returns the current element together with its timestamp.
+ *
+ * @throws NoSuchElementException if there is no current element
+ */
+ TimestampedValue<T> getCurrentTimestampedValue() {
+ if (!hasCurrent) {
+ throw new NoSuchElementException();
+ }
+ return currentT;
+ }
+
+ /** Returns the value of the current element. */
+ T getCurrent() {
+ return getCurrentTimestampedValue().getValue();
+ }
+
+ /** Returns the timestamp of the current element. */
+ Instant getCurrentTimestamp() {
+ return getCurrentTimestampedValue().getTimestamp();
+ }
+
+ /**
+ * Returns the elements that have not been handed out by {@link #advance()} yet. If
+ * iteration has not started, the backing list itself is returned; otherwise the iterator is
+ * drained into a new list, so the cursor has no further elements afterwards.
+ */
+ List<TimestampedValue<T>> getRestElements() {
+ if (elementsIterator == null) {
+ return elementsList;
+ } else {
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ while (elementsIterator.hasNext()) {
+ newResidualElements.add(elementsIterator.next());
+ }
+ return newResidualElements;
+ }
+ }
+ }
+
+ private class ResidualSource {
+ private BoundedSource<T> residualSource;
+ private PipelineOptions options;
+ private @Nullable BoundedReader<T> reader;
+ private boolean closed;
+
+ public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+ this.residualSource = checkNotNull(residualSource, "residualSource");
+ this.options = checkNotNull(options, "options");
+ this.reader = null;
+ this.closed = false;
+ }
+
+ /**
+ * Moves the underlying {@link BoundedReader} to its next element, creating and starting the
+ * reader on the first call.
+ *
+ * @return whether an element is available
+ * @throws IllegalStateException if called after {@link #close()} released the reader
+ */
+ private boolean advance() throws IOException {
+ if (reader == null) {
+ if (closed) {
+ // close() dropped the reader; fail with a clear message instead of dereferencing null.
+ throw new IllegalStateException("advance() called on a closed ResidualSource");
+ }
+ reader = residualSource.createReader(options);
+ return reader.start();
+ }
+ return reader.advance();
+ }
+
+ T getCurrent() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrent();
+ }
+
+ Instant getCurrentTimestamp() throws NoSuchElementException {
+ if (reader == null) {
+ throw new NoSuchElementException();
+ }
+ return reader.getCurrentTimestamp();
+ }
+
+ void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ closed = true;
+ }
+
+ BoundedSource<T> getSource() {
+ return residualSource;
+ }
+
+ Checkpoint<T> getCheckpointMark() {
+ if (reader == null) {
+ // Reader hasn't started, checkpoint the residualSource.
+ return new Checkpoint<>(null /* residualElements */, residualSource);
+ } else {
+ // Part of residualSource are consumed.
+ // Splits the residualSource and tracks the new residualElements in current source.
+ BoundedSource<T> residualSplit = null;
+ Double fractionConsumed = reader.getFractionConsumed();
+ if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+ double fractionRest = 1 - fractionConsumed;
+ int splitAttempts = 8;
+ for (int i = 0; i < 8 && residualSplit == null; ++i) {
+ double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+ residualSplit = reader.splitAtFraction(fractionToSplit);
+ }
+ }
+ List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+ try {
+ while (advance()) {
+ newResidualElements.add(
+ TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+ }
+ return new Checkpoint<>(newResidualElements, residualSplit);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
new file mode 100755
index 0000000..ed03b53
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.Structs.addBoolean;
+import static org.apache.beam.sdk.util.Structs.addDictionary;
+import static org.apache.beam.sdk.util.Structs.addLong;
+
+import com.google.api.services.dataflow.model.SourceMetadata;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Translates a {@code Read} {@code PTransform} into a "ParallelRead" step for the Dataflow back-end.
+ */
+class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
+ @Override
+ public void translate(Read.Bounded<?> transform, TranslationContext context) {
+ translateReadHelper(transform.getSource(), transform, context);
+ }
+
+ public static <T> void translateReadHelper(Source<T> source, // Adds one step for this source.
+ PTransform<?, ? extends PValue> transform,
+ TranslationContext context) {
+ try {
+ // TODO: Move this validation out of translation once IOChannelUtils is portable
+ // and can be reconstructed on the worker.
+ if (source instanceof FileBasedSource) {
+ ValueProvider<String> filePatternOrSpec =
+ ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
+ if (filePatternOrSpec.isAccessible()) { // get() is only called when this reports true.
+ context.getPipelineOptions()
+ .getPathValidator()
+ .validateInputFilePatternSupported(filePatternOrSpec.get());
+ }
+ }
+
+ StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
+ stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
+ stepContext.addInput(
+ PropertyNames.SOURCE_STEP_INPUT,
+ cloudSourceToDictionary(
+ CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
+ stepContext.addValueOnlyOutput(context.getOutput(transform));
+ } catch (Exception e) { // Any failure above, checked or unchecked, is rethrown wrapped.
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
+ // property of CloudWorkflowStep.input. The spec is always added; other fields only when set.
+ private static Map<String, Object> cloudSourceToDictionary(
+ com.google.api.services.dataflow.model.Source source) {
+ // Do not translate encoding - the source's encoding is translated elsewhere
+ // to the step's output info.
+ Map<String, Object> res = new HashMap<>();
+ addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
+ if (source.getMetadata() != null) {
+ addDictionary(res, PropertyNames.SOURCE_METADATA,
+ cloudSourceMetadataToDictionary(source.getMetadata()));
+ }
+ if (source.getDoesNotNeedSplitting() != null) {
+ addBoolean(
+ res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
+ }
+ return res;
+ }
+
+ private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
+ Map<String, Object> res = new HashMap<>(); // Receives only the metadata fields that are non-null.
+ if (metadata.getEstimatedSizeBytes() != null) {
+ addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
+ }
+ if (metadata.getInfinite() != null) {
+ addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
+ }
+ return res;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 2aa8327..fb883a7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.PValue;
* A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
* for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
*/
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
void translate(TransformT transform, TranslationContext context);
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
deleted file mode 100644
index 27fe13d..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)}
- * {@link PTransform}.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies
- * a primitive {@link PTransform} in the Dataflow service.
- *
- * <p>For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn},
- * applies an identity {@link ParDo} and sets the windowing strategy of the output
- * {@link PCollection}.
- *
- * <p>For internal use only.
- *
- * @param <T> the type of input element
- */
-public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> {
- private final Window.Bound<T> transform;
-
- /**
- * Builds an instance of this class from the overriden transform.
- */
- @SuppressWarnings("unused") // Used via reflection
- public AssignWindows(Window.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- transform.getOutputStrategyInternal(input.getWindowingStrategy());
- if (transform.getWindowFn() != null) {
- // If the windowFn changed, we create a primitive, and run the AssignWindows operation here.
- return PCollection.<T>createPrimitiveOutputInternal(
- input.getPipeline(), outputStrategy, input.isBounded());
- } else {
- // If the windowFn didn't change, we just run a pass-through transform and then set the
- // new windowing strategy.
- return input.apply("Identity", ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element());
- }
- })).setWindowingStrategyInternal(outputStrategy);
- }
- }
-
- @Override
- public void validate(PCollection<T> input) {
- transform.validate(input);
- }
-
- @Override
- protected Coder<?> getDefaultOutputCoder(PCollection<T> input) {
- return input.getCoder();
- }
-
- @Override
- protected String getKindString() {
- return "Window.Into()";
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
deleted file mode 100755
index fb78973..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used.
- */
-public class DataflowAggregatorTransforms {
- private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms;
- private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms;
- private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames;
-
- public DataflowAggregatorTransforms(
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms,
- Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) {
- this.aggregatorTransforms = aggregatorTransforms;
- appliedStepNames = HashBiMap.create(transformStepNames);
-
- transformAppliedTransforms = HashMultimap.create();
- for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) {
- transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform);
- }
- }
-
- /**
- * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}.
- */
- public boolean contains(Aggregator<?, ?> aggregator) {
- return aggregatorTransforms.containsKey(aggregator);
- }
-
- /**
- * Gets the step names in which the {@link Aggregator} is used.
- */
- public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) {
- Collection<String> names = new HashSet<>();
- Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator);
- for (PTransform<?, ?> transform : transforms) {
- for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) {
- names.add(appliedStepNames.get(applied));
- }
- }
- return names;
- }
-
- /**
- * Gets the {@link PTransform} that was assigned the provided step name.
- */
- public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) {
- return appliedStepNames.inverse().get(stepName);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
deleted file mode 100755
index d715437..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.PTransform;
-
-/**
- * Methods for extracting the values of an {@link Aggregator} from a collection of {@link
- * MetricUpdate MetricUpdates}.
- */
-public final class DataflowMetricUpdateExtractor {
- private static final String STEP_NAME_CONTEXT_KEY = "step";
- private static final String IS_TENTATIVE_KEY = "tentative";
-
- private DataflowMetricUpdateExtractor() {
- // Do not instantiate.
- }
-
- /**
- * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in
- * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link
- * MetricUpdate MetricUpdates}.
- */
- public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator,
- DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) {
- Map<String, OutputT> results = new HashMap<>();
- if (metricUpdates == null) {
- return results;
- }
-
- String aggregatorName = aggregator.getName();
- Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator);
-
- for (MetricUpdate metricUpdate : metricUpdates) {
- MetricStructuredName metricStructuredName = metricUpdate.getName();
- Map<String, String> context = metricStructuredName.getContext();
- if (metricStructuredName.getName().equals(aggregatorName) && context != null
- && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) {
- AppliedPTransform<?, ?, ?> transform =
- aggregatorTransforms.getAppliedTransformForStepName(
- context.get(STEP_NAME_CONTEXT_KEY));
- String fullName = transform.getFullName();
- // Prefer the tentative (fresher) value if it exists.
- if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) {
- results.put(fullName, toValue(aggregator, metricUpdate));
- }
- }
- }
-
- return results;
-
- }
-
- private static <OutputT> OutputT toValue(
- Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) {
- CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn();
- Class<? super OutputT> outputType = combineFn.getOutputType().getRawType();
-
- if (outputType.equals(Long.class)) {
- @SuppressWarnings("unchecked")
- OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue());
- return asLong;
- }
- if (outputType.equals(Integer.class)) {
- @SuppressWarnings("unchecked")
- OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue());
- return asInt;
- }
- if (outputType.equals(Double.class)) {
- @SuppressWarnings("unchecked")
- OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue());
- return asDouble;
- }
- throw new UnsupportedOperationException(
- "Unsupported Output Type " + outputType + " in aggregator " + aggregator);
- }
-
- private static Number toNumber(MetricUpdate update) {
- if (update.getScalar() instanceof Number) {
- return (Number) update.getScalar();
- }
- throw new IllegalArgumentException(
- "Metric Update " + update + " does not have a numeric scalar");
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
deleted file mode 100644
index a2ae799..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
- *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
- * and element timestamps are propagated. While any elements remain, the watermark is the beginning
- * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
- * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- *
- * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
- * {@link BoundedSource}.
- * Sources that cannot be split are read entirely into memory, so this transform does not work well
- * with large, unsplittable sources.
- *
- * <p>This transform is intended to be used by a runner during pipeline translation to convert
- * a Read.Bounded into a Read.Unbounded.
- *
- * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
- * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
- */
-@Deprecated
-public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
-
- private final BoundedSource<T> source;
-
- /**
- * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
- */
- public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
- this.source = source;
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- return input.getPipeline().apply(
- Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
- }
-
- @Override
- protected Coder<T> getDefaultOutputCoder() {
- return source.getDefaultOutputCoder();
- }
-
- @Override
- public String getKindString() {
- return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource"));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- // We explicitly do not register base-class data, instead we use the delegate inner source.
- builder
- .add(DisplayData.item("source", source.getClass()))
- .include("source", source);
- }
-
- /**
- * A {@code BoundedSource} to {@code UnboundedSource} adapter.
- */
- @VisibleForTesting
- public static class BoundedToUnboundedSourceAdapter<T>
- extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
-
- private BoundedSource<T> boundedSource;
-
- public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
- this.boundedSource = boundedSource;
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- try {
- long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
- if (desiredBundleSize <= 0) {
- LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
- boundedSource);
- return ImmutableList.of(this);
- }
- List<? extends BoundedSource<T>> splits =
- boundedSource.splitIntoBundles(desiredBundleSize, options);
- if (splits == null) {
- LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
- return ImmutableList.of(this);
- }
- return Lists.transform(
- splits,
- new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
- @Override
- public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
- return new BoundedToUnboundedSourceAdapter<>(input);
- }});
- } catch (Exception e) {
- LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
- throws IOException {
- if (checkpoint == null) {
- return new Reader(null /* residualElements */, boundedSource, options);
- } else {
- return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return boundedSource.getDefaultOutputCoder();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
- return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("source", boundedSource.getClass()));
- builder.include("source", boundedSource);
- }
-
- @VisibleForTesting
- static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
- private final @Nullable List<TimestampedValue<T>> residualElements;
- private final @Nullable BoundedSource<T> residualSource;
-
- public Checkpoint(
- @Nullable List<TimestampedValue<T>> residualElements,
- @Nullable BoundedSource<T> residualSource) {
- this.residualElements = residualElements;
- this.residualSource = residualSource;
- }
-
- @Override
- public void finalizeCheckpoint() {}
-
- @VisibleForTesting
- @Nullable List<TimestampedValue<T>> getResidualElements() {
- return residualElements;
- }
-
- @VisibleForTesting
- @Nullable BoundedSource<T> getResidualSource() {
- return residualSource;
- }
- }
-
- @VisibleForTesting
- static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
- @JsonCreator
- public static CheckpointCoder<?> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 1,
- "Expecting 1 components, got %s", components.size());
- return new CheckpointCoder<>(components.get(0));
- }
-
- // The coder for a list of residual elements and their timestamps
- private final Coder<List<TimestampedValue<T>>> elemsCoder;
- // The coder from the BoundedReader for coding each element
- private final Coder<T> elemCoder;
- // The nullable and serializable coder for the BoundedSource.
- @SuppressWarnings("rawtypes")
- private final Coder<BoundedSource> sourceCoder;
-
- CheckpointCoder(Coder<T> elemCoder) {
- this.elemsCoder = NullableCoder.of(
- ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
- this.elemCoder = elemCoder;
- this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
- }
-
- @Override
- public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
- throws CoderException, IOException {
- elemsCoder.encode(value.residualElements, outStream, context.nested());
- sourceCoder.encode(value.residualSource, outStream, context);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Checkpoint<T> decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return new Checkpoint<>(
- elemsCoder.decode(inStream, context.nested()),
- sourceCoder.decode(inStream, context));
- }
-
- @Override
- public List<Coder<?>> getCoderArguments() {
- return Arrays.<Coder<?>>asList(elemCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
- }
- }
-
- /**
- * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
- * {@link ResidualElements} and {@link ResidualSource}.
- *
- * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains
- * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will
- * be split into {@link ResidualElements} and {@link ResidualSource}.
- */
- @VisibleForTesting
- class Reader extends UnboundedReader<T> {
- private ResidualElements residualElements;
- private @Nullable ResidualSource residualSource;
- private final PipelineOptions options;
- private boolean done;
-
- Reader(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- init(residualElementsList, residualSource, options);
- this.options = checkNotNull(options, "options");
- this.done = false;
- }
-
- private void init(
- @Nullable List<TimestampedValue<T>> residualElementsList,
- @Nullable BoundedSource<T> residualSource,
- PipelineOptions options) {
- this.residualElements = residualElementsList == null
- ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
- : new ResidualElements(residualElementsList);
- this.residualSource =
- residualSource == null ? null : new ResidualSource(residualSource, options);
- }
-
- @Override
- public boolean start() throws IOException {
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- if (residualElements.advance()) {
- return true;
- } else if (residualSource != null && residualSource.advance()) {
- return true;
- } else {
- done = true;
- return false;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (residualSource != null) {
- residualSource.close();
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrent();
- } else if (residualSource != null) {
- return residualSource.getCurrent();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- if (residualElements.hasCurrent()) {
- return residualElements.getCurrentTimestamp();
- } else if (residualSource != null) {
- return residualSource.getCurrentTimestamp();
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public Instant getWatermark() {
- return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>If only part of the {@link ResidualElements} is consumed, the new
- * checkpoint will contain the remaining elements in {@link ResidualElements} and
- * the {@link ResidualSource}.
- *
- * <p>If all {@link ResidualElements} and part of the
- * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
- * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
- * {@link ResidualSource} is the source split from the current source,
- * and {@link ResidualElements} contains rest elements from the current source after
- * the splitting. For unsplittable source, it will put all remaining elements into
- * the {@link ResidualElements}.
- */
- @Override
- public Checkpoint<T> getCheckpointMark() {
- Checkpoint<T> newCheckpoint;
- if (!residualElements.done()) {
- // Part of residualElements are consumed.
- // Checkpoints the remaining elements and residualSource.
- newCheckpoint = new Checkpoint<>(
- residualElements.getRestElements(),
- residualSource == null ? null : residualSource.getSource());
- } else if (residualSource != null) {
- newCheckpoint = residualSource.getCheckpointMark();
- } else {
- newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
- }
- // Re-initialize since the residualElements and the residualSource might be
- // consumed or split by checkpointing.
- init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
- return newCheckpoint;
- }
-
- @Override
- public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
- return BoundedToUnboundedSourceAdapter.this;
- }
- }
-
- private class ResidualElements {
- private final List<TimestampedValue<T>> elementsList;
- private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
- private @Nullable TimestampedValue<T> currentT;
- private boolean hasCurrent;
- private boolean done;
-
- ResidualElements(List<TimestampedValue<T>> residualElementsList) {
- this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
- this.elementsIterator = null;
- this.currentT = null;
- this.hasCurrent = false;
- this.done = false;
- }
-
- public boolean advance() {
- if (elementsIterator == null) {
- elementsIterator = elementsList.iterator();
- }
- if (elementsIterator.hasNext()) {
- currentT = elementsIterator.next();
- hasCurrent = true;
- return true;
- } else {
- done = true;
- hasCurrent = false;
- return false;
- }
- }
-
- boolean hasCurrent() {
- return hasCurrent;
- }
-
- boolean done() {
- return done;
- }
-
- TimestampedValue<T> getCurrentTimestampedValue() {
- if (!hasCurrent) {
- throw new NoSuchElementException();
- }
- return currentT;
- }
-
- T getCurrent() {
- return getCurrentTimestampedValue().getValue();
- }
-
- Instant getCurrentTimestamp() {
- return getCurrentTimestampedValue().getTimestamp();
- }
-
- List<TimestampedValue<T>> getRestElements() {
- if (elementsIterator == null) {
- return elementsList;
- } else {
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- while (elementsIterator.hasNext()) {
- newResidualElements.add(elementsIterator.next());
- }
- return newResidualElements;
- }
- }
- }
-
- private class ResidualSource {
- private BoundedSource<T> residualSource;
- private PipelineOptions options;
- private @Nullable BoundedReader<T> reader;
- private boolean closed;
-
- public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
- this.residualSource = checkNotNull(residualSource, "residualSource");
- this.options = checkNotNull(options, "options");
- this.reader = null;
- this.closed = false;
- }
-
- private boolean advance() throws IOException {
- if (reader == null && !closed) {
- reader = residualSource.createReader(options);
- return reader.start();
- } else {
- return reader.advance();
- }
- }
-
- T getCurrent() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrent();
- }
-
- Instant getCurrentTimestamp() throws NoSuchElementException {
- if (reader == null) {
- throw new NoSuchElementException();
- }
- return reader.getCurrentTimestamp();
- }
-
- void close() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- closed = true;
- }
-
- BoundedSource<T> getSource() {
- return residualSource;
- }
-
- Checkpoint<T> getCheckpointMark() {
- if (reader == null) {
- // Reader hasn't started, checkpoint the residualSource.
- return new Checkpoint<>(null /* residualElements */, residualSource);
- } else {
- // Part of residualSource are consumed.
- // Splits the residualSource and tracks the new residualElements in current source.
- BoundedSource<T> residualSplit = null;
- Double fractionConsumed = reader.getFractionConsumed();
- if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
- double fractionRest = 1 - fractionConsumed;
- int splitAttempts = 8;
- for (int i = 0; i < 8 && residualSplit == null; ++i) {
- double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
- residualSplit = reader.splitAtFraction(fractionToSplit);
- }
- }
- List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
- try {
- while (advance()) {
- newResidualElements.add(
- TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to read elements from the bounded reader.", e);
- }
- return new Checkpoint<>(newResidualElements, residualSplit);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
deleted file mode 100755
index a15a2a3..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.internal;
-
-import static org.apache.beam.sdk.util.Structs.addBoolean;
-import static org.apache.beam.sdk.util.Structs.addDictionary;
-import static org.apache.beam.sdk.util.Structs.addLong;
-
-import com.google.api.services.dataflow.model.SourceMetadata;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.TransformTranslator;
-import org.apache.beam.sdk.io.FileBasedSource;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end.
- */
-public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
- @Override
- public void translate(Read.Bounded<?> transform, TranslationContext context) {
- translateReadHelper(transform.getSource(), transform, context);
- }
-
- public static <T> void translateReadHelper(Source<T> source,
- PTransform<?, ? extends PValue> transform,
- TranslationContext context) {
- try {
- // TODO: Move this validation out of translation once IOChannelUtils is portable
- // and can be reconstructed on the worker.
- if (source instanceof FileBasedSource) {
- ValueProvider<String> filePatternOrSpec =
- ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
- if (filePatternOrSpec.isAccessible()) {
- context.getPipelineOptions()
- .getPathValidator()
- .validateInputFilePatternSupported(filePatternOrSpec.get());
- }
- }
-
- StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
- stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
- stepContext.addInput(
- PropertyNames.SOURCE_STEP_INPUT,
- cloudSourceToDictionary(
- CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
- stepContext.addValueOnlyOutput(context.getOutput(transform));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT}
- // property of CloudWorkflowStep.input.
- private static Map<String, Object> cloudSourceToDictionary(
- com.google.api.services.dataflow.model.Source source) {
- // Do not translate encoding - the source's encoding is translated elsewhere
- // to the step's output info.
- Map<String, Object> res = new HashMap<>();
- addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec());
- if (source.getMetadata() != null) {
- addDictionary(res, PropertyNames.SOURCE_METADATA,
- cloudSourceMetadataToDictionary(source.getMetadata()));
- }
- if (source.getDoesNotNeedSplitting() != null) {
- addBoolean(
- res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting());
- }
- return res;
- }
-
- private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) {
- Map<String, Object> res = new HashMap<>();
- if (metadata.getEstimatedSizeBytes() != null) {
- addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes());
- }
- if (metadata.getInfinite() != null) {
- addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite());
- }
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 6999e03..d5d7aa9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -54,7 +54,6 @@ import java.math.BigDecimal;
import java.net.SocketTimeoutException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
[3/5] beam git commit: Reduce visibility of many Dataflow runner
internals
Posted by ke...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..c479332
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.dataflow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DataflowUnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowUnboundedReadFromBoundedSourceTest {
+ @Test
+ public void testKind() {
+ DataflowUnboundedReadFromBoundedSource<?> read = new
+ DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
+
+ assertEquals("Read(NoopNamedSource)", read.getKindString());
+ }
+
+ @Test
+ public void testKindAnonymousSource() {
+ NoopNamedSource anonSource = new NoopNamedSource() {};
+ DataflowUnboundedReadFromBoundedSource<?> read = new
+ DataflowUnboundedReadFromBoundedSource<>(anonSource);
+
+ assertEquals("Read(AnonymousSource)", read.getKindString());
+ }
+
+ /** Source implementation only useful for its identity. */
+ static class NoopNamedSource extends BoundedSource<String> {
+ @Override
+ public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
+ PipelineOptions options) throws Exception {
+ return null;
+ }
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return 0;
+ }
+ @Override
+ public BoundedReader<String> createReader(
+ PipelineOptions options) throws IOException {
+ return null;
+ }
+ @Override
+ public void validate() {
+
+ }
+ @Override
+ public Coder<String> getDefaultOutputCoder() {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
deleted file mode 100644
index d38428b..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.dataflow.internal;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@see DataflowUnboundedReadFromBoundedSource}.
- */
-@RunWith(JUnit4.class)
-public class DataflowUnboundedReadFromBoundedSourceTest {
- @Test
- public void testKind() {
- DataflowUnboundedReadFromBoundedSource<?> read = new
- DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource());
-
- assertEquals("Read(NoopNamedSource)", read.getKindString());
- }
-
- @Test
- public void testKindAnonymousSource() {
- NoopNamedSource anonSource = new NoopNamedSource() {};
- DataflowUnboundedReadFromBoundedSource<?> read = new
- DataflowUnboundedReadFromBoundedSource<>(anonSource);
-
- assertEquals("Read(AnonymousSource)", read.getKindString());
- }
-
- /** Source implementation only useful for its identity. */
- static class NoopNamedSource extends BoundedSource<String> {
- @Override
- public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- return null;
- }
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 0;
- }
- @Override
- public BoundedReader<String> createReader(
- PipelineOptions options) throws IOException {
- return null;
- }
- @Override
- public void validate() {
-
- }
- @Override
- public Coder<String> getDefaultOutputCoder() {
- return null;
- }
- }
-}