You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/08 23:41:44 UTC
[2/2] beam git commit: [BEAM-1723] deduplication of UnboundedSource
in Flink runner
[BEAM-1723] deduplication of UnboundedSource in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/392ed601
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/392ed601
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/392ed601
Branch: refs/heads/release-2.0.0
Commit: 392ed601392dbf5ace32577c3a4dee13488cedc4
Parents: 5493c6c
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Apr 19 19:42:59 2017 +0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 8 16:41:25 2017 -0700
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 54 +++++-
.../wrappers/streaming/io/DedupingOperator.java | 187 +++++++++++++++++++
.../streaming/io/UnboundedSourceWrapper.java | 15 +-
.../flink/streaming/DedupingOperatorTest.java | 131 +++++++++++++
.../streaming/UnboundedSourceWrapperTest.java | 29 +--
5 files changed, 393 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 615eaea..9a93205 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -73,12 +74,16 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -148,20 +153,37 @@ class FlinkStreamingTransformTranslators {
FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);
+ DataStream<WindowedValue<T>> source;
+ DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- DataStream<WindowedValue<T>> source;
+ Coder<T> coder = context.getOutput(transform).getCoder();
+
+ TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+ new CoderTypeInformation<>(WindowedValue.getFullCoder(
+ ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+ output.getWindowingStrategy().getWindowFn().windowCoder()));
+
try {
+
UnboundedSourceWrapper<T, ?> sourceWrapper =
new UnboundedSourceWrapper<>(
context.getCurrentTransform().getFullName(),
context.getPipelineOptions(),
transform.getSource(),
context.getExecutionEnvironment().getParallelism());
- source = context
+ nonDedupSource = context
.getExecutionEnvironment()
- .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+ .addSource(sourceWrapper).name(transform.getName()).returns(withIdTypeInfo);
+
+ if (transform.getSource().requiresDeduping()) {
+ source = nonDedupSource.keyBy(
+ new ValueWithRecordIdKeySelector<T>())
+ .transform("debuping", outputTypeInfo, new DedupingOperator<T>());
+ } else {
+ source = nonDedupSource.flatMap(new StripIdsMap<T>());
+ }
} catch (Exception e) {
throw new RuntimeException(
"Error while translating UnboundedSource: " + transform.getSource(), e);
@@ -171,6 +193,32 @@ class FlinkStreamingTransformTranslators {
}
}
+ /**
+  * {@link KeySelector} that keys each element by its record id, wrapped in a {@link ByteBuffer}.
+  *
+  * <p>The key type is reported explicitly through {@link ResultTypeQueryable} as a
+  * {@link GenericTypeInfo} for {@link ByteBuffer}.
+  */
+ private static class ValueWithRecordIdKeySelector<T>
+     implements KeySelector<WindowedValue<ValueWithRecordId<T>>, ByteBuffer>,
+     ResultTypeQueryable<ByteBuffer> {
+
+   @Override
+   public TypeInformation<ByteBuffer> getProducedType() {
+     TypeInformation<ByteBuffer> keyType = new GenericTypeInfo<>(ByteBuffer.class);
+     return keyType;
+   }
+
+   @Override
+   public ByteBuffer getKey(WindowedValue<ValueWithRecordId<T>> value) throws Exception {
+     // The key is a view over the id bytes carried by the element; no copy is made.
+     ValueWithRecordId<T> withId = value.getValue();
+     byte[] recordId = withId.getId();
+     return ByteBuffer.wrap(recordId);
+   }
+ }
+
+ /**
+  * Unwraps the payload of each {@link ValueWithRecordId}, keeping the surrounding
+  * {@link WindowedValue} and emitting exactly one output element per input element.
+  */
+ public static class StripIdsMap<T> implements
+     FlatMapFunction<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>> {
+
+   @Override
+   public void flatMap(WindowedValue<ValueWithRecordId<T>> value,
+       Collector<WindowedValue<T>> collector) throws Exception {
+     // Drop the record id and forward only the wrapped value.
+     T payload = value.getValue().getValue();
+     WindowedValue<T> stripped = value.withValue(payload);
+     collector.collect(stripped);
+   }
+
+ }
+
private static class BoundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
new file mode 100644
index 0000000..b8b40fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.joda.time.Duration;
+
+/**
+ * Stream operator that removes values with duplicate record ids.
+ *
+ * <p>The record id is taken from the current key of the keyed state backend, so the operator
+ * expects to run on a stream keyed by the record id as a {@link ByteBuffer}. The first element
+ * seen for an id is forwarded with the id stripped; later elements with the same id are dropped.
+ *
+ * <p>Deduplication is bounded: each key group has its own cache that holds at most
+ * {@code MAX_CACHE_SIZE} ids and drops ids that were not accessed for
+ * {@code MAX_RETENTION_SINCE_ACCESS} milliseconds. An id that has left the cache is treated as
+ * new again.
+ *
+ * <p>The cached ids are written to and restored from raw keyed operator state, one section per
+ * key group.
+ */
+public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>>
+    implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>,
+    KeyGroupCheckpointedOperator {
+
+  private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10L).getMillis();
+  private static final long MAX_CACHE_SIZE = 100_000L;
+
+  /** Key group index -> (record id -> flag that is {@code true} until the id was emitted). */
+  private transient LoadingCache<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> dedupingCache;
+  private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    checkInitCache();
+    keyedStateBackend = getKeyedStateBackend();
+  }
+
+  /**
+   * Creates the per-key-group cache if it does not exist yet. Called from both {@link #open()}
+   * and {@link #restoreKeyGroupState(int, DataInputStream)}, whichever runs first.
+   */
+  private void checkInitCache() {
+    if (dedupingCache == null) {
+      dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
+    }
+  }
+
+  /** Builds the bounded, expiring id cache for a key group on first access. */
+  private static class KeyGroupLoader extends
+      CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
+    @Override
+    public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer ignore) throws Exception {
+      return CacheBuilder.newBuilder()
+          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+          .maximumSize(MAX_CACHE_SIZE).build(new TrueBooleanLoader());
+    }
+  }
+
+  /** Loads {@code true} for an id that is not cached, meaning "not emitted yet". */
+  private static class TrueBooleanLoader extends CacheLoader<ByteBuffer, AtomicBoolean> {
+    @Override
+    public AtomicBoolean load(ByteBuffer ignore) throws Exception {
+      return new AtomicBoolean(true);
+    }
+  }
+
+  /**
+   * Forwards the element with its record id stripped if the current key (the record id) has not
+   * been emitted before; otherwise drops it.
+   */
+  @Override
+  public void processElement(
+      StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
+    ByteBuffer currentKey = keyedStateBackend.getCurrentKey();
+    int groupIndex = keyedStateBackend.getCurrentKeyGroupIndex();
+    if (shouldOutput(groupIndex, currentKey)) {
+      WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
+      output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
+    }
+  }
+
+  /**
+   * Returns {@code true} the first time it is called for an id that is not in the key group's
+   * cache, and marks the id as seen so that later calls return {@code false}.
+   */
+  private boolean shouldOutput(int groupIndex, ByteBuffer id) throws ExecutionException {
+    return dedupingCache.get(groupIndex).getUnchecked(id).getAndSet(false);
+  }
+
+  /**
+   * Reads the ids written by {@link #snapshotKeyGroupState(int, DataOutputStream)} and marks each
+   * of them as already seen for the given key group.
+   */
+  @Override
+  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+    checkInitCache();
+    int size = VarIntCoder.of().decode(in, Context.NESTED);
+    for (int i = 0; i < size; i++) {
+      byte[] idBytes = ByteArrayCoder.of().decode(in, Context.NESTED);
+      // Only ids that had not expired at snapshot time were written; mark each one as seen.
+      shouldOutput(keyGroupIndex, ByteBuffer.wrap(idBytes));
+    }
+  }
+
+  /**
+   * Writes the ids currently cached for the given key group: a count followed by that many
+   * length-prefixed byte arrays.
+   */
+  @Override
+  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+    Set<ByteBuffer> ids = dedupingCache.get(keyGroupIndex).asMap().keySet();
+
+    // Materialize the ids before writing the count. Guava documents that expired entries may
+    // still be counted by size() while never being visible to reads, so ids.size() can be larger
+    // than the number of ids the iteration yields, which would corrupt the stream on restore.
+    List<byte[]> liveIds = new ArrayList<>();
+    for (ByteBuffer id : ids) {
+      liveIds.add(toByteArray(id));
+    }
+
+    VarIntCoder.of().encode(liveIds.size(), out, Context.NESTED);
+    for (byte[] idBytes : liveIds) {
+      ByteArrayCoder.of().encode(idBytes, out, Context.NESTED);
+    }
+  }
+
+  /**
+   * Copies the remaining bytes of the buffer without changing its position. Unlike
+   * {@link ByteBuffer#array()} this respects position and limit and also works for buffers that
+   * do not expose a backing array.
+   */
+  private static byte[] toByteArray(ByteBuffer id) {
+    ByteBuffer view = id.duplicate();
+    byte[] bytes = new byte[view.remaining()];
+    view.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Writes the deduping cache of every key group of this operator to the raw keyed operator
+   * state stream. Does nothing if there is no keyed state backend.
+   */
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // Adapted from AbstractStreamOperator.
+    if (getKeyedStateBackend() != null) {
+      KeyedStateCheckpointOutputStream out;
+
+      try {
+        out = context.getRawKeyedOperatorStateOutput();
+      } catch (Exception exception) {
+        throw new Exception("Could not open raw keyed operator state stream for "
+            + getOperatorName() + '.', exception);
+      }
+
+      try {
+        KeyGroupsList allKeyGroups = out.getKeyGroupList();
+        for (int keyGroupIdx : allKeyGroups) {
+          out.startNewKeyGroup(keyGroupIdx);
+
+          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+          snapshotKeyGroupState(keyGroupIdx, dov);
+        }
+      } catch (Exception exception) {
+        throw new Exception("Could not write deduping state of " + getOperatorName()
+            + " to checkpoint state stream.", exception);
+      } finally {
+        try {
+          out.close();
+        } catch (Exception closeException) {
+          LOG.warn("Could not close raw keyed operator state stream for {}. This "
+                  + "might have prevented deleting some state data.", getOperatorName(),
+              closeException);
+        }
+      }
+    }
+  }
+
+  /**
+   * Restores the deduping cache from every raw keyed state stream handed to this operator. Does
+   * nothing if there is no keyed state backend.
+   *
+   * @throws IllegalArgumentException if a stream belongs to a key group outside the local range
+   */
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    if (getKeyedStateBackend() != null) {
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        restoreKeyGroupState(keyGroupIdx, div);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ee20fd5..a731e2b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.state.ListState;
@@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
*/
public class UnboundedSourceWrapper<
OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
- extends RichParallelSourceFunction<WindowedValue<OutputT>>
+ extends RichParallelSourceFunction<WindowedValue<ValueWithRecordId<OutputT>>>
implements ProcessingTimeCallback, StoppableFunction,
CheckpointListener, CheckpointedFunction {
@@ -113,7 +114,7 @@ public class UnboundedSourceWrapper<
* Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
* watermarks.
*/
- private transient SourceContext<WindowedValue<OutputT>> context;
+ private transient SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> context;
/**
* Pending checkpoints which have not been acknowledged yet.
@@ -210,7 +211,7 @@ public class UnboundedSourceWrapper<
}
@Override
- public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) throws Exception {
context = ctx;
@@ -306,17 +307,19 @@ public class UnboundedSourceWrapper<
* Emit the current element from the given Reader. The reader is guaranteed to have data.
*/
private void emitElement(
- SourceContext<WindowedValue<OutputT>> ctx,
+ SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx,
UnboundedSource.UnboundedReader<OutputT> reader) {
// make sure that reader state update and element emission are atomic
// with respect to snapshots
synchronized (ctx.getCheckpointLock()) {
OutputT item = reader.getCurrent();
+ byte[] recordId = reader.getCurrentRecordId();
Instant timestamp = reader.getCurrentTimestamp();
- WindowedValue<OutputT> windowedValue =
- WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+ WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+ GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
new file mode 100644
index 0000000..81efa34
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DedupingOperator}.
+ */
+@RunWith(JUnit4.class)
+public class DedupingOperatorTest {
+
+  /**
+   * Checks that a repeated record id is dropped, and that ids seen before a snapshot are still
+   * dropped by a fresh operator restored from that snapshot.
+   */
+  @Test
+  public void testDeduping() throws Exception {
+
+    KeyedOneInputStreamOperatorTestHarness<
+        ByteBuffer,
+        WindowedValue<ValueWithRecordId<String>>,
+        WindowedValue<String>> harness = getDedupingHarness();
+
+    harness.open();
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    harness.processElement(recordFor(key1));
+    harness.processElement(recordFor(key2));
+    // Same id as the first element: must not be emitted again.
+    harness.processElement(recordFor(key1));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key1),
+            WindowedValue.valueInGlobalWindow(key2)));
+
+    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
+
+    harness.close();
+
+    harness = getDedupingHarness();
+    harness.setup();
+    harness.initializeState(snapshot);
+    harness.open();
+
+    String key3 = "key3";
+
+    // key2 was seen before the snapshot, so only key3 may show up in the new output.
+    harness.processElement(recordFor(key2));
+    harness.processElement(recordFor(key3));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        contains(WindowedValue.valueInGlobalWindow(key3)));
+
+    harness.close();
+  }
+
+  /**
+   * Builds a stream record in the global window whose value and record id are both derived from
+   * {@code key}.
+   */
+  private static StreamRecord<WindowedValue<ValueWithRecordId<String>>> recordFor(String key) {
+    // Fixed charset so the id bytes do not depend on the platform default.
+    byte[] id = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    return new StreamRecord<>(
+        WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key, id)));
+  }
+
+  /** Creates a test harness around a new {@link DedupingOperator}, keyed by record id. */
+  private KeyedOneInputStreamOperatorTestHarness<ByteBuffer,
+      WindowedValue<ValueWithRecordId<String>>,
+      WindowedValue<String>> getDedupingHarness() throws Exception {
+    DedupingOperator<String> operator = new DedupingOperator<>();
+
+    return new KeyedOneInputStreamOperatorTestHarness<>(operator,
+        new KeySelector<WindowedValue<ValueWithRecordId<String>>, ByteBuffer>() {
+          @Override
+          public ByteBuffer getKey(WindowedValue<ValueWithRecordId<String>> value) throws Exception {
+            return ByteBuffer.wrap(value.getValue().getId());
+          }
+        }, TypeInformation.of(ByteBuffer.class));
+  }
+
+  /**
+   * Keeps only the {@link StreamRecord}s in {@code input} that carry a {@link WindowedValue} and
+   * returns those windowed values in order.
+   */
+  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
+      Iterable<Object> input) {
+
+    return FluentIterable.from(input).filter(new Predicate<Object>() {
+      @Override
+      public boolean apply(@Nullable Object o) {
+        return o instanceof StreamRecord
+            && ((StreamRecord<?>) o).getValue() instanceof WindowedValue;
+      }
+    }).transform(new Function<Object, WindowedValue<T>>() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public WindowedValue<T> apply(@Nullable Object o) {
+        // The preceding filter only lets through StreamRecords that wrap a WindowedValue.
+        return (WindowedValue<T>) ((StreamRecord<?>) o).getValue();
+      }
+    });
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/392ed601/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 0cb528a..500fa66 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.state.ListState;
@@ -116,7 +117,7 @@ public class UnboundedSourceWrapperTest {
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<WindowedValue<
- KV<Integer, Integer>>,
+ ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<
KV<Integer, Integer>,
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
@@ -126,7 +127,7 @@ public class UnboundedSourceWrapperTest {
try {
sourceOperator.open();
sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;
@Override
@@ -138,8 +139,8 @@ public class UnboundedSourceWrapperTest {
}
@Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+ public void collect(StreamRecord<WindowedValue<
+ ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
count++;
if (count >= numElements) {
@@ -184,7 +185,7 @@ public class UnboundedSourceWrapperTest {
assertEquals(numSplits, flinkWrapper.getSplitSources().size());
StreamSource<
- WindowedValue<KV<Integer, Integer>>,
+ WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<
KV<Integer, Integer>,
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
@@ -214,7 +215,7 @@ public class UnboundedSourceWrapperTest {
try {
sourceOperator.open();
sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;
@Override
@@ -226,10 +227,10 @@ public class UnboundedSourceWrapperTest {
}
@Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+ public void collect(StreamRecord<WindowedValue<
+ ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+ emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
count++;
if (count >= numElements / 2) {
throw new SuccessException();
@@ -275,7 +276,7 @@ public class UnboundedSourceWrapperTest {
assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
StreamSource<
- WindowedValue<KV<Integer, Integer>>,
+ WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<
KV<Integer, Integer>,
TestCountingSource.CounterMark>> restoredSourceOperator =
@@ -292,7 +293,7 @@ public class UnboundedSourceWrapperTest {
try {
restoredSourceOperator.open();
restoredSourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;
@Override
@@ -304,9 +305,9 @@ public class UnboundedSourceWrapperTest {
}
@Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+ public void collect(StreamRecord<WindowedValue<
+ ValueWithRecordId<KV<Integer, Integer>>>> windowedValueStreamRecord) {
+ emittedElements.add(windowedValueStreamRecord.getValue().getValue().getValue());
count++;
if (count >= numElements / 2) {
throw new SuccessException();