You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:36 UTC
[2/6] flink git commit: [FLINK-5169] [network] Adjust tests to new
consumer logic
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 3972917..126a96e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,24 +20,17 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.taskmanager.TaskActions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.io.IOException;
import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -73,28 +66,29 @@ public class TestSingleInputGate {
// Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask).
// After merging registerInputOutput and invoke, we have to make sure that the test
- // notifcations happen at the expected time. In real programs, this is guaranteed by
+ // notifications happen at the expected time. In real programs, this is guaranteed by
// the instantiation and request partition life cycle.
try {
Field f = realGate.getClass().getDeclaredField("inputChannelsWithData");
f.setAccessible(true);
- final BlockingQueue<InputChannel> notifications = (BlockingQueue<InputChannel>) f.get(realGate);
+ final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
invocation.callRealMethod();
- if (!notifications.isEmpty()) {
- EventListener<InputGate> listener = (EventListener<InputGate>) invocation.getArguments()[0];
- listener.onEvent(inputGate);
+ synchronized (notifications) {
+ if (!notifications.isEmpty()) {
+ InputGateListener listener = (InputGateListener) invocation.getArguments()[0];
+ listener.notifyInputGateNonEmpty(inputGate);
+ }
}
return null;
}
- }).when(inputGate).registerListener(any(EventListener.class));
- }
- catch (Exception e) {
+ }).when(inputGate).registerListener(any(InputGateListener.class));
+ } catch (Exception e) {
throw new RuntimeException(e);
}
@@ -108,81 +102,8 @@ public class TestSingleInputGate {
}
}
- public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
- checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
- inputChannels[channelIndex].read(buffer);
-
- return this;
- }
-
- public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
- return readBuffer(0);
- }
-
- public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readBuffer();
-
- return this;
- }
-
- public TestSingleInputGate readEvent() throws IOException, InterruptedException {
- return readEvent(0);
- }
-
- public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEvent();
-
- return this;
- }
-
- public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
- for (TestInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfSuperstepEvent();
- }
-
- return this;
- }
-
- public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEndOfSuperstepEvent();
-
- return this;
- }
-
- public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
- for (TestInputChannel inputChannel : inputChannels) {
- inputChannel.readEndOfPartitionEvent();
- }
-
- return this;
- }
-
- public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
- inputChannels[channelIndex].readEndOfPartitionEvent();
-
- return this;
- }
-
public SingleInputGate getInputGate() {
return inputGate;
}
- // ------------------------------------------------------------------------
-
- public List<Integer> readAllChannels() throws IOException, InterruptedException {
- final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
- for (int i = 0; i < inputChannels.length; i++) {
- readOrder.add(i);
- }
-
- Collections.shuffle(readOrder);
-
- for (int channelIndex : readOrder) {
- inputChannels[channelIndex].readBuffer();
- }
-
- return readOrder;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index cba3199..84ec202 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -73,22 +72,32 @@ public class UnionInputGateTest {
inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
- SingleInputGateTest.verifyBufferOrEvent(union, true, 0);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 0);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 5);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 5);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 3);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 4);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 1);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 6);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 1);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 6);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 2);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 2);
- SingleInputGateTest.verifyBufferOrEvent(union, true, 7);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 7);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 4);
- SingleInputGateTest.verifyBufferOrEvent(union, false, 3);
+ ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
+ ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
+ ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+
+ ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
+ ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
+ ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
+ ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
+ ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 0); // gate 1, channel 0
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 3); // gate 2, channel 0
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 1); // gate 1, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 4); // gate 2, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 2); // gate 1, channel 2
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 5); // gate 2, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 0); // gate 1, channel 0
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 6); // gate 2, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 1); // gate 1, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 7); // gate 2, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 2); // gate 1, channel 2
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 3); // gate 2, channel 0
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 4); // gate 2, channel 1
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 5); // gate 2, channel 2
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 6); // gate 2, channel 3
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 7); // gate 2, channel 4
// Return null when the input gate has received all end-of-partition events
assertTrue(union.isFinished());
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 1b51805..676a304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -22,26 +22,32 @@ import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A test subpartition view consumer.
+ * A test subpartition viewQueue consumer.
*
* <p> The behaviour of the consumer is customizable by specifying a callback.
*
* @see TestConsumerCallback
*/
-public class TestSubpartitionConsumer implements Callable<Boolean> {
+public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvailabilityListener {
private static final int MAX_SLEEP_TIME_MS = 20;
- /** The subpartition view to consume. */
- private final ResultSubpartitionView subpartitionView;
+ /** The subpartition viewQueue to consume. */
+ private volatile ResultSubpartitionView subpartitionView;
+
+ private BlockingQueue<ResultSubpartitionView> viewQueue = new ArrayBlockingQueue<>(1);
/**
* Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
@@ -49,33 +55,43 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
*/
private final boolean isSlowConsumer;
- /** The callback to handle a read buffer. */
+ /** The callback to handle a notifyNonEmpty buffer. */
private final TestConsumerCallback callback;
/** Random source for sleeps. */
private final Random random;
+ private final AtomicLong numBuffersAvailable = new AtomicLong();
+
public TestSubpartitionConsumer(
- ResultSubpartitionView subpartitionView,
- boolean isSlowConsumer,
- TestConsumerCallback callback) {
+ boolean isSlowConsumer,
+ TestConsumerCallback callback) {
- this.subpartitionView = checkNotNull(subpartitionView);
this.isSlowConsumer = isSlowConsumer;
this.random = isSlowConsumer ? new Random() : null;
this.callback = checkNotNull(callback);
}
+ public void setSubpartitionView(ResultSubpartitionView subpartitionView) {
+ this.subpartitionView = checkNotNull(subpartitionView);
+ }
+
@Override
public Boolean call() throws Exception {
- final TestNotificationListener listener = new TestNotificationListener();
-
try {
while (true) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
+ if (numBuffersAvailable.get() == 0) {
+ synchronized (numBuffersAvailable) {
+ while (numBuffersAvailable.get() == 0) {
+ numBuffersAvailable.wait();
+ }
+ }
+ }
+
final Buffer buffer = subpartitionView.getNextBuffer();
if (isSlowConsumer) {
@@ -83,12 +99,13 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
}
if (buffer != null) {
+ numBuffersAvailable.decrementAndGet();
+
if (buffer.isBuffer()) {
callback.onBuffer(buffer);
- }
- else {
+ } else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer,
- getClass().getClassLoader());
+ getClass().getClassLoader());
callback.onEvent(event);
@@ -100,22 +117,22 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
return true;
}
}
- }
- else {
- int current = listener.getNumberOfNotifications();
-
- if (subpartitionView.registerListener(listener)) {
- listener.waitForNotification(current);
- }
- else if (subpartitionView.isReleased()) {
- return true;
- }
+ } else if (subpartitionView.isReleased()) {
+ return true;
}
}
- }
- finally {
+ } finally {
subpartitionView.releaseAllResources();
}
}
+ @Override
+ public void notifyBuffersAvailable(long numBuffers) {
+ if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+ synchronized (numBuffersAvailable) {
+ numBuffersAvailable.notifyAll();
+ }
+ ;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 226dc91..f9aea89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -129,7 +129,6 @@ public class DataSinkTaskTest extends TaskTestBase {
@Test
public void testUnionDataSinkTask() {
-
int keyCnt = 10;
int valCnt = 20;
@@ -147,9 +146,10 @@ public class DataSinkTaskTest extends TaskTestBase {
try {
// For the union reader to work, we need to start notifications *after* the union reader
- // has been initialized.
+ // has been initialized. This is accomplished via a mockito hack in TestSingleInputGate,
+ // which checks forwards existing notifications on registerListener calls.
for (IteratorWrappingTestSingleInputGate<?> reader : readers) {
- reader.read();
+ reader.notifyNonEmpty();
}
testTask.invoke();
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 0c9fd79..fb8ed68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.FlatMapDriver;
@@ -37,14 +39,20 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
-
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class ChainTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 646c038..8f4bc77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -67,9 +67,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockEnvironment implements Environment {
-
+
private final TaskInfo taskInfo;
-
+
private final ExecutionConfig executionConfig;
private final MemoryManager memManager;
@@ -184,7 +184,7 @@ public class MockEnvironment implements Environment {
}
if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
- || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+ || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
break;
}
}
@@ -234,9 +234,9 @@ public class MockEnvironment implements Environment {
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
return new TaskManagerRuntimeInfo(
- "localhost",
- new UnmodifiableConfiguration(new Configuration()),
- System.getProperty("java.io.tmpdir"));
+ "localhost",
+ new UnmodifiableConfiguration(new Configuration()),
+ System.getProperty("java.io.tmpdir"));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index eaf44db..53d75b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -67,7 +67,7 @@ public abstract class TaskTestBase extends TestLogger {
conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
if (read) {
- reader.read();
+ reader.notifyNonEmpty();
}
return reader;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index a093233..876e908 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.taskmanager;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -43,6 +42,7 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -129,18 +129,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
}
// Verify that async producer is in blocking request
- assertTrue("Producer thread is not blocked.", producerBlocked);
+ assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_CONSUMER_THREAD.getStackTrace()), producerBlocked);
- boolean consumerBlocked = false;
+ boolean consumerWaiting = false;
for (int i = 0; i < 50; i++) {
Thread thread = ASYNC_CONSUMER_THREAD;
if (thread != null && thread.isAlive()) {
- StackTraceElement[] stackTrace = thread.getStackTrace();
- consumerBlocked = isInBlockingQueuePoll(stackTrace);
+ consumerWaiting = thread.getState() == Thread.State.WAITING;
}
- if (consumerBlocked) {
+ if (consumerWaiting) {
break;
} else {
// Retry
@@ -149,7 +148,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
}
// Verify that async consumer is in blocking request
- assertTrue("Consumer thread is not blocked.", consumerBlocked);
+ assertTrue("Consumer thread is not blocked.", consumerWaiting);
msg = new CancelJob(jobGraph.getJobID());
Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
@@ -186,27 +185,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
}
/**
- * Returns whether the stack trace represents a Thread in a blocking queue
- * poll call.
- *
- * @param stackTrace Stack trace of the Thread to check
- *
- * @return Flag indicating whether the Thread is in a blocking queue poll
- * call.
- */
- private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
- for (StackTraceElement elem : stackTrace) {
- if (elem.getMethodName().equals("poll") &&
- elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
- return true;
- }
- }
-
- return false;
- }
-
- /**
* Invokable emitting records in a separate Thread (not the main Task
* thread).
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 322a0f0..6dcb56b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -62,9 +63,9 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
@SuppressWarnings("unchecked")
public StreamTestSingleInputGate(
- int numInputChannels,
- int bufferSize,
- TypeSerializer<T> serializer) throws IOException, InterruptedException {
+ int numInputChannels,
+ int bufferSize,
+ TypeSerializer<T> serializer) throws IOException, InterruptedException {
super(numInputChannels, false);
this.bufferSize = bufferSize;
@@ -86,39 +87,36 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
- new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
+ new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
- final Answer<Buffer> answer = new Answer<Buffer>() {
+ final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() {
@Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
InputValue<Object> input = inputQueues[channelIndex].poll();
if (input != null && input.isStreamEnd()) {
when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
- true);
- return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
- }
- else if (input != null && input.isStreamRecord()) {
+ true);
+ return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
+ } else if (input != null && input.isStreamRecord()) {
Object inputElement = input.getStreamRecord();
final Buffer buffer = new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
-
+ MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+ mock(BufferRecycler.class));
+
recordSerializer.setNextBuffer(buffer);
delegate.setInstance(inputElement);
recordSerializer.addRecord(delegate);
// Call getCurrentBuffer to ensure size is set
- return recordSerializer.getCurrentBuffer();
- }
- else if (input != null && input.isEvent()) {
+ return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false);
+ } else if (input != null && input.isEvent()) {
AbstractEvent event = input.getEvent();
- return EventSerializer.toBuffer(event);
- }
- else {
+ return new BufferAndAvailability(EventSerializer.toBuffer(event), false);
+ } else {
synchronized (inputQueues[channelIndex]) {
inputQueues[channelIndex].wait();
return answer(invocationOnMock);
@@ -130,7 +128,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
inputGate.setInputChannel(new IntermediateResultPartitionID(),
- inputChannels[channelIndex].getInputChannel());
+ inputChannels[channelIndex].getInputChannel());
}
}
@@ -139,7 +137,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channel].add(InputValue.element(element));
inputQueues[channel].notifyAll();
}
- inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
}
public void sendEvent(AbstractEvent event, int channel) {
@@ -147,7 +145,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[channel].add(InputValue.event(event));
inputQueues[channel].notifyAll();
}
- inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
}
public void endInput() {
@@ -156,7 +154,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputQueues[i].add(InputValue.streamEnd());
inputQueues[i].notifyAll();
}
- inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
+ inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index a8a989b..0cf866a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -17,25 +17,24 @@
package org.apache.flink.streaming.runtime.io;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
+import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.junit.Test;
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
/**
* The test generates two random streams (input channels) which independently
* and randomly generate checkpoint barriers. The two streams are very
@@ -165,7 +164,7 @@ public class BarrierBufferMassiveRandomTest {
public void sendTaskEvent(TaskEvent event) {}
@Override
- public void registerListener(EventListener<InputGate> listener) {}
+ public void registerListener(InputGateListener listener) {}
@Override
public int getPageSize() {
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index cb8a058..3e2a75a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import java.util.ArrayDeque;
import java.util.List;
@@ -31,16 +31,15 @@ import java.util.Queue;
public class MockInputGate implements InputGate {
private final int pageSize;
-
+
private final int numChannels;
-
+
private final Queue<BufferOrEvent> boes;
private final boolean[] closed;
-
+
private int closedChannels;
-
public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
this.pageSize = pageSize;
this.numChannels = numChannels;
@@ -52,7 +51,7 @@ public class MockInputGate implements InputGate {
public int getPageSize() {
return pageSize;
}
-
+
@Override
public int getNumberOfInputChannels() {
return numChannels;
@@ -69,11 +68,11 @@ public class MockInputGate implements InputGate {
if (next == null) {
return null;
}
-
+
int channelIdx = next.getChannelIndex();
if (closed[channelIdx]) {
throw new RuntimeException("Inconsistent: Channel " + channelIdx
- + " has data even though it is already closed.");
+ + " has data even though it is already closed.");
}
if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
closed[channelIdx] = true;
@@ -83,12 +82,15 @@ public class MockInputGate implements InputGate {
}
@Override
- public void requestPartitions() {}
+ public void requestPartitions() {
+ }
@Override
- public void sendTaskEvent(TaskEvent event) {}
+ public void sendTaskEvent(TaskEvent event) {
+ }
@Override
- public void registerListener(EventListener<InputGate> listener) {}
-
-}
\ No newline at end of file
+ public void registerListener(InputGateListener listener) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 3126d71..d86c809 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -45,7 +45,7 @@ import java.io.IOException;
*
* <p>
* When Elements or Events are offered to the Task they are put into a queue. The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * of the Task notifyNonEmpty from this queue. Use {@link #waitForInputProcessing()} to wait until all
* queues are empty. This must be used after entering some elements before checking the
* desired output.
*/
@@ -58,11 +58,13 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
* Creates a test harness with the specified number of input gates and specified number
* of channels per input gate.
*/
- public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
- int numInputGates,
- int numInputChannelsPerGate,
- TypeInformation<IN> inputType,
- TypeInformation<OUT> outputType) {
+ public OneInputStreamTaskTestHarness(
+ OneInputStreamTask<IN, OUT> task,
+ int numInputGates,
+ int numInputChannelsPerGate,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType) {
+
super(task, outputType);
this.inputType = inputType;
@@ -75,9 +77,10 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
/**
* Creates a test harness with one input gate that has one input channel.
*/
- public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
- TypeInformation<IN> inputType,
- TypeInformation<OUT> outputType) {
+ public OneInputStreamTaskTestHarness(
+ OneInputStreamTask<IN, OUT> task,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType) {
this(task, 1, 1, inputType, outputType);
}
@@ -87,9 +90,9 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
for (int i = 0; i < numInputGates; i++) {
inputGates[i] = new StreamTestSingleInputGate<IN>(
- numInputChannelsPerGate,
- bufferSize,
- inputSerializer);
+ numInputChannelsPerGate,
+ bufferSize,
+ inputSerializer);
this.mockEnv.addInputGate(inputGates[i].getInputGate());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 52daf6f..17a0857 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,13 +102,13 @@ public class StreamMockEnvironment implements Environment {
private volatile boolean wasFailedExternally = false;
public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
- long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.taskInfo = new TaskInfo(
- "", /* task name */
- 1, /* num key groups / max parallelism */
- 0, /* index of this subtask */
- 1, /* num subtasks */
- 0 /* attempt number */);
+ "", /* task name */
+ 1, /* num key groups / max parallelism */
+ 0, /* index of this subtask */
+ 1, /* num subtasks */
+ 0 /* attempt number */);
this.jobConfiguration = jobConfig;
this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<InputGate>();
@@ -146,8 +146,8 @@ public class StreamMockEnvironment implements Environment {
@Override
public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
return new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
+ MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+ mock(BufferRecycler.class));
}
});
@@ -175,7 +175,7 @@ public class StreamMockEnvironment implements Environment {
}
if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
- || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+ || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
break;
}
}
@@ -318,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
@Override
public void acknowledgeCheckpoint(
- CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
+ CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
}
@Override
@@ -343,4 +343,3 @@ public class StreamMockEnvironment implements Environment {
return new UnregisteredTaskMetricsGroup();
}
}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 95828f8..ebe5285 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -28,13 +28,11 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b71e38d..b20b3a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -92,7 +92,7 @@ public class StreamTaskTestHarness<OUT> {
// input related methods only need to be implemented once, in generic form
protected int numInputGates;
protected int numInputChannelsPerGate;
-
+
@SuppressWarnings("rawtypes")
protected StreamTestSingleInputGate[] inputGates;
@@ -329,7 +329,7 @@ public class StreamTaskTestHarness<OUT> {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {}
-
+
if (allEmpty) {
break;
}
@@ -337,7 +337,7 @@ public class StreamTaskTestHarness<OUT> {
// then wait for the Task Thread to be in a blocked state
// Check whether the state is blocked, this should be the case if it cannot
- // read more input, i.e. all currently available input has been processed.
+ // notifyNonEmpty more input, i.e. all currently available input has been processed.
while (true) {
Thread.State state = taskThread.getState();
if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
@@ -360,13 +360,13 @@ public class StreamTaskTestHarness<OUT> {
inputGates[i].endInput();
}
}
-
+
// ------------------------------------------------------------------------
-
+
private class TaskThread extends Thread {
-
+
private final AbstractInvokable task;
-
+
private volatile Throwable error;
TaskThread(AbstractInvokable task) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 1076eeb..3cd9c9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -199,6 +199,8 @@ public class TwoInputStreamTaskTest {
testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+ testHarness.waitForInputProcessing();
+
// These elements should be forwarded, since we did not yet receive a checkpoint barrier
// on that input, only add to same input, otherwise we would not know the ordering
// of the output since the Task might read the inputs in any order
@@ -221,8 +223,8 @@ public class TwoInputStreamTaskTest {
// we should not yet see the barrier, only the two elements from non-blocked input
TestHarnessUtil.assertOutputEquals("Output was not correct.",
- testHarness.getOutput(),
- expectedOutput);
+ expectedOutput,
+ testHarness.getOutput());
testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index edb1642..f5b7566 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -124,7 +124,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
bufferSize,
inputSerializer1);
-
StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
targetVertexDummy,
1,