You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:25 UTC

[07/39] incubator-beam git commit: BEAM-261 Checkpointing for pushed back inputs.

BEAM-261 Checkpointing for pushed back inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd7f46c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd7f46c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd7f46c1

Branch: refs/heads/master
Commit: fd7f46c19b9c95a63b522793bb6fb8a849167cbc
Parents: 047cff4
Author: Thomas Weise <th...@apache.org>
Authored: Thu Oct 13 00:56:37 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Oct 17 09:22:16 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |  3 +-
 .../translators/ParDoBoundMultiTranslator.java  | 10 ++-
 .../apex/translators/ParDoBoundTranslator.java  | 11 ++-
 .../functions/ApexParDoOperator.java            | 32 +++++---
 .../utils/ValueAndCoderKryoSerializable.java    | 81 ++++++++++++++++++++
 .../translators/ParDoBoundTranslatorTest.java   | 42 ++++++----
 6 files changed, 149 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index e2ebc29..ad49f08 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -72,7 +72,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   private final ApexPipelineOptions options;
 
   /**
-   * TODO: this isn't thread sa
+   * TODO: this isn't thread safe and may cause issues when tests run in parallel
    * Holds any most resent assertion error that was raised while processing elements.
    * Used in the unit test driver in embedded to propagate the exception.
    */
@@ -89,7 +89,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-//System.out.println("transform: " + transform);
 
     if (Window.Bound.class.equals(transform.getClass())) {
       return (OutputT) ((PCollection) input).apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
index 6488bf6..9c5f2b5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
@@ -22,8 +22,11 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -43,10 +46,15 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
     OldDoFn<InputT, OutputT> doFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
+    PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Coder<InputT> inputCoder = input.getCoder();
+    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
     ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
         doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs);
+        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder);
 
     Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index fa3df7c..8a7dd4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -21,8 +21,12 @@ package org.apache.beam.runners.apex.translators;
 import java.util.List;
 
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,10 +45,15 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     OldDoFn<InputT, OutputT> doFn = transform.getFn();
     PCollection<OutputT> output = context.getOutput();
+    PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Coder<InputT> inputCoder = input.getCoder();
+    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
     ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
         doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
-        output.getWindowingStrategy(), sideInputs);
+        output.getWindowingStrategy(), sideInputs, wvInputCoder);
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 995fee1..a951ca7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -26,11 +26,14 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
@@ -84,8 +87,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 // TODO: not Kryo serializable, integrate codec
 //@Bind(JavaSerializer.class)
 private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
-  // TODO: not Kryo serializable, integrate codec
-  private List<WindowedValue<InputT>> pushedBack = new ArrayList<>();
+  private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack;
   private LongMin pushedBackWatermark = new LongMin();
   private long currentInputWatermark = Long.MIN_VALUE;
   private long currentOutputWatermark = currentInputWatermark;
@@ -100,7 +102,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy,
-      List<PCollectionView<?>> sideInputs
+      List<PCollectionView<?>> sideInputs,
+      Coder<WindowedValue<InputT>> inputCoder
       )
   {
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
@@ -110,19 +113,28 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
-    if (sideOutputTags != null && sideOutputTags.size() > sideOutputPorts.length) {
+    if (sideOutputTags.size() > sideOutputPorts.length) {
       String msg = String.format("Too many side outputs (currently only supporting %s).",
           sideOutputPorts.length);
       throw new UnsupportedOperationException(msg);
     }
+
+    Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder);
+    this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), coder);
+
   }
 
   @SuppressWarnings("unused") // for Kryo
   private ApexParDoOperator() {
-    this(null, null, null, null, null, null);
+    this.pipelineOptions = null;
+    this.doFn = null;
+    this.mainOutputTag = null;
+    this.sideOutputTags = null;
+    this.windowingStrategy = null;
+    this.sideInputs = null;
+    this.pushedBack = null;
   }
 
-
   public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
   {
     @Override
@@ -137,7 +149,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
         Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
         for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
           pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-          pushedBack.add(pushedBackValue);
+          pushedBack.get().add(pushedBackValue);
         }
       }
     }
@@ -162,16 +174,16 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn
       sideInputHandler.addSideInputValue(sideInput, t.getValue());
 
       List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
-      for (WindowedValue<InputT> elem : pushedBack) {
+      for (WindowedValue<InputT> elem : pushedBack.get()) {
         Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
         Iterables.addAll(newPushedBack, justPushedBack);
       }
 
-      pushedBack.clear();
+      pushedBack.get().clear();
       pushedBackWatermark.clear();
       for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
         pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
-        pushedBack.add(pushedBackValue);
+        pushedBack.get().add(pushedBackValue);
       }
 
       // potentially emit watermark

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
new file mode 100644
index 0000000..2de737d
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ValueAndCoderKryoSerializable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.utils;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Throwables;
+
+
+/**
+ * A {@link KryoSerializable} holder that uses the specified {@link Coder} for its value.
+ * The coder itself is written with Java serialization ahead of the encoded value.
+ * @param <T> type of the held value
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable
+{
+  private static final JavaSerializer JAVA_SERIALIZER = new JavaSerializer();
+  private T value;
+  private Coder<T> coder;
+
+  public ValueAndCoderKryoSerializable(T value, Coder<T> coder) {
+    this.value = value;
+    this.coder = coder;
+  }
+
+  @SuppressWarnings("unused") // for Kryo
+  private ValueAndCoderKryoSerializable() {
+  }
+
+  public T get() {
+    return value; // the held reference itself, not a copy
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output)
+  {
+    try {
+      kryo.writeClass(output, coder.getClass()); // read() uses this to pick the coder type
+      kryo.writeObject(output, coder, JAVA_SERIALIZER);
+      coder.encode(value, output, Context.OUTER); // NOTE(review): confirm OUTER is safe mid-stream
+    } catch (IOException e) {
+      throw Throwables.propagate(e); // rethrown unchecked; explicit throw ends the flow
+    }
+  }
+
+  @Override
+  public void read(Kryo kryo, Input input)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<Coder<T>> type = kryo.readClass(input).getType();
+      coder = kryo.readObject(input, type, JAVA_SERIALIZER); // restore coder before value
+      value = coder.decode(input, Context.OUTER);
+    } catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd7f46c1/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 301f6f8..b9748ee 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -18,47 +18,48 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
 import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
 import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.lib.util.KryoCloneUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * integration test for {@link ParDoBoundTranslator}.
@@ -181,9 +182,18 @@ public class ParDoBoundTranslatorTest {
   public void testSerialization() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    Coder<WindowedValue<Integer>> coder = WindowedValue.getValueOnlyCoder(VarIntCoder.of());
+
+    PCollectionView<Integer> singletonView = pipeline.apply(Create.of(1))
+            .apply(Sum.integersGlobally().asSingletonView());
+
     ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
         new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(),
-        WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList());
+        WindowingStrategy.globalDefault(),
+        Collections.<PCollectionView<?>>singletonList(singletonView),
+        coder);
     operator.setup(null);
     operator.beginWindow(0);
     WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);