Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:47 UTC
[20/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..e7542cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.PCollectionView;
/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
public class CreateDataflowView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
return new CreateDataflowView<>(view);
}
@@ -36,10 +36,8 @@ public class CreateDataflowView<ElemT, ViewT>
}
@Override
- public PCollection<ElemT> expand(PCollection<ElemT> input) {
- return PCollection.<ElemT>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
- .setCoder(input.getCoder());
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+ return view;
}
public PCollectionView<ViewT> getView() {
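
For readers skimming the hunks: after this revert, CreateDataflowView is again typed PCollection-in / PCollectionView-out, and expand() hands back the pre-built view rather than manufacturing a dummy output PCollection. A reconstruction of the resulting class from the visible hunks (the private field and constructor are inferred from of() and getView(); treat this as a sketch, not the verbatim file):

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    /** Post-revert shape: a marker primitive that simply forwards its pre-built view. */
    public class CreateDataflowView<ElemT, ViewT>
        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {

      private final PCollectionView<ViewT> view;

      public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
        return new CreateDataflowView<>(view);
      }

      private CreateDataflowView(PCollectionView<ViewT> view) {
        this.view = view;
      }

      @Override
      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
        // The Dataflow translator consumes the view directly; no dummy PCollection output.
        return view;
      }

      public PCollectionView<ViewT> getView() {
        return view;
      }
    }
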
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f1783de..8eaf61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,8 +56,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
@@ -397,9 +395,7 @@ public class DataflowPipelineTranslator {
@Override
public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
- return (InputT)
- Iterables.getOnlyElement(
- TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
+ return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
}
@Override
@@ -444,14 +440,6 @@ public class DataflowPipelineTranslator {
public void visitValue(PValue value, TransformHierarchy.Node producer) {
LOG.debug("Checking translation of {}", value);
// Primitive transforms are the only ones assigned step names.
- if (producer.getTransform() instanceof CreateDataflowView) {
- // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
- // in the Dataflow Job graph produces only the view and not the output PCollection.
- asOutputReference(
- ((CreateDataflowView) producer.getTransform()).getView(),
- producer.toAppliedPTransform(getPipeline()));
- return;
- }
asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
@@ -477,7 +465,6 @@ public class DataflowPipelineTranslator {
StepTranslator stepContext = new StepTranslator(this, step);
stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
stepContext.addDisplayData(step, stepName, transform);
- LOG.info("Adding {} as step {}", getCurrentTransform(transform).getFullName(), stepName);
return stepContext;
}
@@ -690,7 +677,7 @@ public class DataflowPipelineTranslator {
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
- stepContext.addCollectionToSingletonOutput(input, transform.getView());
+ stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform));
}
});
@@ -793,7 +780,6 @@ public class DataflowPipelineTranslator {
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
boolean disallowCombinerLifting =
!windowingStrategy.getWindowFn().isNonMerging()
- || !windowingStrategy.getWindowFn().assignsToOneWindow()
|| (isStreaming && !transform.fewKeys())
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
|| !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
@@ -888,45 +874,6 @@ public class DataflowPipelineTranslator {
// IO Translation.
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
-
- ///////////////////////////////////////////////////////////////////////////
- // Splittable DoFn translation.
-
- registerTransformTranslator(
- SplittableParDo.ProcessKeyedElements.class,
- new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
- @Override
- public void translate(
- SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
- translateTyped(transform, context);
- }
-
- private <InputT, OutputT, RestrictionT> void translateTyped(
- SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform,
- TranslationContext context) {
- StepTranslationContext stepContext =
- context.addStep(transform, "SplittableProcessKeyed");
-
- translateInputs(
- stepContext, context.getInput(transform), transform.getSideInputs(), context);
- BiMap<Long, TupleTag<?>> outputMap =
- translateOutputs(context.getOutputs(transform), stepContext);
- stepContext.addInput(
- PropertyNames.SERIALIZED_FN,
- byteArrayToJsonString(
- serializeToByteArray(
- DoFnInfo.forFn(
- transform.getFn(),
- transform.getInputWindowingStrategy(),
- transform.getSideInputs(),
- transform.getElementCoder(),
- outputMap.inverse().get(transform.getMainOutputTag()),
- outputMap))));
- stepContext.addInput(
- PropertyNames.RESTRICTION_CODER,
- CloudObjects.asCloudObject(transform.getRestrictionCoder()));
- }
- });
}
private static void translateInputs(
@@ -973,11 +920,6 @@ public class DataflowPipelineTranslator {
fn));
}
- if (signature.usesState() || signature.usesTimers()) {
- DataflowRunner.verifyStateSupported(fn);
- DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
- }
-
stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
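
Two behavioral shifts in this file are easy to miss in the noise: getInput() once again draws from all registered inputs rather than only the non-additional (main) ones, and combiner lifting loses the assignsToOneWindow() guard. Restating the restored lifting condition from the @@ -793,7 +780,6 @@ hunk as a self-contained predicate over plain booleans (parameter names stand in for the Beam calls in the hunk; a sketch, not runner code):

    final class CombinerLiftingRule {
      // Post-revert condition: lifting is disallowed when the WindowFn can merge
      // windows, when a streaming job may see many keys, or when a non-default
      // trigger is in play. The assignsToOneWindow() clause is gone.
      static boolean disallowCombinerLifting(
          boolean windowFnIsNonMerging, boolean isStreaming, boolean fewKeys, boolean defaultTrigger) {
        return !windowFnIsNonMerging
            || (isStreaming && !fewKeys)
            || !defaultTrigger;
      }
    }
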
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8935759..3e7c8ce 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,12 +67,11 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.UnconsumedReads;
-import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -93,7 +92,6 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -107,8 +105,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
import org.apache.beam.sdk.transforms.Create;
@@ -121,8 +117,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
@@ -135,12 +129,10 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.DateTimeUtils;
@@ -333,24 +325,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
new StreamingFnApiCreateOverrideFactory()));
}
overridesBuilder
- // Support Splittable DoFn for now only in streaming mode.
- // The order of the following overrides is important because they are applied in order.
-
- // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override.
- // However, we want a different expansion for single-output splittable ParDo.
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoSingle(),
- new ReflectiveOneToOneOverrideFactory(
- SplittableParDoOverrides.ParDoSingleViaMulti.class, this)))
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(),
- new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new StreamingShardedWriteFactory(options)))
.add(
// Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
// must precede it
@@ -376,29 +350,34 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformOverride.of(
PTransformMatchers.stateOrTimerParDoSingle(),
BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
+
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new BatchCombineGloballyAsSingletonViewFactory(this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
+ PTransformMatchers.classEqualTo(View.AsMap.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsMap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsMultimap.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsSingleton.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
+ PTransformMatchers.classEqualTo(View.AsList.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsList.class, this)))
.add(
PTransformOverride.of(
- PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
new ReflectiveOneToOneOverrideFactory(
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
@@ -1455,61 +1434,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@VisibleForTesting
- static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
- implements PTransformOverrideFactory<
- PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> {
- // We pick 10 as a a default, as it works well with the default number of workers started
- // by Dataflow.
- static final int DEFAULT_NUM_SHARDS = 10;
- DataflowPipelineWorkerPoolOptions options;
-
- StreamingShardedWriteFactory(PipelineOptions options) {
- this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
- }
-
- @Override
- public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>>
- transform) {
- // By default, if numShards is not set WriteFiles will produce one file per bundle. In
- // streaming, there are large numbers of small bundles, resulting in many tiny files.
- // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
- // (current_num_workers * 2 might be a better choice, but that value is not easily available
- // today).
- // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
- int numShards;
- if (options.getMaxNumWorkers() > 0) {
- numShards = options.getMaxNumWorkers() * 2;
- } else if (options.getNumWorkers() > 0) {
- numShards = options.getNumWorkers() * 2;
- } else {
- numShards = DEFAULT_NUM_SHARDS;
- }
-
- try {
- WriteFiles<UserT, DestinationT, OutputT> replacement =
- WriteFiles.<UserT, DestinationT, OutputT>to(
- WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
- WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
- if (WriteFilesTranslation.isWindowedWrites(transform)) {
- replacement = replacement.withWindowedWrites();
- }
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- replacement.withNumShards(numShards));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
- PDone newOutput) {
- return Collections.emptyMap();
- }
- }
-
- @VisibleForTesting
static String getContainerImageForJob(DataflowPipelineOptions options) {
String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
if (!workerHarnessContainerImage.contains("IMAGE")) {
@@ -1522,39 +1446,4 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
}
}
-
- static void verifyStateSupported(DoFn<?, ?> fn) {
- DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
- for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
-
- // https://issues.apache.org/jira/browse/BEAM-1474
- if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
- throw new UnsupportedOperationException(String.format(
- "%s does not currently support %s",
- DataflowRunner.class.getSimpleName(),
- MapState.class.getSimpleName()
- ));
- }
-
- // https://issues.apache.org/jira/browse/BEAM-1479
- if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) {
- throw new UnsupportedOperationException(String.format(
- "%s does not currently support %s",
- DataflowRunner.class.getSimpleName(),
- SetState.class.getSimpleName()
- ));
- }
- }
- }
-
- static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
- // https://issues.apache.org/jira/browse/BEAM-2507
- if (!strategy.getWindowFn().isNonMerging()) {
- throw new UnsupportedOperationException(
- String.format(
- "%s does not currently support state or timers with merging windows",
- DataflowRunner.class.getSimpleName()));
- }
- }
}
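
Of everything deleted from DataflowRunner here, StreamingShardedWriteFactory carried the one non-obvious policy: how many shards to force onto a streaming WriteFiles that left sharding runner-determined. Its selection rule, lifted into a standalone sketch (the class name is mine):

    final class StreamingShardCount {
      static final int DEFAULT_NUM_SHARDS = 10;

      // Rule from the deleted factory: twice the worker ceiling for full
      // parallelism without a file explosion, falling back to twice the fixed
      // worker count, and finally to 10 when neither option is set.
      static int chooseNumShards(int maxNumWorkers, int numWorkers) {
        if (maxNumWorkers > 0) {
          return maxNumWorkers * 2;
        }
        if (numWorkers > 0) {
          return numWorkers * 2;
        }
        return DEFAULT_NUM_SHARDS;
      }
    }
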
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
deleted file mode 100644
index fc010f8..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow runner. */
-class SplittableParDoOverrides {
- static class ParDoSingleViaMulti<InputT, OutputT>
- extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- private final ParDo.SingleOutput<InputT, OutputT> original;
-
- public ParDoSingleViaMulti(
- DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) {
- this.original = original;
- }
-
- @Override
- protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> delegate() {
- return original;
- }
-
- @Override
- public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- TupleTag<OutputT> mainOutput = new TupleTag<>();
- return input.apply(original.withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
- }
- }
-
- static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT>
- implements PTransformOverrideFactory<
- PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
- @Override
- public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>>
- appliedTransform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(appliedTransform),
- SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
- }
-
- @Override
- public Map<PValue, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
- return ReplacementOutputs.tagged(outputs, newOutput);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..6c385d7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView;
class StreamingViewOverrides {
static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
extends SingleInputOutputOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+ PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
@Override
- public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>>
+ public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+ PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
transform) {
StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
new StreamingCreatePCollectionView<>(transform.getTransform().getView());
@@ -56,7 +56,7 @@ class StreamingViewOverrides {
}
private static class StreamingCreatePCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
private final PCollectionView<ViewT> view;
private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
@@ -64,7 +64,7 @@ class StreamingViewOverrides {
}
@Override
- public PCollection<ElemT> expand(PCollection<ElemT> input) {
+ public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
return input
.apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 06ed1e0..a7452b2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -37,8 +36,7 @@ import org.apache.beam.sdk.values.TupleTag;
* A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
* for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
*/
-@Internal
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
void translate(TransformT transform, TranslationContext context);
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4e..f82f1f1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -63,5 +63,4 @@ public class PropertyNames {
public static final String USES_KEYED_STATE = "uses_keyed_state";
public static final String VALUE = "value";
public static final String DISPLAY_DATA = "display_data";
- public static final String RESTRICTION_CODER = "restriction_coder";
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
index 172dc6e..bff379f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.util;
-import com.google.common.base.Strings;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@@ -99,19 +98,26 @@ public final class TimeUtil {
int hour = Integer.valueOf(matcher.group(4));
int minute = Integer.valueOf(matcher.group(5));
int second = Integer.valueOf(matcher.group(6));
- int millis = computeMillis(matcher.group(7));
+ int millis = 0;
+
+ String frac = matcher.group(7);
+ if (frac != null) {
+ int fracs = Integer.valueOf(frac);
+ if (frac.length() == 3) { // millisecond resolution
+ millis = fracs;
+ } else if (frac.length() == 6) { // microsecond resolution
+ millis = fracs / 1000;
+ } else if (frac.length() == 9) { // nanosecond resolution
+ millis = fracs / 1000000;
+ } else {
+ return null;
+ }
+ }
return new DateTime(year, month, day, hour, minute, second, millis,
ISOChronology.getInstanceUTC()).toInstant();
}
- private static int computeMillis(String frac) {
- if (frac == null) {
- return 0;
- }
- return Integer.valueOf(frac.length() > 3 ? frac.substring(0, 3) : Strings.padEnd(frac, 3, '0'));
- }
-
/**
* Converts a {@link ReadableDuration} into a Dataflow API duration string.
*/
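
The hunk above trades the permissive computeMillis() helper for strict inline handling of the fractional-seconds group: exactly 3, 6, or 9 digits, or the whole parse fails. A standalone sketch of the restored behavior (a null return models fromCloudTime() returning null); the pruned TimeUtilTest assertions later in this commit agree, e.g. "...00.42Z" no longer parses:

    final class FracSecondsSketch {
      // Post-revert fractional-second handling: only millisecond, microsecond,
      // or nanosecond resolution is accepted. The removed computeMillis() had
      // instead accepted any width, truncating long fractions and zero-padding
      // short ones.
      static Integer fracToMillis(String frac) {
        if (frac == null) {
          return 0;                        // no fractional part: zero millis
        }
        int fracs = Integer.valueOf(frac);
        switch (frac.length()) {
          case 3:  return fracs;           // "001"        -> 1 ms
          case 6:  return fracs / 1000;    // "001001"     -> 1 ms (micros truncated)
          case 9:  return fracs / 1000000; // "001000001"  -> 1 ms (nanos truncated)
          default: return null;            // "42", "0", ...: rejected after the revert
        }
      }
    }
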
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 43b2788..89dc2d5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -18,14 +18,11 @@
package org.apache.beam.runners.dataflow;
import static org.apache.beam.runners.dataflow.util.Structs.getString;
-import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -69,22 +66,17 @@ import java.util.Set;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.runners.dataflow.util.OutputReference;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.Structs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -99,12 +91,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -113,8 +100,6 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -911,68 +896,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
not(equalTo("true")));
}
- /**
- * Smoke test to fail fast if translation of a splittable ParDo
- * in streaming breaks.
- */
- @Test
- public void testStreamingSplittableParDoTranslation() throws Exception {
- DataflowPipelineOptions options = buildPipelineOptions();
- DataflowRunner runner = DataflowRunner.fromOptions(options);
- options.setStreaming(true);
- DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
- Pipeline pipeline = Pipeline.create(options);
-
- PCollection<String> windowedInput = pipeline
- .apply(Create.of("a"))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
- windowedInput.apply(ParDo.of(new TestSplittableFn()));
-
- runner.replaceTransforms(pipeline);
-
- Job job =
- translator
- .translate(
- pipeline,
- runner,
- Collections.<DataflowPackage>emptyList())
- .getJob();
-
- // The job should contain a SplittableParDo.ProcessKeyedElements step, translated as
- // "SplittableProcessKeyed".
-
- List<Step> steps = job.getSteps();
- Step processKeyedStep = null;
- for (Step step : steps) {
- if (step.getKind().equals("SplittableProcessKeyed")) {
- assertNull(processKeyedStep);
- processKeyedStep = step;
- }
- }
- assertNotNull(processKeyedStep);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- DoFnInfo<String, Integer> fnInfo =
- (DoFnInfo<String, Integer>)
- SerializableUtils.deserializeFromByteArray(
- jsonStringToByteArray(
- Structs.getString(
- processKeyedStep.getProperties(), PropertyNames.SERIALIZED_FN)),
- "DoFnInfo");
- assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class));
- assertThat(
- fnInfo.getWindowingStrategy().getWindowFn(),
- Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1))));
- Coder<?> restrictionCoder =
- CloudObjects.coderFromCloudObject(
- (CloudObject)
- Structs.getObject(
- processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
-
- assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
- }
-
@Test
public void testToSingletonTranslationWithIsmSideInput() throws Exception {
// A "change detector" test that makes sure the translation
@@ -997,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertAllStepOutputsHaveUniqueIds(job);
List<Step> steps = job.getSteps();
- assertEquals(9, steps.size());
+ assertEquals(5, steps.size());
@SuppressWarnings("unchecked")
List<Map<String, Object>> toIsmRecordOutputs =
- (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+ (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(
Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
- Step collectionToSingletonStep = steps.get(8);
+ Step collectionToSingletonStep = steps.get(4);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}
@@ -1167,16 +1090,4 @@ public class DataflowPipelineTranslatorTest implements Serializable {
assertTrue(String.format("Found duplicate output ids %s", outputIds),
outputIds.size() == 0);
}
-
- private static class TestSplittableFn extends DoFn<String, Integer> {
- @ProcessElement
- public void process(ProcessContext c, OffsetRangeTracker tracker) {
- // noop
- }
-
- @GetInitialRestriction
- public OffsetRange getInitialRange(String element) {
- return null;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -50,7 +49,6 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.channels.FileChannel;
@@ -64,60 +62,36 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
-import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
@@ -133,11 +107,9 @@ import org.mockito.stubbing.Answer;
/**
* Tests for the {@link DataflowRunner}.
- *
- * <p>Implements {@link Serializable} because it is caught in closures.
*/
@RunWith(JUnit4.class)
-public class DataflowRunnerTest implements Serializable {
+public class DataflowRunnerTest {
private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
@@ -147,12 +119,15 @@ public class DataflowRunnerTest implements Serializable {
private static final String PROJECT_ID = "some-project";
private static final String REGION_ID = "some-region-1";
- @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule public transient ExpectedException thrown = ExpectedException.none();
- @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+ @Rule
+ public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
- private transient Dataflow.Projects.Locations.Jobs mockJobs;
- private transient GcsUtil mockGcsUtil;
+ private Dataflow.Projects.Locations.Jobs mockJobs;
+ private GcsUtil mockGcsUtil;
// Asserts that the given Job has all expected fields set.
private static void assertValidJob(Job job) {
@@ -848,6 +823,7 @@ public class DataflowRunnerTest implements Serializable {
DataflowRunner.fromOptions(options);
}
+
@Test
public void testValidProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
@@ -1015,71 +991,6 @@ public class DataflowRunnerTest implements Serializable {
assertTrue(transform.translated);
}
- private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
- Pipeline p = Pipeline.create(options);
- p.apply(Create.of(KV.of(13, 42)))
- .apply(
- ParDo.of(
- new DoFn<KV<Integer, Integer>, Void>() {
- @StateId("fizzle")
- private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
-
- @ProcessElement
- public void process() {}
- }));
-
- thrown.expectMessage("MapState");
- thrown.expect(UnsupportedOperationException.class);
- p.run();
- }
-
- @Test
- public void testMapStateUnsupportedInBatch() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(false);
- verifyMapStateUnsupported(options);
- }
-
- @Test
- public void testMapStateUnsupportedInStreaming() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(true);
- verifyMapStateUnsupported(options);
- }
-
- private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
- Pipeline p = Pipeline.create(options);
- p.apply(Create.of(KV.of(13, 42)))
- .apply(
- ParDo.of(
- new DoFn<KV<Integer, Integer>, Void>() {
- @StateId("fizzle")
- private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
-
- @ProcessElement
- public void process() {}
- }));
-
- thrown.expectMessage("SetState");
- thrown.expect(UnsupportedOperationException.class);
- p.run();
- }
-
- @Test
- public void testSetStateUnsupportedInBatch() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(false);
- Pipeline p = Pipeline.create(options);
- verifySetStateUnsupported(options);
- }
-
- @Test
- public void testSetStateUnsupportedInStreaming() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(true);
- verifySetStateUnsupported(options);
- }
-
/** Records all the composite transforms visited within the Pipeline. */
private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
private List<PTransform<?, ?>> transforms = new ArrayList<>();
@@ -1136,8 +1047,8 @@ public class DataflowRunnerTest implements Serializable {
}
/**
- * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the
- * runner is successfully run.
+ * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
+ * when the runner is successfully run.
*/
@Test
public void testTemplateRunnerFullCompletion() throws Exception {
@@ -1216,89 +1127,4 @@ public class DataflowRunnerTest implements Serializable {
assertThat(
getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
}
-
- @Test
- public void testStreamingWriteWithNoShardingReturnsNewTransform() {
- PipelineOptions options = TestPipeline.testingPipelineOptions();
- options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
- testStreamingWriteOverride(options, 20);
- }
-
- @Test
- public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
- PipelineOptions options = TestPipeline.testingPipelineOptions();
- testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
- }
-
- private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
- Pipeline p = Pipeline.create(options);
-
- p.apply(Create.of(KV.of(13, 42)))
- .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1))))
- .apply(ParDo.of(new DoFn<KV<Integer, Integer>, Void>() {
- @StateId("fizzle")
- private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();
-
- @ProcessElement
- public void process() {}
- }));
-
- thrown.expectMessage("merging");
- thrown.expect(UnsupportedOperationException.class);
- p.run();
- }
-
- @Test
- public void testMergingStatefulRejectedInStreaming() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(true);
- verifyMergingStatefulParDoRejected(options);
- }
-
- @Test
- public void testMergingStatefulRejectedInBatch() throws Exception {
- PipelineOptions options = buildPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(false);
- verifyMergingStatefulParDoRejected(options);
- }
-
- private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
- TestPipeline p = TestPipeline.fromOptions(options);
-
- StreamingShardedWriteFactory<Object, Void, Object> factory =
- new StreamingShardedWriteFactory<>(p.getOptions());
- WriteFiles<Object, Void, Object> original =
- WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
- PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
- AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
- originalApplication =
- AppliedPTransform.of(
- "writefiles",
- objs.expand(),
- Collections.<TupleTag<?>, PValue>emptyMap(),
- original,
- p);
-
- WriteFiles<Object, Void, Object> replacement =
- (WriteFiles<Object, Void, Object>)
- factory.getReplacementTransform(originalApplication).getTransform();
- assertThat(replacement, not(equalTo((Object) original)));
- assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
- }
-
- private static class TestSink extends FileBasedSink<Object, Void> {
- @Override
- public void validate(PipelineOptions options) {}
-
- TestSink(String tmpFolder) {
- super(
- StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
- DynamicFileDestinations.constant(null));
- }
-
- @Override
- public WriteOperation<Object, Void> createWriteOperation() {
- throw new IllegalArgumentException("Should not be used");
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
index 1ac9fab..e0785d4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
@@ -47,14 +47,8 @@ public final class TimeUtilTest {
assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
- assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.0Z"));
- assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.00Z"));
- assertEquals(new Instant(420), fromCloudTime("1970-01-01T00:00:00.42Z"));
- assertEquals(new Instant(300), fromCloudTime("1970-01-01T00:00:00.3Z"));
- assertEquals(new Instant(20), fromCloudTime("1970-01-01T00:00:00.02Z"));
assertNull(fromCloudTime(""));
assertNull(fromCloudTime("1970-01-01T00:00:00"));
- assertNull(fromCloudTime("1970-01-01T00:00:00.1e3Z"));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index b00ba9c..38aada8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 7f70204..ddb4aca 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -34,6 +34,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <spark.version>1.6.3</spark.version>
+ <hadoop.version>2.2.0</hadoop.version>
<kafka.version>0.9.0.1</kafka.version>
<jackson.version>2.4.4</jackson.version>
<dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
@@ -101,27 +103,6 @@
<threadCount>4</threadCount>
</configuration>
</execution>
- <execution>
- <id>streaming-tests</id>
- <phase>test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <groups>
- org.apache.beam.runners.spark.StreamingTest
- </groups>
- <systemPropertyVariables>
- <beamTestPipelineOptions>
- [
- "--runner=TestSparkRunner",
- "--forceStreaming=true",
- "--enableSparkMetricSinks=true"
- ]
- </beamTestPipelineOptions>
- </systemPropertyVariables>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
@@ -133,33 +114,31 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.10</artifactId>
+ <version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
- <exclusions>
- <!-- Fix build on JDK-9 -->
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -216,11 +195,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
@@ -347,13 +321,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-java</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -393,6 +360,27 @@
</systemPropertyVariables>
</configuration>
</execution>
+ <execution>
+ <id>streaming-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>
+ org.apache.beam.runners.spark.StreamingTest
+ </groups>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestSparkRunner",
+ "--forceStreaming=true",
+ "--enableSparkMetricSinks=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 6972acb..d75c955 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -35,7 +35,8 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
-import org.apache.commons.text.WordUtils;
+import org.apache.commons.lang3.text.WordUtils;
+
/**
* Pipeline visitor for translating a Beam pipeline into equivalent Spark operations.
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 595521f..9e2426e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -171,7 +170,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
// register Watermarks listener to broadcast the advanced WMs.
- jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener()));
+ jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));
// The reason we call initAccumulators here even though it is called in
// SparkRunnerStreamingContextFactory is because the factory is not called when resuming
@@ -360,12 +359,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
protected boolean shouldDefer(TransformHierarchy.Node node) {
// if the input is not a PCollection, or it is but with non merging windows, don't defer.
- Collection<PValue> nonAdditionalInputs =
- TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
- if (nonAdditionalInputs.size() != 1) {
+ if (node.getInputs().size() != 1) {
return false;
}
- PValue input = Iterables.getOnlyElement(nonAdditionalInputs);
+ PValue input = Iterables.getOnlyElement(node.getInputs().values());
if (!(input instanceof PCollection)
|| ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) {
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index a13a3b1..eccee57 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -169,7 +169,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
result.waitUntilFinish(Duration.millis(batchDurationMillis));
do {
SparkTimerInternals sparkTimerInternals =
- SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis));
+ SparkTimerInternals.global(GlobalWatermarkHolder.get());
sparkTimerInternals.advanceWatermark();
globalWatermark = sparkTimerInternals.currentInputWatermarkTime();
// allow another batch-interval of execution, to reason about WM propagation.
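The test's do/while loop advances a watermark derived from the now-parameterless GlobalWatermarkHolder.get(). A minimal stand-alone sketch of that poll-sleep-recheck idiom, using joda-time directly rather than Beam's SparkTimerInternals:

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WatermarkPollSketch {
      public static void main(String[] args) throws InterruptedException {
        Instant target = new Instant(0).plus(Duration.standardSeconds(1));
        Instant watermark = new Instant(0);
        do {
          // stand-in for sparkTimerInternals.advanceWatermark()
          watermark = watermark.plus(Duration.millis(100));
          // allow another batch-interval to pass before re-checking
          Thread.sleep(100);
        } while (watermark.isBefore(target));
        System.out.println("watermark reached " + watermark);
      }
    }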
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1385e07..be4f3f6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -104,15 +104,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
public static <K, InputT, W extends BoundedWindow>
JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
- final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+ JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
final Coder<K> keyCoder,
final Coder<WindowedValue<InputT>> wvCoder,
final WindowingStrategy<?, W> windowingStrategy,
final SparkRuntimeContext runtimeContext,
final List<Integer> sourceIds) {
- final long batchDurationMillis =
- runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder);
final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder();
final Coder<? extends BoundedWindow> wCoder =
@@ -241,7 +239,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
SparkStateInternals<K> stateInternals;
SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
- sourceIds, GlobalWatermarkHolder.get(batchDurationMillis));
+ sourceIds, GlobalWatermarkHolder.get());
// get state(internals) per key.
if (prevStateAndTimersOpt.isEmpty()) {
// no previous state.
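The hunk above wraps the windowed-value coder in an IterableCoder before use. A minimal sketch of that wrapping with Beam's public coder API, with a String coder standing in for the real element coder:

    import org.apache.beam.sdk.coders.IterableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class CoderSketch {
      public static void main(String[] args) {
        // IterableCoder.of lifts an element coder to a coder for Iterable<T>
        IterableCoder<String> itrCoder = IterableCoder.of(StringUtf8Coder.of());
        System.out.println(itrCoder);
      }
    }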
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index a68da55..107915f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.broadcast.Broadcast;
import org.joda.time.Instant;
@@ -57,10 +58,10 @@ public class SparkTimerInternals implements TimerInternals {
/** Build the {@link TimerInternals} according to the feeding streams. */
public static SparkTimerInternals forStreamFromSources(
List<Integer> sourceIds,
- Map<Integer, SparkWatermarks> watermarks) {
- // if watermarks are invalid for the specific ids, use defaults.
- if (watermarks == null || watermarks.isEmpty()
- || Collections.disjoint(sourceIds, watermarks.keySet())) {
+ @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+ // if broadcast is invalid for the specific ids, use defaults.
+ if (broadcast == null || broadcast.getValue().isEmpty()
+ || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) {
return new SparkTimerInternals(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0));
}
@@ -70,7 +71,7 @@ public class SparkTimerInternals implements TimerInternals {
// synchronized processing time should clearly be synchronized.
Instant synchronizedProcessingTime = null;
for (Integer sourceId: sourceIds) {
- SparkWatermarks sparkWatermarks = watermarks.get(sourceId);
+ SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId);
if (sparkWatermarks != null) {
// keep slowest WMs.
slowestLowWatermark = slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark())
@@ -93,9 +94,10 @@ public class SparkTimerInternals implements TimerInternals {
}
/** Build a global {@link TimerInternals} for all feeding streams.*/
- public static SparkTimerInternals global(Map<Integer, SparkWatermarks> watermarks) {
- return watermarks == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
- : forStreamFromSources(Lists.newArrayList(watermarks.keySet()), watermarks);
+ public static SparkTimerInternals global(
+ @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+ return broadcast == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
+ : forStreamFromSources(Lists.newArrayList(broadcast.getValue().keySet()), broadcast);
}
Collection<TimerData> getTimers() {
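forStreamFromSources now takes a @Nullable Broadcast and falls back to minimum timestamps when the broadcast is missing, empty, or disjoint from the given source ids. A plain-Java sketch of that guard, with a Map standing in for the Broadcast and longs standing in for the Instants:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class WatermarkDefaultsSketch {
      // stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE
      static final long TIMESTAMP_MIN = Long.MIN_VALUE;

      static long lowWatermarkFor(Map<Integer, Long> watermarks, List<Integer> sourceIds) {
        // missing, empty, or disjoint from our sources -> default watermark
        if (watermarks == null || watermarks.isEmpty()
            || Collections.disjoint(sourceIds, watermarks.keySet())) {
          return TIMESTAMP_MIN;
        }
        long slowest = Long.MAX_VALUE;
        for (Integer id : sourceIds) {
          Long wm = watermarks.get(id);
          if (wm != null && wm < slowest) {
            slowest = wm; // keep the slowest WM across sources
          }
        }
        return slowest;
      }

      public static void main(String[] args) {
        System.out.println(lowWatermarkFor(null, Arrays.asList(1, 2))); // prints the default
      }
    }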
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..8102926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,7 +26,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
@@ -104,8 +103,7 @@ public class EvaluationContext {
public <T extends PValue> T getInput(PTransform<T, ?> transform) {
@SuppressWarnings("unchecked")
- T input =
- (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+ T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
return input;
}
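getInput once again assumes exactly one input value. Guava's Iterables.getOnlyElement enforces that: it returns the single element or throws, which is why this lookup only suits single-input transforms. A small sketch with hypothetical map contents:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Iterables;
    import java.util.Map;

    public class OnlyElementSketch {
      public static void main(String[] args) {
        Map<String, String> inputs = ImmutableMap.of("main", "pc-1");
        // throws IllegalArgumentException for >1 value, NoSuchElementException for none
        String input = Iterables.getOnlyElement(inputs.values());
        System.out.println(input);
      }
    }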
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac5e0cd..64aa35a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -508,6 +508,50 @@ public final class TransformTranslator {
};
}
+ private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
+ return new TransformEvaluator<View.AsSingleton<T>>() {
+ @Override
+ public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
+ Iterable<? extends WindowedValue<?>> iter =
+ context.getWindowedValues(context.getInput(transform));
+ PCollectionView<T> output = context.getOutput(transform);
+ Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
+
+ context.putPView(output, iterCast, coderInternal);
+ }
+
+ @Override
+ public String toNativeString() {
+ return "collect()";
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
+ return new TransformEvaluator<View.AsIterable<T>>() {
+ @Override
+ public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
+ Iterable<? extends WindowedValue<?>> iter =
+ context.getWindowedValues(context.getInput(transform));
+ PCollectionView<Iterable<T>> output = context.getOutput(transform);
+ Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
+
+ context.putPView(output, iterCast, coderInternal);
+ }
+
+ @Override
+ public String toNativeString() {
+ return "collect()";
+ }
+ };
+ }
+
private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
createPCollView() {
return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@@ -516,7 +560,7 @@ public final class TransformTranslator {
EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- PCollectionView<WriteT> output = transform.getView();
+ PCollectionView<WriteT> output = context.getOutput(transform);
Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
@SuppressWarnings("unchecked")
@@ -601,8 +645,8 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
EVALUATORS.put(Create.Values.class, create());
-// EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-// EVALUATORS.put(View.AsIterable.class, viewAsIter());
+ EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
+ EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
EVALUATORS.put(Window.Assign.class, window());
EVALUATORS.put(Reshuffle.class, reshuffle());
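The static EVALUATORS map keys each transform class to exactly one evaluator, and the revert re-enables the two view evaluators. A toy sketch of the same class-keyed registry pattern; the names are illustrative, not Beam's:

    import java.util.HashMap;
    import java.util.Map;

    public class RegistrySketch {
      interface Evaluator {
        String toNativeString();
      }

      private static final Map<Class<?>, Evaluator> EVALUATORS = new HashMap<>();

      static {
        // one evaluator per transform type, looked up by the transform's class
        EVALUATORS.put(String.class, () -> "collect()");
      }

      public static void main(String[] args) {
        System.out.println(EVALUATORS.get(String.class).toNativeString());
      }
    }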
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 2cb6f26..8b384d8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -21,43 +21,31 @@ package org.apache.beam.runners.spark.util;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.spark.SparkEnv;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.BlockResult;
-import org.apache.spark.storage.BlockStore;
-import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
-import scala.Option;
+
/**
- * A {@link BlockStore} variable to hold the global watermarks for a micro-batch.
+ * A {@link Broadcast} variable to hold the global watermarks for a micro-batch.
*
* <p>For each source, holds a queue for the watermarks of each micro-batch that was read,
* and advances the watermarks according to the queue (first-in-first-out).
*/
public class GlobalWatermarkHolder {
- private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
- private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
-
- private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
- private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
+ // the broadcast variable is shipped to the workers.
+ private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = null;
+ // this should only live in the driver, hence transient.
+ private static final transient Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
@@ -83,48 +71,22 @@ public class GlobalWatermarkHolder {
* Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped
* to their sources.
*/
- @SuppressWarnings("unchecked")
- public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
- if (driverWatermarks != null) {
- // if we are executing in local mode simply return the local values.
- return driverWatermarks;
- } else {
- if (watermarkCache == null) {
- initWatermarkCache(cacheInterval);
- }
- try {
- return watermarkCache.get("SINGLETON");
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static synchronized void initWatermarkCache(Long batchDuration) {
- if (watermarkCache == null) {
- watermarkCache =
- CacheBuilder.newBuilder()
- // expire watermarks every half batch duration to ensure they update in every batch.
- .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
- .build(new WatermarksLoader());
- }
+ public static Broadcast<Map<Integer, SparkWatermarks>> get() {
+ return broadcast;
}
/**
* Advances the watermarks to the next-in-line watermarks.
* SparkWatermarks are monotonically increasing.
*/
- @SuppressWarnings("unchecked")
- public static void advance() {
- synchronized (GlobalWatermarkHolder.class) {
- BlockManager blockManager = SparkEnv.get().blockManager();
-
+ public static void advance(JavaSparkContext jsc) {
+ synchronized (GlobalWatermarkHolder.class) {
if (sourceTimes.isEmpty()) {
return;
}
// update all sources' watermarks into the new broadcast.
- Map<Integer, SparkWatermarks> newValues = new HashMap<>();
+ Map<Integer, SparkWatermarks> newBroadcast = new HashMap<>();
for (Map.Entry<Integer, Queue<SparkWatermarks>> en: sourceTimes.entrySet()) {
if (en.getValue().isEmpty()) {
@@ -137,22 +99,8 @@ public class GlobalWatermarkHolder {
Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- Option<BlockResult> currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
- Map<Integer, SparkWatermarks> current;
- if (currentOption.isDefined()) {
- current = (Map<Integer, SparkWatermarks>) currentOption.get().data().next();
- } else {
- current = Maps.newHashMap();
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- current,
- StorageLevel.MEMORY_ONLY(),
- true);
- }
-
- if (current.containsKey(sourceId)) {
- SparkWatermarks currentTimes = current.get(sourceId);
+ if (broadcast != null && broadcast.getValue().containsKey(sourceId)) {
+ SparkWatermarks currentTimes = broadcast.getValue().get(sourceId);
currentLowWatermark = currentTimes.getLowWatermark();
currentHighWatermark = currentTimes.getHighWatermark();
currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
@@ -171,21 +119,20 @@ public class GlobalWatermarkHolder {
nextLowWatermark, nextHighWatermark));
checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
"Synchronized processing time must advance.");
- newValues.put(
+ newBroadcast.put(
sourceId,
new SparkWatermarks(
nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
}
// update the watermarks broadcast only if something has changed.
- if (!newValues.isEmpty()) {
- driverWatermarks = newValues;
- blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- newValues,
- StorageLevel.MEMORY_ONLY(),
- true);
+ if (!newBroadcast.isEmpty()) {
+ if (broadcast != null) {
+ // for now this is blocking; we could make it asynchronous,
+ // but that could slow down WM propagation.
+ broadcast.destroy();
+ }
+ broadcast = jsc.broadcast(newBroadcast);
}
}
}
@@ -193,12 +140,7 @@ public class GlobalWatermarkHolder {
@VisibleForTesting
public static synchronized void clear() {
sourceTimes.clear();
- driverWatermarks = null;
- SparkEnv sparkEnv = SparkEnv.get();
- if (sparkEnv != null) {
- BlockManager blockManager = sparkEnv.blockManager();
- blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
- }
+ broadcast = null;
}
/**
@@ -243,24 +185,15 @@ public class GlobalWatermarkHolder {
/** Advance the WMs on the onBatchCompleted event. */
public static class WatermarksListener extends JavaStreamingListener {
- @Override
- public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
- GlobalWatermarkHolder.advance();
- }
- }
+ private final JavaStreamingContext jssc;
- private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+ public WatermarksListener(JavaStreamingContext jssc) {
+ this.jssc = jssc;
+ }
- @SuppressWarnings("unchecked")
@Override
- public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
- Option<BlockResult> blockResultOption =
- SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID);
- if (blockResultOption.isDefined()) {
- return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
- } else {
- return Maps.newHashMap();
- }
+ public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+ GlobalWatermarkHolder.advance(jssc.sparkContext());
}
}
}
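The revert swaps the BlockManager-based cache for a single driver-held Broadcast that is destroyed and re-created after every micro-batch. A hedged sketch of that destroy-then-rebroadcast idiom; Long watermarks stand in for SparkWatermarks, and the class is a stand-alone illustration, not the holder itself:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class RebroadcastSketch {
      private static volatile Broadcast<Map<Integer, Long>> broadcast = null;

      // driver-side publication of the latest watermarks
      public static synchronized void publish(JavaSparkContext jsc, Map<Integer, Long> latest) {
        if (broadcast != null) {
          // destroy() blocks; that keeps publications ordered but adds latency
          broadcast.destroy();
        }
        broadcast = jsc.broadcast(new HashMap<>(latest));
      }

      public static Broadcast<Map<Integer, Long>> get() {
        return broadcast;
      }
    }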
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
index 1708123..47a6e3f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
@@ -65,17 +65,17 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(5)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
// low < high.
GlobalWatermarkHolder.add(1,
new SparkWatermarks(
instant.plus(Duration.millis(10)),
instant.plus(Duration.millis(15)),
instant.plus(Duration.millis(100))));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
// assert watermarks in Broadcast.
- SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get(0L).get(1);
+ SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1);
assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10))));
assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15))));
assertThat(currentWatermarks.getSynchronizedProcessingTime(),
@@ -93,7 +93,7 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(25)),
instant.plus(Duration.millis(20)),
instant.plus(Duration.millis(200))));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
}
@Test
@@ -106,7 +106,7 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(10)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Synchronized processing time must advance.");
@@ -117,7 +117,7 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(10)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
}
@Test
@@ -136,15 +136,15 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(6)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
// assert watermarks for source 1.
- SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get(0L).get(1);
+ SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1);
assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5))));
assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10))));
// assert watermarks for source 2.
- SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get(0L).get(2);
+ SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2);
assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3))));
assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6))));
}
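The assertions now dereference the broadcast (getValue() in the hunks above, equivalent to value()) before indexing by source id. A self-contained sketch of that read path against a local Spark context:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastReadSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[1]").setAppName("broadcast-read"));
        Broadcast<Map<Integer, Long>> b = jsc.broadcast(Collections.singletonMap(1, 100L));
        // read the driver-side copy, then index by source id
        System.out.println(b.value().get(1));
        jsc.stop();
      }
    }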
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 246eb81..64ff98c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -52,6 +52,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
+
/**
* Test {@link SparkRunnerDebugger} with different pipelines.
*/
@@ -84,20 +85,17 @@ public class SparkRunnerDebuggerTest {
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
.apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
- final String expectedPipeline =
- "sparkContext.parallelize(Arrays.asList(...))\n"
- + "_.mapPartitions("
- + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
- + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
- + "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark"
- + ".SparkRunnerDebuggerTest$PlusOne())\n"
- + "sparkContext.union(...)\n"
- + "_.mapPartitions("
- + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
- + "_.<org.apache.beam.sdk.io.TextIO$Write>";
+ final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+ + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ + "_.groupByKey()\n"
+ + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark"
+ + ".SparkRunnerDebuggerTest$PlusOne())\n"
+ + "sparkContext.union(...)\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+ + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
(SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();