You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:14:52 UTC
[4/6] flink git commit: [FLINK-2462] [streaming] Major cleanup of
operator structure for exception handling and code simplification
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8c354be..2ca2862 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -45,28 +45,32 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
* Implementation of the {@link RuntimeContext}, created by runtime stream UDF
* operators.
*/
-@SuppressWarnings("rawtypes")
public class StreamingRuntimeContext extends RuntimeUDFContext {
private final Environment env;
- private final Map<String, StreamOperatorState> states;
- private final List<PartitionedStreamOperatorState> partitionedStates;
+ private final Map<String, StreamOperatorState<?, ?>> states;
+ private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
private final KeySelector<?, ?> statePartitioner;
private final StateHandleProvider<Serializable> provider;
- private final ClassLoader cl;
-
+
+
@SuppressWarnings("unchecked")
- public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
- StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) {
- super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
- executionConfig, env.getDistributedCacheEntries(), accumulatorMap);
+ public StreamingRuntimeContext(
+ Environment env,
+ ExecutionConfig executionConfig,
+ KeySelector<?, ?> statePartitioner,
+ StateHandleProvider<?> provider,
+ Map<String, Accumulator<?, ?>> accumulatorMap) {
+
+ super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
+ env.getUserClassLoader(), executionConfig,
+ env.getDistributedCacheEntries(), accumulatorMap);
+
this.env = env;
this.statePartitioner = statePartitioner;
- this.states = new HashMap<String, StreamOperatorState>();
- this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
+ this.states = new HashMap<>();
+ this.partitionedStates = new LinkedList<>();
this.provider = (StateHandleProvider<Serializable>) provider;
- this.cl = userCodeClassLoader;
}
/**
@@ -121,14 +125,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
public StreamOperatorState<?, ?> getState(String name, boolean partitioned) {
// Try fetching state from the map
- StreamOperatorState state = states.get(name);
+ StreamOperatorState<?, ?> state = states.get(name);
if (state == null) {
// If not found, create empty state and add to the map
state = createRawState(partitioned);
states.put(name, state);
// We keep a reference to all partitioned states for registering input
if (state instanceof PartitionedStreamOperatorState) {
- partitionedStates.add((PartitionedStreamOperatorState) state);
+ partitionedStates.add((PartitionedStreamOperatorState<?, ?, ?>) state);
}
}
return state;
@@ -139,11 +143,11 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
*
* @return An empty operator state.
*/
- @SuppressWarnings("unchecked")
- public StreamOperatorState createRawState(boolean partitioned) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public StreamOperatorState<?, ?> createRawState(boolean partitioned) {
if (partitioned) {
if (statePartitioner != null) {
- return new PartitionedStreamOperatorState(provider, statePartitioner, cl);
+ return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
} else {
throw new RuntimeException(
"Partitioned state can only be used with KeyedDataStreams.");
@@ -158,7 +162,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
*
* @return All the states for the underlying operator.
*/
- public Map<String, StreamOperatorState> getOperatorStates() {
+ public Map<String, StreamOperatorState<?, ?>> getOperatorStates() {
return states;
}
@@ -169,7 +173,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
* @param nextRecord
* Next input of the operator.
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public void setNextInput(StreamRecord<?> nextRecord) {
if (statePartitioner != null) {
for (PartitionedStreamOperatorState state : partitionedStates) {
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 8cf5a40..5d0497d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -27,114 +26,65 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+ private volatile boolean running = true;
@Override
- public void registerInputOutput() {
- try {
- super.registerInputOutput();
+ public void init() throws Exception {
+ TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+ TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
- TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
- TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+ int numberOfInputs = configuration.getNumberOfInputs();
- int numberOfInputs = configuration.getNumberOfInputs();
+ ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+ ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+ List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
- List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = inEdges.get(i).getTypeNumber();
- InputGate reader = getEnvironment().getInputGate(i);
- switch (inputType) {
- case 1:
- inputList1.add(reader);
- break;
- case 2:
- inputList2.add(reader);
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
- }
+ for (int i = 0; i < numberOfInputs; i++) {
+ int inputType = inEdges.get(i).getTypeNumber();
+ InputGate reader = getEnvironment().getInputGate(i);
+ switch (inputType) {
+ case 1:
+ inputList1.add(reader);
+ break;
+ case 2:
+ inputList2.add(reader);
+ break;
+ default:
+ throw new RuntimeException("Invalid input type number: " + inputType);
}
-
- this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
- inputDeserializer1, inputDeserializer2,
- getCheckpointBarrierListener(),
- configuration.getCheckpointMode(),
- getEnvironment().getIOManager(),
- getExecutionConfig().areTimestampsEnabled());
-
- // make sure that stream tasks report their I/O statistics
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
- this.inputProcessor.setReporter(reporter);
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
}
+
+ this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+ inputDeserializer1, inputDeserializer2,
+ getCheckpointBarrierListener(),
+ configuration.getCheckpointMode(),
+ getEnvironment().getIOManager(),
+ getExecutionConfig().areTimestampsEnabled());
+
+ // make sure that stream tasks report their I/O statistics
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+ this.inputProcessor.setReporter(reporter);
}
@Override
- public void invoke() throws Exception {
- boolean operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invoked", getName());
- }
-
- try {
-
- openOperator();
- operatorOpen = true;
-
- while (inputProcessor.processInput(streamOperator)) {
- // do nothing, just keep processing
- }
-
- closeOperator();
- operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invocation finished", getName());
- }
-
- }
- catch (Exception e) {
- LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
- if (operatorOpen) {
- try {
- closeOperator();
- }
- catch (Throwable t) {
- LOG.warn("Exception while closing operator.", t);
- }
- }
-
- throw e;
- }
- finally {
- this.isRunning = false;
- // Cleanup
- outputHandler.flushOutputs();
- clearBuffers();
- }
-
+ protected void run() throws Exception {
+ while (running && inputProcessor.processInput(streamOperator));
}
@Override
- public void clearBuffers() throws IOException {
- super.clearBuffers();
- inputProcessor.clearBuffers();
+ protected void cleanup() throws Exception {
inputProcessor.cleanup();
}
+
+ @Override
+ protected void cancelTask() {
+ running = false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 6ca38b7..60db798 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -141,9 +141,12 @@ public class StatefulOperatorTest {
KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
final List<String> outputList = output;
- StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
- new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner,
- new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+ StreamingRuntimeContext context = new StreamingRuntimeContext(
+ new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+ new ExecutionConfig(),
+ partitioner,
+ new LocalStateHandleProvider<Serializable>(),
+ new HashMap<String, Accumulator<?, ?>>());
StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
@@ -217,14 +220,15 @@ public class StatefulOperatorTest {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings("unchecked")
@Override
public void close() throws Exception {
- Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+ Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
Integer key = (Integer) count.getKey();
Integer expected = key < 3 ? 2 : 1;
+
assertEquals(new MutableInt(expected), count.getValue());
}
}
@@ -257,11 +261,12 @@ public class StatefulOperatorTest {
groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings("unchecked")
@Override
public void close() throws Exception {
- Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
- PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+ Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+ PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter =
+ (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
Integer key = (Integer) count.getKey();
Integer expected = key < 3 ? 2 : 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index f07e3a5..7cc1958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -85,12 +85,7 @@ public class StreamRecordWriterTest {
}
finally {
if (testWriter != null) {
- try {
- testWriter.close();
- }
- catch (IOException e) {
- // ignore in tests
- }
+ testWriter.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 296324a..4c6957b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -168,7 +168,6 @@ public class OneInputStreamTaskTest {
* This test verifies that checkpoint barriers are correctly forwarded.
*/
@Test
- @SuppressWarnings("unchecked")
public void testCheckpointBarriers() throws Exception {
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -226,7 +225,6 @@ public class OneInputStreamTaskTest {
* then all inputs receive barriers from a later checkpoint.
*/
@Test
- @SuppressWarnings("unchecked")
public void testOvertakingCheckpointBarriers() throws Exception {
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -282,7 +280,6 @@ public class OneInputStreamTaskTest {
testHarness.waitForInputProcessing();
-
testHarness.endInput();
testHarness.waitForTaskCompletion();
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 4f07fdb..7fb8ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -62,7 +62,6 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
*/
public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
int numInputGates,
-
int numInputChannelsPerGate,
TypeInformation<IN> inputType,
TypeInformation<OUT> outputType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0f372cb..987d0bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -83,7 +83,7 @@ public class StreamTaskTestHarness<OUT> {
private ConcurrentLinkedQueue<Object> outputList;
- protected Thread taskThread;
+ protected TaskThread taskThread;
// These don't get initialized, the one-input/two-input specific test harnesses
// must initialize these if they want to simulate input. We have them here so that all the
@@ -161,32 +161,19 @@ public class StreamTaskTestHarness<OUT> {
task.registerInputOutput();
- taskThread = new Thread(new Runnable() {
- @Override
- public void run() {
-
-
-
- try {
- task.invoke();
- shutdownIOManager();
- shutdownMemoryManager();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
- });
-
+ taskThread = new TaskThread(task);
taskThread.start();
}
- public void waitForTaskCompletion() throws InterruptedException {
+ public void waitForTaskCompletion() throws Exception {
if (taskThread == null) {
throw new IllegalStateException("Task thread was not started.");
}
taskThread.join();
+ if (taskThread.getError() != null) {
+ throw new Exception("error in task", taskThread.getError());
+ }
}
/**
@@ -300,5 +287,36 @@ public class StreamTaskTestHarness<OUT> {
inputGates[i].endInput();
}
}
+
+ // ------------------------------------------------------------------------
+
+ private class TaskThread extends Thread {
+
+ private final AbstractInvokable task;
+
+ private volatile Throwable error;
+
+
+ TaskThread(AbstractInvokable task) {
+ super("Task Thread");
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ try {
+ task.invoke();
+ shutdownIOManager();
+ shutdownMemoryManager();
+ }
+ catch (Throwable t) {
+ this.error = t;
+ }
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 45ae88f..f9b0b09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -57,9 +57,11 @@ public class MockContext<IN, OUT> {
public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
- StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
- new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
+ StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
+ new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+ new ExecutionConfig(),
+ null, null,
+ new HashMap<String, Accumulator<?, ?>>());
operator.setup(mockContext.output, runtimeContext);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6652fde..f404d01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -60,9 +60,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
executionConfig = new ExecutionConfig();
StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
- "MockTwoInputTask",
- new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- getClass().getClassLoader(),
+ new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
executionConfig,
null,
new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 2d7f6b5..711dd41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -42,8 +42,13 @@ public class SourceFunctionUtil<T> {
public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
List<T> outputs = new ArrayList<T>();
if (sourceFunction instanceof RichFunction) {
- RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
- new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+ RuntimeContext runtimeContext = new StreamingRuntimeContext(
+ new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+ new ExecutionConfig(),
+ null,
+ new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
+ new HashMap<String, Accumulator<?, ?>>());
+
((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
((RichFunction) sourceFunction).open(new Configuration());
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a9ebd0b..09db1f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -66,7 +66,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
// ------------------------------------------------------------------------
public StreamingMultipleProgramsTestBase() {
- TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+ TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
clusterEnv.setAsContext();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 1e8b5c6..21e2e1e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
@@ -33,7 +34,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import java.io.Serializable;
import java.util.HashMap;
-import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
@@ -60,9 +60,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
executionConfig = new ExecutionConfig();
StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
- "MockTwoInputTask",
- new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- getClass().getClassLoader(),
+ new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
new ExecutionConfig(),
null,
new LocalStateHandle.LocalStateHandleProvider<Serializable>(),