You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by al...@apache.org on 2017/06/07 17:49:58 UTC
[2/4] beam git commit: [BEAM-1779] Port UnboundedSourceWrapperTest to use Flink operator test harness
[BEAM-1779] Port UnboundedSourceWrapperTest to use Flink operator test harness
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1dc8f53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1dc8f53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1dc8f53
Branch: refs/heads/master
Commit: c1dc8f53c5438b575a7e84e9f680616ead49d61e
Parents: 62b942a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 22 11:43:30 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/TestCountingSource.java | 48 +++--
.../streaming/UnboundedSourceWrapperTest.java | 198 +++++++------------
2 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index 3a08088..edf548a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -133,18 +133,8 @@ public class TestCountingSource
public Coder<CounterMark> getCheckpointMarkCoder() {
return DelegateCoder.of(
VarIntCoder.of(),
- new DelegateCoder.CodingFunction<CounterMark, Integer>() {
- @Override
- public Integer apply(CounterMark input) {
- return input.current;
- }
- },
- new DelegateCoder.CodingFunction<Integer, CounterMark>() {
- @Override
- public CounterMark apply(Integer input) {
- return new CounterMark(input);
- }
- });
+ new FromCounterMark(),
+ new ToCounterMark());
}
@Override
@@ -251,4 +241,55 @@ public class TestCountingSource
public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
}
+
+ /**
+ * Encoding half of {@link TestCountingSource#getCheckpointMarkCoder()}: maps a
+ * {@link CounterMark} to its {@code current} counter value.
+ *
+ * <p>Declared {@code static} so instances do not capture a hidden reference to the enclosing
+ * {@code TestCountingSource}; the function is handed to {@code DelegateCoder} and presumably
+ * serialized with it, which would otherwise drag the outer source along.
+ * {@code equals}/{@code hashCode} are class-based, so any two instances compare equal.
+ */
+ private static class FromCounterMark implements DelegateCoder.CodingFunction<CounterMark, Integer> {
+ @Override
+ public Integer apply(CounterMark input) {
+ return input.current;
+ }
+
+ @Override
+ public int hashCode() {
+ return FromCounterMark.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof FromCounterMark;
+ }
+ }
+
+ /**
+ * Decoding half of {@link TestCountingSource#getCheckpointMarkCoder()}: rebuilds a
+ * {@link CounterMark} from its {@code current} counter value.
+ *
+ * <p>Declared {@code static} for the same reason as {@link FromCounterMark}: no hidden
+ * reference to the enclosing source. {@code equals}/{@code hashCode} are class-based, so any
+ * two instances compare equal.
+ */
+ private static class ToCounterMark implements DelegateCoder.CodingFunction<Integer, CounterMark> {
+ @Override
+ public CounterMark apply(Integer input) {
+ return new CounterMark(input);
+ }
+
+ @Override
+ public int hashCode() {
+ return ToCounterMark.class.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof ToCounterMark;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index e3875bc..716e71d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -20,36 +20,20 @@ package org.apache.beam.runners.flink.streaming;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -57,15 +41,14 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.mockito.Matchers;
/**
* Tests for {@link UnboundedSourceWrapper}.
@@ -125,10 +108,18 @@ public class UnboundedSourceWrapperTest {
KV<Integer, Integer>,
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(sourceOperator, numTasks);
+ AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+ testHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ sourceOperator,
+ numTasks /* max parallelism */,
+ numTasks /* parallelism */,
+ 0 /* subtask index */);
+
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
try {
- sourceOperator.open();
+ testHarness.open();
sourceOperator.run(checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -200,29 +191,22 @@ public class UnboundedSourceWrapperTest {
TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
- OperatorStateStore backend = mock(OperatorStateStore.class);
-
- TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
- listState = new TestingListState<>();
-
- when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
- .thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(backend);
- when(initializationContext.isRestored()).thenReturn(false, true);
+ AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+ testHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ sourceOperator,
+ numTasks /* max parallelism */,
+ numTasks /* parallelism */,
+ 0 /* subtask index */);
- flinkWrapper.initializeState(initializationContext);
-
- setupSourceOperator(sourceOperator, numTasks);
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
boolean readFirstBatchOfElements = false;
try {
- sourceOperator.open();
+ testHarness.open();
sourceOperator.run(checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -265,21 +249,12 @@ public class UnboundedSourceWrapperTest {
assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
// draw a snapshot
- flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
- // test snapshot offsets
- assertEquals(flinkWrapper.getLocalSplitSources().size(),
- listState.getList().size());
- int totalEmit = 0;
- for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : listState.get()) {
- totalEmit += kv.getValue().current + 1;
- }
- assertEquals(numElements / 2, totalEmit);
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
// test that finalizeCheckpoint on CheckpointMark is called
final ArrayList<Integer> finalizeList = new ArrayList<>();
TestCountingSource.setFinalizeTracker(finalizeList);
- flinkWrapper.notifyCheckpointComplete(0);
+ testHarness.notifyOfCompletedCheckpoint(0);
assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
// create a completely new source but restore from the snapshot
@@ -297,16 +272,25 @@ public class UnboundedSourceWrapperTest {
TestCountingSource.CounterMark>> restoredSourceOperator =
new StreamSource<>(restoredFlinkWrapper);
- setupSourceOperator(restoredSourceOperator, numTasks);
+ // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+ AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+ restoredTestHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ restoredSourceOperator,
+ numTasks /* max parallelism */,
+ 1 /* parallelism */,
+ 0 /* subtask index */);
+
+ restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
// restore snapshot
- restoredFlinkWrapper.initializeState(initializationContext);
+ restoredTestHarness.initializeState(snapshot);
boolean readSecondBatchOfElements = false;
// run again and verify that we see the other elements
try {
- restoredSourceOperator.open();
+ restoredTestHarness.open();
restoredSourceOperator.run(checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -345,7 +329,9 @@ public class UnboundedSourceWrapperTest {
readSecondBatchOfElements = true;
}
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+ assertEquals(
+ Math.max(1, numSplits / numTasks),
+ restoredFlinkWrapper.getLocalSplitSources().size());
assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
@@ -364,68 +350,57 @@ public class UnboundedSourceWrapperTest {
return null;
}
};
+
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
- OperatorStateStore backend = mock(OperatorStateStore.class);
-
- TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
- listState = new TestingListState<>();
-
- when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
- .thenReturn(listState);
-
- StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
- when(initializationContext.getOperatorStateStore()).thenReturn(backend);
- when(initializationContext.isRestored()).thenReturn(false, true);
+ StreamSource<
+ WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
+ sourceOperator = new StreamSource<>(flinkWrapper);
- flinkWrapper.initializeState(initializationContext);
+ AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+ testHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ sourceOperator,
+ numTasks /* max parallelism */,
+ numTasks /* parallelism */,
+ 0 /* subtask index */);
- StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(sourceOperator, numTasks);
- sourceOperator.open();
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
- flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
+ testHarness.open();
- assertEquals(0, listState.getList().size());
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
UnboundedSourceWrapper<
KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements),
- numSplits);
-
- StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
- setupSourceOperator(restoredSourceOperator, numTasks);
- sourceOperator.open();
-
- restoredFlinkWrapper.initializeState(initializationContext);
-
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
- }
+ new UnboundedSourceWrapper<>(
+ "stepName", options, new TestCountingSource(numElements), numSplits);
- @SuppressWarnings("unchecked")
- private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
- ExecutionConfig executionConfig = new ExecutionConfig();
- StreamConfig cfg = new StreamConfig(new Configuration());
+ StreamSource<
+ WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
+ restoredSourceOperator =
+ new StreamSource<>(restoredFlinkWrapper);
- cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+ // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+ AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+ restoredTestHarness =
+ new AbstractStreamOperatorTestHarness<>(
+ restoredSourceOperator,
+ numTasks /* max parallelism */,
+ 1 /* parallelism */,
+ 0 /* subtask index */);
- Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
+ restoredTestHarness.setup();
+ restoredTestHarness.initializeState(snapshot);
+ restoredTestHarness.open();
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(new Object());
- when(mockTask.getConfiguration()).thenReturn(cfg);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getAccumulatorMap())
- .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
- TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
- when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService);
+ // when the source checkpointed a null we don't re-initialize the splits, that is we
+ // will have no splits.
+ assertEquals(0, restoredFlinkWrapper.getLocalSplitSources().size());
- operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
}
/**
@@ -458,31 +433,6 @@ public class UnboundedSourceWrapperTest {
}
- private static final class TestingListState<T> implements ListState<T> {
-
- private final List<T> list = new ArrayList<>();
-
- @Override
- public void clear() {
- list.clear();
- }
-
- @Override
- public Iterable<T> get() throws Exception {
- return list;
- }
-
- @Override
- public void add(T value) throws Exception {
- list.add(value);
- }
-
- public List<T> getList() {
- return list;
- }
-
- }
-
private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
StreamStatus currentStreamStatus = StreamStatus.ACTIVE;