You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:26 UTC
[11/11] flink git commit: [FLINK-4877] Refactor Operator
TestHarnesses to use Common Base Class
[FLINK-4877] Refactor Operator TestHarnesses to use Common Base Class
This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f305baab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f305baab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f305baab
Branch: refs/heads/master
Commit: f305baabbcd9b035b696b8b7a4334deaf1b28d23
Parents: 0859a69
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 29 16:04:29 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200
----------------------------------------------------------------------
.../runtime/streamrecord/StreamRecord.java | 2 +-
.../util/AbstractStreamOperatorTestHarness.java | 366 +++++++++++++++++++
.../KeyedOneInputStreamOperatorTestHarness.java | 18 +-
.../KeyedTwoInputStreamOperatorTestHarness.java | 144 ++++++++
.../util/OneInputStreamOperatorTestHarness.java | 328 +----------------
.../util/TwoInputStreamOperatorTestHarness.java | 130 +------
6 files changed, 537 insertions(+), 451 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 9f75161..da606a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -171,7 +171,7 @@ public final class StreamRecord<T> extends StreamElement {
else if (o != null && getClass() == o.getClass()) {
StreamRecord<?> that = (StreamRecord<?>) o;
return this.hasTimestamp == that.hasTimestamp &&
- this.timestamp == that.timestamp &&
+ (!this.hasTimestamp || this.timestamp == that.timestamp) &&
(this.value == null ? that.value == null : this.value.equals(that.value));
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
new file mode 100644
index 0000000..a61d995
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ClosableRegistry;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Base class for {@code AbstractStreamOperator} test harnesses.
+ */
+public class AbstractStreamOperatorTestHarness<OUT> {
+
+ public static final int MAX_PARALLELISM = 10;
+
+ final protected StreamOperator<OUT> operator;
+
+ final protected ConcurrentLinkedQueue<Object> outputList;
+
+ final protected StreamConfig config;
+
+ final protected ExecutionConfig executionConfig;
+
+ final protected TestProcessingTimeService processingTimeService;
+
+ final protected StreamTask<?, ?> mockTask;
+
+ ClosableRegistry closableRegistry;
+
+ // use this as default for tests
+ protected AbstractStateBackend stateBackend = new MemoryStateBackend();
+
+ private final Object checkpointLock;
+
+ /**
+ * Whether setup() was called on the operator. This is reset when calling close().
+ */
+ private boolean setupCalled = false;
+ private boolean initializeCalled = false;
+
+ private volatile boolean wasFailedExternally = false;
+
+ public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) throws Exception {
+ this(operator, new ExecutionConfig());
+ }
+
+ public AbstractStreamOperatorTestHarness(
+ StreamOperator<OUT> operator,
+ ExecutionConfig executionConfig) throws Exception {
+ this.operator = operator;
+ this.outputList = new ConcurrentLinkedQueue<>();
+ Configuration underlyingConfig = new Configuration();
+ this.config = new StreamConfig(underlyingConfig);
+ this.config.setCheckpointingEnabled(true);
+ this.executionConfig = executionConfig;
+ this.closableRegistry = new ClosableRegistry();
+ this.checkpointLock = new Object();
+
+ final Environment env = new MockEnvironment(
+ "MockTask",
+ 3 * 1024 * 1024,
+ new MockInputSplitProvider(),
+ 1024,
+ underlyingConfig,
+ executionConfig,
+ MAX_PARALLELISM,
+ 1, 0);
+
+ mockTask = mock(StreamTask.class);
+ processingTimeService = new TestProcessingTimeService();
+ processingTimeService.setCurrentTime(0);
+
+ when(mockTask.getName()).thenReturn("Mock Task");
+ when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+ when(mockTask.getConfiguration()).thenReturn(config);
+ when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
+ when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
+ when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ wasFailedExternally = true;
+ return null;
+ }
+ }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
+
+ try {
+ doAnswer(new Answer<CheckpointStreamFactory>() {
+ @Override
+ public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+ final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
+ return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
+ }
+ }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ try {
+ doAnswer(new Answer<OperatorStateBackend>() {
+ @Override
+ public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+ final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
+ final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
+ OperatorStateBackend osb;
+ if (null == stateHandles) {
+ osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
+ } else {
+ osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
+ }
+ mockTask.getCancelables().registerClosable(osb);
+ return osb;
+ }
+ }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class));
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ doAnswer(new Answer<ProcessingTimeService>() {
+ @Override
+ public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+ return processingTimeService;
+ }
+ }).when(mockTask).getProcessingTimeService();
+ }
+
+ public void setStateBackend(AbstractStateBackend stateBackend) {
+ this.stateBackend = stateBackend;
+ }
+
+ public Object getCheckpointLock() {
+ return mockTask.getCheckpointLock();
+ }
+
+ public Environment getEnvironment() {
+ return this.mockTask.getEnvironment();
+ }
+
+ /**
+ * Get all the output from the task. This contains StreamRecords and Events interleaved.
+ */
+ public ConcurrentLinkedQueue<Object> getOutput() {
+ return outputList;
+ }
+
+ /**
+ * Get all the output from the task and clear the output buffer.
+ * This contains only StreamRecords.
+ */
+ @SuppressWarnings("unchecked")
+ public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
+ List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
+ for (Object e: getOutput()) {
+ if (e instanceof StreamRecord) {
+ resultElements.add((StreamRecord<OUT>) e);
+ }
+ }
+ return resultElements;
+ }
+
+ /**
+ * Calls
+ * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+ */
+ public void setup() throws Exception {
+ operator.setup(mockTask, config, new MockOutput());
+ setupCalled = true;
+ }
+
+ /**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+ * if it was not called before.
+ */
+ public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+ if (!setupCalled) {
+ setup();
+ }
+ operator.initializeState(operatorStateHandles);
+ initializeCalled = true;
+ }
+
+
+ /**
+ * Calls {@link StreamOperator#open()}. This also
+ * calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+ * if it was not called before.
+ */
+ public void open() throws Exception {
+ if (!initializeCalled) {
+ initializeState(null);
+ }
+ operator.open();
+ }
+
+ /**
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
+ */
+ public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
+
+ CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
+ new JobID(),
+ "test_op");
+
+ return operator.snapshotState(checkpointId, timestamp, streamFactory);
+ }
+
+ /**
+ * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if
+ * the operator implements this interface.
+ */
+ @Deprecated
+ public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
+
+ CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
+ new JobID(),
+ "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
+ if(operator instanceof StreamCheckpointedOperator) {
+ ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
+ return outStream.closeAndGetHandle();
+ } else {
+ throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+ }
+ }
+
+ /**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}
+ */
+ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
+ operator.notifyOfCompletedCheckpoint(checkpointId);
+ }
+
+ /**
+ * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
+ * the operator implements this interface.
+ */ @Deprecated
+ public void restore(StreamStateHandle snapshot) throws Exception {
+ if(operator instanceof StreamCheckpointedOperator) {
+ try (FSDataInputStream in = snapshot.openInputStream()) {
+ ((StreamCheckpointedOperator) operator).restoreState(in);
+ }
+ } else {
+ throw new RuntimeException("Operator is not StreamCheckpointedOperator");
+ }
+ }
+
+ /**
+ * Calls close and dispose on the operator.
+ */
+ public void close() throws Exception {
+ operator.close();
+ operator.dispose();
+ if (processingTimeService != null) {
+ processingTimeService.shutdownService();
+ }
+ setupCalled = false;
+ }
+
+ public void setProcessingTime(long time) throws Exception {
+ processingTimeService.setCurrentTime(time);
+ }
+
+ public long getProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+ this.config.setTimeCharacteristic(timeCharacteristic);
+ }
+
+ public TimeCharacteristic getTimeCharacteristic() {
+ return this.config.getTimeCharacteristic();
+ }
+
+ public boolean wasFailedExternally() {
+ return wasFailedExternally;
+ }
+
+ private class MockOutput implements Output<StreamRecord<OUT>> {
+
+ private TypeSerializer<OUT> outputSerializer;
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ outputList.add(mark);
+ }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {
+ outputList.add(latencyMarker);
+ }
+
+ @Override
+ public void collect(StreamRecord<OUT> element) {
+ if (outputSerializer == null) {
+ outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
+ }
+ if (element.hasTimestamp()) {
+ outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
+ } else {
+ outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue())));
+ }
+ }
+
+ @Override
+ public void close() {
+ // ignore
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 9c9d11b..99527e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -50,7 +50,6 @@ import static org.mockito.Mockito.doAnswer;
/**
* Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
* a {@link KeyedStateBackend}.
- *
*/
public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
extends OneInputStreamOperatorTestHarness<IN, OUT> {
@@ -171,7 +170,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
/**
- *
+ *
*/
@Override
public void restore(StreamStateHandle snapshot) throws Exception {
@@ -189,21 +188,12 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
}
- /**
- * Calls close and dispose on the operator.
- */
- public void close() throws Exception {
- super.close();
- if (keyedStateBackend != null) {
- keyedStateBackend.dispose();
- }
- }
-
@Override
public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
- if (null != operatorStateHandles) {
- this.restoredKeyedState = operatorStateHandles.getManagedKeyedState();
+ if (operatorStateHandles != null) {
+ restoredKeyedState = operatorStateHandles.getManagedKeyedState();
}
+
super.initializeState(operatorStateHandles);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..2e9885c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Extension of {@link TwoInputStreamOperatorTestHarness} that allows the operator to get
+ * a {@link KeyedStateBackend}.
+ */
+public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
+ extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
+
+ // in case the operator creates one we store it here so that we
+ // can snapshot its state
+ private AbstractKeyedStateBackend<?> keyedStateBackend = null;
+
+ // when we restore we keep the state here so that we can call restore
+ // when the operator requests the keyed state backend
+ private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+
+ public KeyedTwoInputStreamOperatorTestHarness(
+ TwoInputStreamOperator<IN1, IN2, OUT> operator,
+ final KeySelector<IN1, K> keySelector1,
+ final KeySelector<IN2, K> keySelector2,
+ TypeInformation<K> keyType) throws Exception {
+ super(operator);
+
+ ClosureCleaner.clean(keySelector1, false);
+ ClosureCleaner.clean(keySelector2, false);
+ config.setStatePartitioner(0, keySelector1);
+ config.setStatePartitioner(1, keySelector2);
+ config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+ config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+ setupMockTaskCreateKeyedBackend();
+ }
+
+ public KeyedTwoInputStreamOperatorTestHarness(
+ TwoInputStreamOperator<IN1, IN2, OUT> operator,
+ ExecutionConfig executionConfig,
+ KeySelector<IN1, K> keySelector1,
+ KeySelector<IN2, K> keySelector2,
+ TypeInformation<K> keyType) throws Exception {
+ super(operator, executionConfig);
+
+ ClosureCleaner.clean(keySelector1, false);
+ ClosureCleaner.clean(keySelector2, false);
+ config.setStatePartitioner(0, keySelector1);
+ config.setStatePartitioner(1, keySelector2);
+ config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+ config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+ setupMockTaskCreateKeyedBackend();
+ }
+
+ private void setupMockTaskCreateKeyedBackend() {
+
+ try {
+ doAnswer(new Answer<KeyedStateBackend>() {
+ @Override
+ public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+ final TypeSerializer keySerializer = (TypeSerializer) invocationOnMock.getArguments()[0];
+ final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1];
+ final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
+
+ if(keyedStateBackend != null) {
+ keyedStateBackend.close();
+ }
+
+ if (restoredKeyedState == null) {
+ keyedStateBackend = stateBackend.createKeyedStateBackend(
+ mockTask.getEnvironment(),
+ new JobID(),
+ "test_op",
+ keySerializer,
+ numberOfKeyGroups,
+ keyGroupRange,
+ mockTask.getEnvironment().getTaskKvStateRegistry());
+ return keyedStateBackend;
+ } else {
+ keyedStateBackend = stateBackend.restoreKeyedStateBackend(
+ mockTask.getEnvironment(),
+ new JobID(),
+ "test_op",
+ keySerializer,
+ numberOfKeyGroups,
+ keyGroupRange,
+ restoredKeyedState,
+ mockTask.getEnvironment().getTaskKvStateRegistry());
+ restoredKeyedState = null;
+ return keyedStateBackend;
+ }
+ }
+ }).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class));
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
+ if (restoredKeyedState != null) {
+ restoredKeyedState = operatorStateHandles.getManagedKeyedState();
+ }
+
+ super.initializeState(operatorStateHandles);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 5b277bf..a3e095a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,89 +18,23 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.InstantiationUtil;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RunnableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* A test harness for testing a {@link OneInputStreamOperator}.
*
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
+ * <p>This mock task provides the operator with a basic runtime context and allows pushing elements
* and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
* and watermarks can be retrieved. You are free to modify these.
*/
-public class OneInputStreamOperatorTestHarness<IN, OUT> {
-
- public static final int MAX_PARALLELISM = 10;
-
- final OneInputStreamOperator<IN, OUT> operator;
-
- final ConcurrentLinkedQueue<Object> outputList;
-
- final StreamConfig config;
-
- final ExecutionConfig executionConfig;
-
- final TestProcessingTimeService processingTimeService;
-
- StreamTask<?, ?> mockTask;
-
- ClosableRegistry closableRegistry;
-
- // use this as default for tests
- AbstractStateBackend stateBackend = new MemoryStateBackend();
+public class OneInputStreamOperatorTestHarness<IN, OUT>
+ extends AbstractStreamOperatorTestHarness<OUT> {
- private final Object checkpointLock;
-
- /**
- * Whether setup() was called on the operator. This is reset when calling close().
- */
- private boolean setupCalled = false;
- private boolean initializeCalled = false;
-
- private volatile boolean wasFailedExternally = false;
+ private final OneInputStreamOperator<IN, OUT> oneInputOperator;
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
this(operator, new ExecutionConfig());
@@ -109,268 +43,24 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig) throws Exception {
- this.operator = operator;
- this.outputList = new ConcurrentLinkedQueue<>();
- Configuration underlyingConfig = new Configuration();
- this.config = new StreamConfig(underlyingConfig);
- this.config.setCheckpointingEnabled(true);
- this.executionConfig = executionConfig;
- this.closableRegistry = new ClosableRegistry();
-
- this.checkpointLock = new Object();
-
- final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
- mockTask = mock(StreamTask.class);
- processingTimeService = new TestProcessingTimeService();
- processingTimeService.setCurrentTime(0);
-
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
- when(mockTask.getConfiguration()).thenReturn(config);
- when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
- when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- wasFailedExternally = true;
- return null;
- }
- }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
-
- try {
- doAnswer(new Answer<CheckpointStreamFactory>() {
- @Override
- public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
-
- final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
- return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
- }
- }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- try {
- doAnswer(new Answer<OperatorStateBackend>() {
- @Override
- public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
- final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
- final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
- OperatorStateBackend osb;
- if (null == stateHandles) {
- osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
- } else {
- osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
- }
- mockTask.getCancelables().registerClosable(osb);
- return osb;
- }
- }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class));
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- doAnswer(new Answer<ProcessingTimeService>() {
- @Override
- public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
- return processingTimeService;
- }
- }).when(mockTask).getProcessingTimeService();
- }
-
- public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
- this.config.setTimeCharacteristic(timeCharacteristic);
- }
-
- public TimeCharacteristic getTimeCharacteristic() {
- return this.config.getTimeCharacteristic();
- }
-
- public boolean wasFailedExternally() {
- return wasFailedExternally;
- }
-
- public void setStateBackend(AbstractStateBackend stateBackend) {
- this.stateBackend = stateBackend;
- }
-
- public Object getCheckpointLock() {
- return mockTask.getCheckpointLock();
- }
-
- public Environment getEnvironment() {
- return this.mockTask.getEnvironment();
- }
-
- /**
- * Get all the output from the task. This contains StreamRecords and Events interleaved.
- */
- public ConcurrentLinkedQueue<Object> getOutput() {
- return outputList;
- }
-
- /**
- * Get all the output from the task and clear the output buffer.
- * This contains only StreamRecords.
- */
- @SuppressWarnings("unchecked")
- public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
- List<StreamRecord<? extends OUT>> resultElements = new LinkedList<>();
- for (Object e: getOutput()) {
- if (e instanceof StreamRecord) {
- resultElements.add((StreamRecord<OUT>) e);
- }
- }
- return resultElements;
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
- */
- public void setup() throws Exception {
- operator.setup(mockTask, config, new MockOutput());
- setupCalled = true;
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
- * if it was not called before.
- */
- public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
- if (!setupCalled) {
- setup();
- }
- operator.initializeState(operatorStateHandles);
- initializeCalled = true;
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
- * was not called before.
- */
- public void open() throws Exception {
- if (!initializeCalled) {
- initializeState(null);
- }
- operator.open();
- }
-
- /**
- *
- */
- public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception {
-
- CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
- new JobID(),
- "test_op");
-
- return operator.snapshotState(checkpointId, timestamp, streamFactory);
- }
-
- /**
- *
- */
- @Deprecated
- public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
-
- CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
- new JobID(),
- "test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
- if(operator instanceof StreamCheckpointedOperator) {
- ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
- return outStream.closeAndGetHandle();
- } else {
- throw new RuntimeException("Operator is not StreamCheckpointedOperator");
- }
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}
- */
- public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
- operator.notifyOfCompletedCheckpoint(checkpointId);
- }
+ super(operator, executionConfig);
- /**
- *
- */
- @Deprecated
- public void restore(StreamStateHandle snapshot) throws Exception {
- if(operator instanceof StreamCheckpointedOperator) {
- try (FSDataInputStream in = snapshot.openInputStream()) {
- ((StreamCheckpointedOperator) operator).restoreState(in);
- }
- } else {
- throw new RuntimeException("Operator is not StreamCheckpointedOperator");
- }
- }
-
- /**
- * Calls close and dispose on the operator.
- */
- public void close() throws Exception {
- operator.close();
- operator.dispose();
- if (processingTimeService != null) {
- processingTimeService.shutdownService();
- }
- setupCalled = false;
+ this.oneInputOperator = operator;
}
public void processElement(StreamRecord<IN> element) throws Exception {
operator.setKeyContextElement1(element);
- operator.processElement(element);
+ oneInputOperator.processElement(element);
}
public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
for (StreamRecord<IN> element: elements) {
operator.setKeyContextElement1(element);
- operator.processElement(element);
- }
- }
-
- public void setProcessingTime(long time) throws Exception {
- synchronized (checkpointLock) {
- processingTimeService.setCurrentTime(time);
+ oneInputOperator.processElement(element);
}
}
public void processWatermark(Watermark mark) throws Exception {
- operator.processWatermark(mark);
- }
-
- private class MockOutput implements Output<StreamRecord<OUT>> {
-
- private TypeSerializer<OUT> outputSerializer;
-
- @Override
- public void emitWatermark(Watermark mark) {
- outputList.add(mark);
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- outputList.add(latencyMarker);
- }
-
- @Override
- public void collect(StreamRecord<OUT> element) {
- if (outputSerializer == null) {
- outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
- }
- outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
- element.getTimestamp()));
- }
-
- @Override
- public void close() {
- // ignore
- }
+ oneInputOperator.processWatermark(mark);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 7df6848..95eea98 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -19,26 +19,9 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* A test harness for testing a {@link TwoInputStreamOperator}.
@@ -48,122 +31,35 @@ import static org.mockito.Mockito.when;
* and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
* and watermarks can be retrieved. you are free to modify these.
*/
-public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
-
- TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
- final ConcurrentLinkedQueue<Object> outputList;
-
- final ExecutionConfig executionConfig;
-
- final Object checkpointLock;
+public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStreamOperatorTestHarness<OUT> {
- final ClosableRegistry closableRegistry;
+ private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
- boolean initializeCalled = false;
-
- public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
- this(operator, new StreamConfig(new Configuration()));
+ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
+ this(operator, new ExecutionConfig());
}
- public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
- this.operator = operator;
- this.outputList = new ConcurrentLinkedQueue<Object>();
- this.executionConfig = new ExecutionConfig();
- this.checkpointLock = new Object();
- this.closableRegistry = new ClosableRegistry();
-
- Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
- when(mockTask.getConfiguration()).thenReturn(config);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
- operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
- }
-
- /**
- * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
- * to extract only the StreamRecords.
- */
- public ConcurrentLinkedQueue<Object> getOutput() {
- return outputList;
- }
+ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception {
+ super(operator, executionConfig);
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
- */
- public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
- operator.initializeState(operatorStateHandles);
- initializeCalled = true;
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
- */
- public void open() throws Exception {
- if(!initializeCalled) {
- initializeState(mock(OperatorStateHandles.class));
- }
-
- operator.open();
- }
-
- /**
- * Calls close on the operator.
- */
- public void close() throws Exception {
- operator.close();
+ this.twoInputOperator = operator;
}
public void processElement1(StreamRecord<IN1> element) throws Exception {
- operator.processElement1(element);
+ twoInputOperator.setKeyContextElement1(element);
+ twoInputOperator.processElement1(element);
}
public void processElement2(StreamRecord<IN2> element) throws Exception {
- operator.processElement2(element);
+ twoInputOperator.setKeyContextElement2(element);
+ twoInputOperator.processElement2(element);
}
public void processWatermark1(Watermark mark) throws Exception {
- operator.processWatermark1(mark);
+ twoInputOperator.processWatermark1(mark);
}
public void processWatermark2(Watermark mark) throws Exception {
- operator.processWatermark2(mark);
- }
-
- private class MockOutput implements Output<StreamRecord<OUT>> {
-
- private TypeSerializer<OUT> outputSerializer;
-
- @Override
- @SuppressWarnings("unchecked")
- public void emitWatermark(Watermark mark) {
- outputList.add(mark);
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- outputList.add(latencyMarker);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void collect(StreamRecord<OUT> element) {
- if (outputSerializer == null) {
- outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
- }
- outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),
- element.getTimestamp()));
- }
-
- @Override
- public void close() {
- // ignore
- }
+ twoInputOperator.processWatermark2(mark);
}
}