Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/08 23:41:43 UTC

[1/2] beam git commit: This closes #2959

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 5493c6c8f -> 336b3dc29


This closes #2959


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/336b3dc2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/336b3dc2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/336b3dc2

Branch: refs/heads/release-2.0.0
Commit: 336b3dc29b5de0f8affc71fbe9886a3e0ee2a0fe
Parents: 5493c6c 392ed60
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 8 16:41:25 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 8 16:41:25 2017 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  54 +++++-
 .../wrappers/streaming/io/DedupingOperator.java | 187 +++++++++++++++++++
 .../streaming/io/UnboundedSourceWrapper.java    |  15 +-
 .../flink/streaming/DedupingOperatorTest.java   | 131 +++++++++++++
 .../streaming/UnboundedSourceWrapperTest.java   |  29 +--
 5 files changed, 393 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: [BEAM-1723] Deduplication of UnboundedSource in the Flink runner

Posted by tg...@apache.org.
[BEAM-1723] Deduplication of UnboundedSource in the Flink runner
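
This change routes UnboundedSources that declare requiresDeduping() through a new DedupingOperator: the Flink source wrapper now emits WindowedValue<ValueWithRecordId<T>>, the translator keys the stream by the record id, and the operator forwards only the first occurrence of each id, tracking seen ids in a bounded, time-expiring cache. Below is a minimal, self-contained sketch of that first-seen check (hypothetical class name; assumes Guava on the classpath, with constants mirroring MAX_RETENTION_SINCE_ACCESS and MAX_CACHE_SIZE in the operator):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical standalone demo of the first-seen check inside DedupingOperator. */
public class FirstSeenCheckDemo {

  // Seen-id cache: bounded and time-expiring, like the operator's per-key-group caches.
  private static final LoadingCache<ByteBuffer, AtomicBoolean> SEEN =
      CacheBuilder.newBuilder()
          .expireAfterAccess(10, TimeUnit.MINUTES) // mirrors MAX_RETENTION_SINCE_ACCESS
          .maximumSize(100_000L)                   // mirrors MAX_CACHE_SIZE
          .build(new CacheLoader<ByteBuffer, AtomicBoolean>() {
            @Override
            public AtomicBoolean load(ByteBuffer ignored) {
              return new AtomicBoolean(true);
            }
          });

  /** Returns true exactly once per id; later calls see the flipped flag. */
  static boolean shouldOutput(byte[] recordId) {
    return SEEN.getUnchecked(ByteBuffer.wrap(recordId)).getAndSet(false);
  }

  public static void main(String[] args) {
    System.out.println(shouldOutput("id-1".getBytes())); // true
    System.out.println(shouldOutput("id-1".getBytes())); // false: duplicate dropped
    System.out.println(shouldOutput("id-2".getBytes())); // true
  }
}

Loading a fresh AtomicBoolean(true) and flipping it with getAndSet(false) makes the check a single atomic operation per id: the first lookup returns true, and every later lookup of the same id returns false until the entry expires or is evicted.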


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/392ed601
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/392ed601
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/392ed601

Branch: refs/heads/release-2.0.0
Commit: 392ed601392dbf5ace32577c3a4dee13488cedc4
Parents: 5493c6c
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Apr 19 19:42:59 2017 +0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 8 16:41:25 2017 -0700

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  54 +++++-
 .../wrappers/streaming/io/DedupingOperator.java | 187 +++++++++++++++++++
 .../streaming/io/UnboundedSourceWrapper.java    |  15 +-
 .../flink/streaming/DedupingOperatorTest.java   | 131 +++++++++++++
 .../streaming/UnboundedSourceWrapperTest.java   |  29 +--
 5 files changed, 393 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 615eaea..9a93205 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -73,12 +74,16 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -148,20 +153,37 @@ class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
+      DataStream<WindowedValue<T>> source;
+      DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
       TypeInformation<WindowedValue<T>> outputTypeInfo =
           context.getTypeInfo(context.getOutput(transform));
 
-      DataStream<WindowedValue<T>> source;
+      Coder<T> coder = context.getOutput(transform).getCoder();
+
+      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+          new CoderTypeInformation<>(WindowedValue.getFullCoder(
+              ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+              output.getWindowingStrategy().getWindowFn().windowCoder()));
+
       try {
+
         UnboundedSourceWrapper<T, ?> sourceWrapper =
             new UnboundedSourceWrapper<>(
                 context.getCurrentTransform().getFullName(),
                 context.getPipelineOptions(),
                 transform.getSource(),
                 context.getExecutionEnvironment().getParallelism());
-        source = context
+        nonDedupSource = context
             .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+            .addSource(sourceWrapper).name(transform.getName()).returns(withIdTypeInfo);
+
+        if (transform.getSource().requiresDeduping()) {
+          source = nonDedupSource.keyBy(
+              new ValueWithRecordIdKeySelector<T>())
+              .transform("debuping", outputTypeInfo, new DedupingOperator<T>());
+        } else {
+          source = nonDedupSource.flatMap(new StripIdsMap<T>());
+        }
       } catch (Exception e) {
         throw new RuntimeException(
             "Error while translating UnboundedSource: " + transform.getSource(), e);
@@ -171,6 +193,32 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class ValueWithRecordIdKeySelector<T>
+      implements KeySelector<WindowedValue<ValueWithRecordId<T>>, ByteBuffer>,
+      ResultTypeQueryable<ByteBuffer> {
+
+    @Override
+    public ByteBuffer getKey(WindowedValue<ValueWithRecordId<T>> value) throws Exception {
+      return ByteBuffer.wrap(value.getValue().getId());
+    }
+
+    @Override
+    public TypeInformation<ByteBuffer> getProducedType() {
+      return new GenericTypeInfo<>(ByteBuffer.class);
+    }
+  }
+
+  public static class StripIdsMap<T> implements
+      FlatMapFunction<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>> {
+
+    @Override
+    public void flatMap(WindowedValue<ValueWithRecordId<T>> value,
+                        Collector<WindowedValue<T>> collector) throws Exception {
+      collector.collect(value.withValue(value.getValue().getValue()));
+    }
+
+  }
+
   private static class BoundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
 

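Two details of the translation above are easy to miss: the key selector wraps the raw byte[] id in a ByteBuffer (byte arrays have identity equality, ByteBuffer has value equality, so equal ids land in the same key group), and on the non-deduping path StripIdsMap unwraps the payload while keeping the windowing metadata. A hedged standalone illustration of the wrap, key-extraction, and unwrap steps (hypothetical demo class; only SDK types the translator already imports):

import java.nio.ByteBuffer;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;

/** Hypothetical demo of the id wrap/key/unwrap steps used by the translator above. */
public class RecordIdWrappingDemo {
  public static void main(String[] args) {
    // What the source wrapper emits: the payload plus its record id.
    WindowedValue<ValueWithRecordId<String>> withId =
        WindowedValue.valueInGlobalWindow(
            new ValueWithRecordId<>("payload", "record-1".getBytes()));

    // ValueWithRecordIdKeySelector: key by the id, wrapped for value equality.
    ByteBuffer key = ByteBuffer.wrap(withId.getValue().getId());

    // StripIdsMap: drop the id, keep the windowing metadata.
    WindowedValue<String> stripped = withId.withValue(withId.getValue().getValue());

    System.out.println(key.remaining() + "-byte key -> " + stripped.getValue());
  }
}
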
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
new file mode 100644
index 0000000..b8b40fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.joda.time.Duration;
+
+/**
+ * Removes values with duplicate ids.
+ */
+public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>>
+    implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>,
+    KeyGroupCheckpointedOperator {
+
+  private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10L).getMillis();
+  private static final long MAX_CACHE_SIZE = 100_000L;
+
+  private transient LoadingCache<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> dedupingCache;
+  private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    checkInitCache();
+    keyedStateBackend = getKeyedStateBackend();
+  }
+
+  private void checkInitCache() {
+    if (dedupingCache == null) {
+      dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
+    }
+  }
+
+  private static class KeyGroupLoader extends
+      CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
+    @Override
+    public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws Exception {
+      return CacheBuilder.newBuilder()
+          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+          .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
+    }
+  }
+
+  private static class TrueBooleanLoader extends CacheLoader<ByteBuffer, AtomicBoolean> {
+    @Override
+    public AtomicBoolean load(ByteBuffer ignore) throws Exception {
+      return new AtomicBoolean(true);
+    }
+  }
+
+  @Override
+  public void processElement(
+      StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
+    ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
+    int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
+    if (shouldOutput(groupIndex, currentKey)) {
+      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+      output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
+    }
+  }
+
+  private boolean shouldOutput(int groupIndex, ByteBuffer id) throws ExecutionException {
+    return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
+  }
+
+  @Override
+  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+    checkInitCache();
+    Integer size = VarIntCoder.of().decode(in, Context.NESTED);
+    for (int i = 0; i < size; i++) {
+      byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
+      // restore the ids that have not expired.
+      shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
+    }
+  }
+
+  @Override
+  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+    Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
+    VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
+    for (ByteBuffer id : ids) {
+      ByteArrayCoder.of().encode(id.array(), out, Context.NESTED);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // copy from AbstractStreamOperator
+    if (getKeyedStateBackend() != null) {
+      KeyedStateCheckpointOutputStream out;
+
+      try {
+        out = context.getRawKeyedOperatorStateOutput();
+      } catch (Exception exception) {
+        throw new Exception("Could not open raw keyed operator state stream for "
+            + getOperatorName() + '.', exception);
+      }
+
+      try {
+        KeyGroupsList allKeyGroups = out.getKeyGroupList();
+        for (int keyGroupIdx : allKeyGroups) {
+          out.startNewKeyGroup(keyGroupIdx);
+
+          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+          // if (this instanceof KeyGroupCheckpointedOperator)
+          snapshotKeyGroupState(keyGroupIdx, dov);
+
+        }
+      } catch (Exception exception) {
+        throw new Exception("Could not write timer service of " + getOperatorName()
+            + " to checkpoint state stream.", exception);
+      } finally {
+        try {
+          out.close();
+        } catch (Exception closeException) {
+          LOG.warn("Could not close raw keyed operator state stream for {}. This "
+                  + "might have prevented deleting some state data.", getOperatorName(),
+              closeException);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    if (getKeyedStateBackend() != null) {
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        // if (this instanceof KeyGroupRestoringOperator)
+        restoreKeyGroupState(keyGroupIdx, div);
+
+      }
+    }
+  }
+
+}

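The operator checkpoints through Flink's raw keyed state: for every key group, snapshotKeyGroupState writes a VarInt count followed by that many NESTED-encoded byte[] ids, and restoreKeyGroupState replays each id through shouldOutput(...) to repopulate the cache. A minimal round-trip sketch of that layout (hypothetical class name; uses the same Beam coders the operator does):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.VarIntCoder;

/** Hypothetical round-trip of the per-key-group checkpoint layout. */
public class DedupSnapshotFormatDemo {
  public static void main(String[] args) throws Exception {
    List<byte[]> ids = Arrays.asList("id-1".getBytes(), "id-2".getBytes());

    // snapshotKeyGroupState: a VarInt count, then each id, NESTED-encoded.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    VarIntCoder.of().encode(ids.size(), out, Context.NESTED);
    for (byte[] id : ids) {
      ByteArrayCoder.of().encode(id, out, Context.NESTED);
    }

    // restoreKeyGroupState: read the count, then re-register every saved id.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    int size = VarIntCoder.of().decode(in, Context.NESTED);
    for (int i = 0; i < size; i++) {
      System.out.println(new String(ByteArrayCoder.of().decode(in, Context.NESTED)));
    }
  }
}

NESTED encoding length-prefixes each byte array, so ids of varying length can be read back without any terminator.
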
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ee20fd5..a731e2b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.state.ListState;
@@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
  */
 public class UnboundedSourceWrapper<
     OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
+    extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
     implements ProcessingTimeCallback, StoppableFunction,
     CheckpointListener, CheckpointedFunction {
 
@@ -113,7 +114,7 @@ public class UnboundedSourceWrapper<
    * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
    * watermarks.
    */
-  private transient SourceContext<WindowedValue<OutputT>> context;
+  private transient SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> context;
 
   /**
    * Pending checkpoints which have not been acknowledged yet.
@@ -210,7 +211,7 @@ public class UnboundedSourceWrapper<
   }
 
   @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+  public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) throws Exception {
 
     context = ctx;
 
@@ -306,17 +307,19 @@ public class UnboundedSourceWrapper<
    * Emit the current element from the given Reader. The reader is guaranteed to have data.
    */
   private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
+      SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx,
       UnboundedSource.UnboundedReader<OutputT> reader) {
     // make sure that reader state update and element emission are atomic
     // with respect to snapshots
     synchronized (ctx.getCheckpointLock()) {
 
       OutputT item = reader.getCurrent();
+      byte[] recordId = reader.getCurrentRecordId();
       Instant timestamp = reader.getCurrentTimestamp();
 
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+      WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+          WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+              GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
       ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
     }
   }

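Sources opt in via UnboundedSource#requiresDeduping(); their readers must then return stable ids from getCurrentRecordId(), which the wrapper now reads alongside each element. The following is a standalone restatement of the element shape emitElement now produces, with hypothetical stand-in values in place of the reader calls:

import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.joda.time.Instant;

/** Hypothetical demo of the element shape emitElement now produces. */
public class EmitElementShapeDemo {
  public static void main(String[] args) {
    // Stand-ins for reader.getCurrent(), reader.getCurrentRecordId(),
    // and reader.getCurrentTimestamp().
    String item = "payload";
    byte[] recordId = "record-1".getBytes();
    Instant timestamp = Instant.now();

    WindowedValue<ValueWithRecordId<String>> windowedValue =
        WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
            GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);

    System.out.println(windowedValue.getValue().getValue()
        + " @ " + windowedValue.getTimestamp());
  }
}
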
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
new file mode 100644
index 0000000..81efa34
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DedupingOperator}.
+ */
+@RunWith(JUnit4.class)
+public class DedupingOperatorTest {
+
+  @Test
+  public void testDeduping() throws Exception {
+
+    KeyedOneInputStreamOperatorTestHarness<
+        ByteBuffer,
+        WindowedValue<ValueWithRecordId<String>>,
+        WindowedValue<String>> harness = getDedupingHarness();
+
+    harness.open();
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, key1.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, key2.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, key1.getBytes()))));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key1),
+            WindowedValue.valueInGlobalWindow(key2)));
+
+    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+
+    harness.close();
+
+    harness = getDedupingHarness();
+    harness.setup();
+    harness.initializeState(snapshot);
+    harness.open();
+
+    String key3 = "key3";
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key2, key2.getBytes()))));
+
+    harness.processElement(new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, key3.getBytes()))));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key3)));
+
+    harness.close();
+  }
+
+  private KeyedOneInputStreamOperatorTestHarness<ByteBuffer,
+      WindowedValue<ValueWithRecordId<String>>,
+      WindowedValue<String>> getDedupingHarness() throws Exception {
+    DedupingOperator<String> operator = new DedupingOperator<>();
+
+    return new KeyedOneInputStreamOperatorTestHarness<>(operator,
+        new KeySelector<WindowedValue<ValueWithRecordId<String>>, ByteBuffer>() {
+      @Override
+      public ByteBuffer getKey(WindowedValue<ValueWithRecordId<String>> value) throws Exception {
+        return ByteBuffer.wrap(value.getValue().getId());
+      }
+    }, TypeInformation.of(ByteBuffer.class));
+  }
+
+  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
+      Iterable<Object> input) {
+
+    return FluentIterable.from(input).filter(new Predicate<Object>() {
+      @Override
+      public boolean apply(@Nullable Object o) {
+        return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue;
+      }
+    }).transform(new Function<Object, WindowedValue<T>>() {
+      @Nullable
+      @Override
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      public WindowedValue<T> apply(@Nullable Object o) {
+        if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) {
+          return (WindowedValue) ((StreamRecord) o).getValue();
+        }
+        throw new RuntimeException("unreachable");
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 0cb528a..500fa66 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.state.ListState;
@@ -116,7 +117,7 @@ public class UnboundedSourceWrapperTest {
       assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
       StreamSource<WindowedValue<
-          KV<Integer, Integer>>,
+          ValueWithRecordId<KV<Integer, Integer>>>,
           UnboundedSourceWrapper<
               KV<Integer, Integer>,
               TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
@@ -126,7 +127,7 @@ public class UnboundedSourceWrapperTest {
       try {
         sourceOperator.open();
         sourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
               @Override
@@ -138,8 +139,8 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+              public void collect(StreamRecord<WindowedValue<
+                  ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
 
                 count++;
                 if (count >= numElements) {
@@ -184,7 +185,7 @@ public class UnboundedSourceWrapperTest {
       assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
       StreamSource<
-          WindowedValue<KV<Integer, Integer>>,
+          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
           UnboundedSourceWrapper<
               KV<Integer, Integer>,
               TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
@@ -214,7 +215,7 @@ public class UnboundedSourceWrapperTest {
       try {
         sourceOperator.open();
         sourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
               @Override
@@ -226,10 +227,10 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+              public void collect(StreamRecord<WindowedValue<
+                  ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
 
-                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
                 count++;
                 if (count >= numElements / 2) {
                   throw new SuccessException();
@@ -275,7 +276,7 @@ public class UnboundedSourceWrapperTest {
       assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
 
       StreamSource<
-          WindowedValue<KV<Integer, Integer>>,
+          WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
           UnboundedSourceWrapper<
               KV<Integer, Integer>,
               TestCountingSource.CounterMark>> restoredSourceOperator =
@@ -292,7 +293,7 @@ public class UnboundedSourceWrapperTest {
       try {
         restoredSourceOperator.open();
         restoredSourceOperator.run(checkpointLock,
-            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
               private int count = 0;
 
               @Override
@@ -304,9 +305,9 @@ public class UnboundedSourceWrapperTest {
               }
 
               @Override
-              public void collect(
-                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+              public void collect(StreamRecord<WindowedValue<
+                  ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
+                emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
                 count++;
                 if (count >= numElements / 2) {
                   throw new SuccessException();