You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:25 UTC
[07/39] incubator-beam git commit: BEAM-261 Checkpointing for pushed
back inputs.
BEAM-261 Checkpointing for pushed back inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd7f46c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd7f46c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd7f46c1
Branch: refs/heads/master
Commit: fd7f46c19b9c95a63b522793bb6fb8a849167cbc
Parents: 047cff4
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 13 00:56:37 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 17 09:22:16 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 3 +-
.../translators/ParDoBoundMultiTranslator.java | 10 ++-
.../apex/translators/ParDoBoundTranslator.java | 11 ++-
.../functions/ApexParDoOperator.java | 32 +++++---
.../utils/ValueAndCoderKryoSerializable.java | 81 ++++++++++++++++++++
.../translators/ParDoBoundTranslatorTest.java | 42 ++++++----
6 files changed, 149 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index e2ebc29..ad49f08 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -72,7 +72,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
private final ApexPipelineOptions options;
/**
- * TODO: this isn't thread sa
+ * TODO: this isn't thread safe and may cause issues when tests run in parallel
* Holds the most recent assertion error that was raised while processing elements.
* Used in the unit test driver in embedded to propagate the exception.
*/
@@ -89,7 +89,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
-//System.out.println("transform: " + transform);
if (Window.Bound.class.equals(transform.getClass())) {
return (OutputT) ((PCollection) input).apply(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 6488bf6..9c5f2b5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -22,8 +22,11 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
@@ -43,10 +46,15 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
OldDoFn<InputT, OutputT> doFn = transform.getFn();
PCollectionTuple output = context.getOutput();
+ PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ Coder<InputT> inputCoder = input.getCoder();
+ WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
- context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs);
+ context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder);
Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index fa3df7c..8a7dd4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -21,8 +21,12 @@ package org.apache.beam.runners.apex.translators;
import java.util.List;
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -41,10 +45,15 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
OldDoFn<InputT, OutputT> doFn = transform.getFn();
PCollection<OutputT> output = context.getOutput();
+ PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ Coder<InputT> inputCoder = input.getCoder();
+ WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
- output.getWindowingStrategy(), sideInputs);
+ output.getWindowingStrategy(), sideInputs, wvInputCoder);
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 995fee1..a951ca7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -26,11 +26,14 @@ import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
@@ -84,8 +87,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
// TODO: not Kryo serializable, integrate codec
//@Bind(JavaSerializer.class)
private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
- // TODO: not Kryo serializable, integrate codec
- private List<WindowedValue<InputT>> pushedBack = new ArrayList<>();
+ private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
private LongMin pushedBackWatermark = new LongMin();
private long currentInputWatermark = Long.MIN_VALUE;
private long currentOutputWatermark = currentInputWatermark;
@@ -100,7 +102,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
WindowingStrategy<?, ?> windowingStrategy,
- List<PCollectionView<?>> sideInputs
+ List<PCollectionView<?>> sideInputs,
+ Coder<WindowedValue<InputT>> inputCoder
)
{
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
@@ -110,19 +113,28 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
this.windowingStrategy = windowingStrategy;
this.sideInputs = sideInputs;
- if (sideOutputTags != null && sideOutputTags.size() > sideOutputPorts.length) {
+ if (sideOutputTags.size() > sideOutputPorts.length) {
String msg = String.format("Too many side outputs (currently only supporting %s).",
sideOutputPorts.length);
throw new UnsupportedOperationException(msg);
}
+
+ Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
+ this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), coder);
+
}
@SuppressWarnings("unused") // for Kryo
private ApexParDoOperator() {
- this(null, null, null, null, null, null);
+ this.pipelineOptions = null;
+ this.doFn = null;
+ this.mainOutputTag = null;
+ this.sideOutputTags = null;
+ this.windowingStrategy = null;
+ this.sideInputs = null;
+ this.pushedBack = null;
}
-
public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
{
@Override
@@ -137,7 +149,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
- pushedBack.add(pushedBackValue);
+ pushedBack.get().add(pushedBackValue);
}
}
}
@@ -162,16 +174,16 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
sideInputHandler.addSideInputValue(sideInput, t.getValue());
List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
- for (WindowedValue<InputT> elem : pushedBack) {
+ for (WindowedValue<InputT> elem : pushedBack.get()) {
Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
Iterables.addAll(newPushedBack, justPushedBack);
}
- pushedBack.clear();
+ pushedBack.get().clear();
pushedBackWatermark.clear();
for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
- pushedBack.add(pushedBackValue);
+ pushedBack.get().add(pushedBackValue);
}
// potentially emit watermark
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
new file mode 100644
index 0000000..2de737d
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+
+/**
+ * A {@link KryoSerializable} holder that serializes its value using the specified {@link Coder}.
+ * @param <T> the type of the held value
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
+{
+ private static JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+ private T value;
+ private Coder<T> coder;
+
+ public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
+ this.value = value;
+ this.coder = coder;
+ }
+
+ @SuppressWarnings("unused") // for Kryo
+ private ValueAndCoderKryoSerializable() {
+ }
+
+ public T get() {
+ return value;
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output)
+ {
+ try {
+ kryo.writeClass(output, coder.getClass());
+ kryo.writeObject(output, coder, JAVA_SERIALIZER);
+ coder.encode(value, output, Context.OUTER);
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void read(Kryo kryo, Input input)
+ {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<Coder<T>> type = kryo.readClass(input).getType();
+ coder = kryo.readObject(input, type, JAVA_SERIALIZER);
+ value = coder.decode(input, Context.OUTER);
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 301f6f8..b9748ee 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -18,47 +18,48 @@
package org.apache.beam.runners.apex.translators;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.ApexRunnerResult;
import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* Integration test for {@link ParDoBoundTranslator}.
@@ -181,9 +182,18 @@ public class ParDoBoundTranslatorTest {
public void testSerialization() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.create()
.as(ApexPipelineOptions.class);
+ options.setRunner(TestApexRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
+ Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of());
+
+ PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
+ .apply(Sum.integersGlobally().asSingletonView());
+
ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
- WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList());
+ WindowingStrategy.globalDefault(),
+ Collections.<PCollectionView<?>>singletonList(singletonView),
+ coder);
operator.setup(null);
operator.beginWindow(0);
WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);