You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/02/22 17:20:48 UTC
[1/2] flink git commit: [FLINK-5701] [kafka] FlinkKafkaProducer
should check asyncException on checkpoints
Repository: flink
Updated Branches:
refs/heads/master d6aed38b3 -> b0f0f3722
[FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints
This closes #3278.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/646490c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/646490c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/646490c4
Branch: refs/heads/master
Commit: 646490c4e93eca315e4bf41704f149390f8639cc
Parents: d6aed38
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Feb 7 00:37:13 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Feb 23 01:16:57 2017 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducerBase.java | 15 +-
.../kafka/FlinkKafkaProducerBaseTest.java | 391 ++++++++++++-------
2 files changed, 272 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/646490c4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 679b731..6a7b17f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.ClosureCleaner;
@@ -348,6 +349,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+ // check for asynchronous errors and fail the checkpoint if necessary
+ checkErroneous();
+
if (flushOnCheckpoint) {
// flushing is activated: We need to wait until pendingRecords is 0
flush();
@@ -355,7 +359,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
if (pendingRecords != 0) {
throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
}
- // pending records count is 0. We can now confirm the checkpoint
+
+ // if any of the flushed requests have errors, we should propagate them as well and fail the checkpoint
+ checkErroneous();
}
}
}
@@ -383,4 +389,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
+
+ @VisibleForTesting
+ protected long numPendingRecords() {
+ synchronized (pendingRecordsLock) {
+ return pendingRecords;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/646490c4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 2e06160..1f16d8e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -18,38 +18,36 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -88,201 +86,328 @@ public class FlinkKafkaProducerBaseTest {
@Test
public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+
RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
- DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
+
+ // out-of-order list of 4 partitions
+ List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
+ mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
+ mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
+ mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
+ mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
+
+ final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
FakeStandardProducerConfig.get(), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
+ final KafkaProducer mockProducer = producer.getMockKafkaProducer();
+ when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
+ when(mockProducer.metrics()).thenReturn(null);
+
producer.open(new Configuration());
- // the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
- // which should be sorted before provided to the custom partitioner's open() method
+ // the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
int[] correctPartitionList = {0, 1, 2, 3};
verify(mockPartitioner).open(0, 1, correctPartitionList);
}
/**
- * Test ensuring that the producer is not dropping buffered records.;
- * we set a timeout because the test will not finish if the logic is broken
+ * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
*/
- @Test(timeout=5000)
- public void testAtLeastOnceProducer() throws Throwable {
- runAtLeastOnceTest(true);
+ @Test
+ public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ // let the message request return an async exception
+ producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+ try {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
}
/**
- * Ensures that the at least once producing test fails if the flushing is disabled
+ * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
*/
- @Test(expected = AssertionError.class, timeout=5000)
- public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
- runAtLeastOnceTest(false);
- }
-
- private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
- final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+ @Test
+ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null, snapshottingFinished);
- producer.setFlushOnCheckpoint(flushOnCheckpoint);
+ FakeStandardProducerConfig.get(), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
- new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
- for (int i = 0; i < 100; i++) {
- testHarness.processElement(new StreamRecord<>("msg-" + i));
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ // let the message request return an async exception
+ producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ // the snapshot should rethrow the async exception
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+ // test succeeded
+ return;
}
- // start a thread confirming all pending records
- final Tuple1<Throwable> runnableError = new Tuple1<>(null);
- final Thread threadA = Thread.currentThread();
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+ * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
+ *
+ * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
+ * The test for that is covered in testAtLeastOnceProducer.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=5000)
+ public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+ producer.setFlushOnCheckpoint(true);
+
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+
+ // only let the first callback succeed for now
+ producer.getPendingCallbacks().get(0).onCompletion(null, null);
- Runnable confirmer = new Runnable() {
+ CheckedThread snapshotThread = new CheckedThread() {
@Override
- public void run() {
- try {
- MockProducer mp = producer.getProducerInstance();
- List<Callback> pending = mp.getPending();
-
- // we need to find out if the snapshot() method blocks forever
- // this is not possible. If snapshot() is running, it will
- // start removing elements from the pending list.
- synchronized (threadA) {
- threadA.wait(500L);
- }
- // we now check that no records have been confirmed yet
- Assert.assertEquals(100, pending.size());
- Assert.assertFalse("Snapshot method returned before all records were confirmed",
- snapshottingFinished.get());
-
- // now confirm all checkpoints
- for (Callback c: pending) {
- c.onCompletion(null, null);
- }
- pending.clear();
- } catch(Throwable t) {
- runnableError.f0 = t;
- }
+ public void go() throws Exception {
+ // this should block at first, since there are still two pending records that need to be flushed
+ testHarness.snapshot(123L, 123L);
}
};
- Thread threadB = new Thread(confirmer);
- threadB.start();
+ snapshotThread.start();
- // this should block:
- testHarness.snapshot(0, 0);
+ // let the 2nd message fail with an async exception
+ producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
+ producer.getPendingCallbacks().get(2).onCompletion(null, null);
- synchronized (threadA) {
- threadA.notifyAll(); // just in case, to let the test fail faster
- }
- Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
- Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
- while (deadline.hasTimeLeft() && threadB.isAlive()) {
- threadB.join(500);
- }
- Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
- if (runnableError.f0 != null) {
- throw runnableError.f0;
+ try {
+ snapshotThread.sync();
+ } catch (Exception e) {
+ // the snapshot should have failed with the async exception
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
+
+ // test succeeded
+ return;
}
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that the producer is not dropping buffered records;
+ * we set a timeout because the test will not finish if the logic is broken
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testAtLeastOnceProducer() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+ producer.setFlushOnCheckpoint(true);
+
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+ Assert.assertEquals(3, producer.getPendingSize());
+
+ // start a thread to perform checkpointing
+ CheckedThread snapshotThread = new CheckedThread() {
+ @Override
+ public void go() throws Exception {
+ // this should block until all records are flushed;
+ // if the snapshot implementation returns before pending records are flushed, the overridden snapshotState() throws and fails the test
+ testHarness.snapshot(123L, 123L);
+ }
+ };
+ snapshotThread.start();
+
+ // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
+ // this would block forever if the snapshot didn't perform a flush
+ producer.waitUntilFlushStarted();
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+ // now, complete the callbacks
+ producer.getPendingCallbacks().get(0).onCompletion(null, null);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+ Assert.assertEquals(2, producer.getPendingSize());
+
+ producer.getPendingCallbacks().get(1).onCompletion(null, null);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+ Assert.assertEquals(1, producer.getPendingSize());
+
+ producer.getPendingCallbacks().get(2).onCompletion(null, null);
+ Assert.assertEquals(0, producer.getPendingSize());
+
+ // this would fail with an exception if flushing wasn't completed before the snapshot method returned
+ snapshotThread.sync();
+
testHarness.close();
}
+ /**
+ * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
+ * the snapshot method does indeed finish without waiting for pending records;
+ * we set a timeout because the test will not finish if the logic is broken
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=5000)
+ public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+ producer.setFlushOnCheckpoint(false);
+
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg"));
+
+ // make sure that the record was sent; its callback is intentionally left uncompleted
+ verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
+
+ // should return even if there are pending records
+ testHarness.snapshot(123L, 123L);
+
+ testHarness.close();
+ }
// ------------------------------------------------------------------------
private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
private static final long serialVersionUID = 1L;
+
+ private final static String DUMMY_TOPIC = "dummy-topic";
- private transient MockProducer prod;
- private AtomicBoolean snapshottingFinished;
+ private transient KafkaProducer<?, ?> mockProducer;
+ private transient List<Callback> pendingCallbacks;
+ private transient MultiShotLatch flushLatch;
+ private boolean isFlushed;
@SuppressWarnings("unchecked")
- public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
- super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
- this.snapshottingFinished = snapshottingFinished;
- }
+ DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
- // constructor variant for test irrelated to snapshotting
- @SuppressWarnings("unchecked")
- public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
- super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
- this.snapshottingFinished = new AtomicBoolean(true);
- }
+ super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
- @Override
- protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
- this.prod = new MockProducer();
- return this.prod;
- }
+ this.mockProducer = mock(KafkaProducer.class);
+ when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ pendingCallbacks.add(invocationOnMock.getArgumentAt(1, Callback.class));
+ return null;
+ }
+ });
- @Override
- public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
- // call the actual snapshot state
- super.snapshotState(ctx);
- // notify test that snapshotting has been done
- snapshottingFinished.set(true);
+ this.pendingCallbacks = new ArrayList<>();
+ this.flushLatch = new MultiShotLatch();
}
- @Override
- protected void flush() {
- this.prod.flush();
+ long getPendingSize() {
+ if (flushOnCheckpoint) {
+ return numPendingRecords();
+ } else {
+ // when flushing is disabled, the implementation does not
+ // maintain the current number of pending records to reduce
+ // the extra locking overhead required to do so
+ throw new UnsupportedOperationException("getPendingSize not supported when flushing is disabled");
+ }
}
- public MockProducer getProducerInstance() {
- return this.prod;
+ List<Callback> getPendingCallbacks() {
+ return pendingCallbacks;
}
- }
-
- private static class MockProducer<K, V> extends KafkaProducer<K, V> {
- List<Callback> pendingCallbacks = new ArrayList<>();
- public MockProducer() {
- super(FakeStandardProducerConfig.get());
+ KafkaProducer<?, ?> getMockKafkaProducer() {
+ return mockProducer;
}
@Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- throw new UnsupportedOperationException("Unexpected");
- }
+ public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+ isFlushed = false;
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- pendingCallbacks.add(callback);
- return null;
+ super.snapshotState(ctx);
+
+ // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
+ if (flushOnCheckpoint && !isFlushed) {
+ throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
+ }
}
- @Override
- public List<PartitionInfo> partitionsFor(String topic) {
- List<PartitionInfo> list = new ArrayList<>();
- // deliberately return an out-of-order partition list
- list.add(new PartitionInfo(topic, 3, null, null, null));
- list.add(new PartitionInfo(topic, 1, null, null, null));
- list.add(new PartitionInfo(topic, 0, null, null, null));
- list.add(new PartitionInfo(topic, 2, null, null, null));
- return list;
+ public void waitUntilFlushStarted() throws Exception {
+ flushLatch.await();
}
+ @SuppressWarnings("unchecked")
@Override
- public Map<MetricName, ? extends Metric> metrics() {
- return null;
+ protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+ return (KafkaProducer<K, V>) mockProducer;
}
+ @Override
+ protected void flush() {
+ flushLatch.trigger();
- public List<Callback> getPending() {
- return this.pendingCallbacks;
- }
-
- public void flush() {
- while (pendingCallbacks.size() > 0) {
+ // simply wait until the producer's pending records become zero.
+ // This relies on the fact that the producer's Callback implementation
+ // and pending records tracking logic is implemented correctly, otherwise
+ // we will loop forever.
+ while (numPendingRecords() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to flush producer, task was interrupted");
}
}
+
+ isFlushed = true;
}
}
}
[2/2] flink git commit: [FLINK-5716] [streaming] Make
StreamSourceContexts aware of source idleness
Posted by tz...@apache.org.
[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness
This closes #3347.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0f0f372
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0f0f372
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0f0f372
Branch: refs/heads/master
Commit: b0f0f3722fac4726fba879736c7ee85993b392db
Parents: 646490c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri Feb 17 02:43:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Feb 23 01:19:58 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/Kafka010FetcherTest.java | 7 +-
.../connectors/kafka/Kafka09FetcherTest.java | 5 +
.../AbstractFetcherTimestampsTest.java | 6 +
.../connectors/rabbitmq/RMQSourceTest.java | 6 +
.../flink/storm/wrappers/TestContext.java | 7 +-
.../hdfstests/ContinuousFileProcessingTest.java | 4 +
.../source/ContinuousFileReaderOperator.java | 8 +-
.../api/functions/source/SourceFunction.java | 14 +
.../streaming/api/operators/StreamSource.java | 22 +-
.../api/operators/StreamSourceContexts.java | 350 +++++++++++++++----
.../runtime/io/StreamInputProcessor.java | 10 +-
.../runtime/io/StreamTwoInputProcessor.java | 20 +-
.../streamstatus/StreamStatusMaintainer.java | 36 ++
.../runtime/tasks/OneInputStreamTask.java | 6 +-
.../streaming/runtime/tasks/OperatorChain.java | 6 +-
.../runtime/tasks/SourceStreamTask.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 5 +
.../runtime/tasks/TwoInputStreamTask.java | 6 +-
.../api/functions/ListSourceContext.java | 7 +-
.../functions/StatefulSequenceSourceTest.java | 6 +
.../source/FileMonitoringFunctionTest.java | 5 +-
.../source/InputFormatSourceFunctionTest.java | 5 +
.../source/SocketTextStreamFunctionTest.java | 11 +-
.../AbstractUdfStreamOperatorLifecycleTest.java | 7 +-
.../StreamSourceContextIdleDetectionTests.java | 325 +++++++++++++++++
.../operators/StreamSourceOperatorTest.java | 25 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 5 +-
.../util/AbstractStreamOperatorTestHarness.java | 19 +
.../streaming/util/CollectingSourceContext.java | 5 +
29 files changed, 814 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 3bc154e..5718986 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -447,7 +447,12 @@ public class Kafka010FetcherTest {
block();
}
- @Override
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Object getCheckpointLock() {
return new Object();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 4526aa0..abd75cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -447,6 +447,11 @@ public class Kafka09FetcherTest {
}
@Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Object getCheckpointLock() {
return new Object();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index f2091f0..6887518 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -271,6 +271,12 @@ public class AbstractFetcherTimestampsTest {
}
}
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public Object getCheckpointLock() {
return checkpointLock;
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 8474f8a..26434ed 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -443,6 +443,12 @@ public class RMQSourceTest {
@Override
public void emitWatermark(Watermark mark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
index 4c4749a..58aad7b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -41,7 +41,12 @@ class TestContext implements SourceContext<Tuple1<Integer>> {
@Override
public void emitWatermark(Watermark mark) {
- // ignore it
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index cc5cb8e..f579345 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -1001,6 +1001,10 @@ public class ContinuousFileProcessingTest {
}
@Override
+ public void markAsTemporarilyIdle() {
+ }
+
+ @Override
public Object getCheckpointLock() {
return lock;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ab1ad1d..b86d97c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -136,7 +136,13 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.readerContext = StreamSourceContexts.getSourceContext(
- timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
+ timeCharacteristic,
+ getProcessingTimeService(),
+ checkpointLock,
+ getContainingTask().getStreamStatusMaintainer(),
+ output,
+ watermarkInterval,
+ -1);
// and initialize the split reading thread
this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index f1619b2..fc7f793 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -216,6 +216,20 @@ public interface SourceFunction<T> extends Function, Serializable {
@PublicEvolving
void emitWatermark(Watermark mark);
+ /**
+ * Marks the source to be temporarily idle. This tells the system that this source will
+ * temporarily stop emitting records and watermarks for an indefinite amount of time. This
+ * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
+ * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
+ * watermarks without the need to wait for watermarks from this source while it is idle.
+ *
+ * <p>Source functions should make a best effort to call this method as soon as they
+ * detect that they are idle. The system will consider the source to have resumed activity
+ * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
+ * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
+ */
+ @PublicEvolving
+ void markAsTemporarilyIdle();
/**
* Returns the checkpoint lock. Please refer to the class-level comment in
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84330b6..36f7c6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -51,12 +52,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
this.chainingStrategy = ChainingStrategy.HEAD;
}
- public void run(final Object lockingObject) throws Exception {
- run(lockingObject, output);
+ public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
+ run(lockingObject, streamStatusMaintainer, output);
}
- public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
+ public void run(final Object lockingObject,
+ final StreamStatusMaintainer streamStatusMaintainer,
+ final Output<StreamRecord<OUT>> collector) throws Exception {
+
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
LatencyMarksEmitter latencyEmitter = null;
@@ -68,11 +72,17 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
getOperatorConfig().getVertexID(),
getRuntimeContext().getIndexOfThisSubtask());
}
-
+
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
- timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
+ timeCharacteristic,
+ getProcessingTimeService(),
+ lockingObject,
+ streamStatusMaintainer,
+ collector,
+ watermarkInterval,
+ -1);
try {
userFunction.run(ctx);
@@ -108,7 +118,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
/**
* Marks this source as canceled or stopped.
*
- * <p>This indicates that any exit of the {@link #run(Object, Output)} method
+ * <p>This indicates that any exit of the {@link #run(Object, StreamStatusMaintainer, Output)} method
* cannot be interpreted as the result of a finite source.
*/
protected void markCanceledOrStopped() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index a6a273f..98281c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -42,16 +44,34 @@ public class StreamSourceContexts {
* </ul>
* */
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
- TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService,
- Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
+ TimeCharacteristic timeCharacteristic,
+ ProcessingTimeService processingTimeService,
+ Object checkpointLock,
+ StreamStatusMaintainer streamStatusMaintainer,
+ Output<StreamRecord<OUT>> output,
+ long watermarkInterval,
+ long idleTimeout) {
final SourceFunction.SourceContext<OUT> ctx;
switch (timeCharacteristic) {
case EventTime:
- ctx = new ManualWatermarkContext<>(checkpointLock, output);
+ ctx = new ManualWatermarkContext<>(
+ output,
+ processingTimeService,
+ checkpointLock,
+ streamStatusMaintainer,
+ idleTimeout);
+
break;
case IngestionTime:
- ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval);
+ ctx = new AutomaticWatermarkContext<>(
+ output,
+ watermarkInterval,
+ processingTimeService,
+ checkpointLock,
+ streamStatusMaintainer,
+ idleTimeout);
+
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(checkpointLock, output);
@@ -97,6 +117,11 @@ public class StreamSourceContexts {
}
@Override
+ public void markAsTemporarilyIdle() {
+ // do nothing
+ }
+
+ @Override
public Object getCheckpointLock() {
return lock;
}
@@ -109,10 +134,8 @@ public class StreamSourceContexts {
* {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
* and watermark emission.
*/
- private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+ private static class AutomaticWatermarkContext<T> extends WatermarkContext<T> {
- private final ProcessingTimeService timeService;
- private final Object lock;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
@@ -121,14 +144,18 @@ public class StreamSourceContexts {
private volatile ScheduledFuture<?> nextWatermarkTimer;
private volatile long nextWatermarkTime;
+ private long lastRecordTime;
+
private AutomaticWatermarkContext(
- final ProcessingTimeService timeService,
- final Object checkpointLock,
- final Output<StreamRecord<T>> output,
- final long watermarkInterval) {
+ final Output<StreamRecord<T>> output,
+ final long watermarkInterval,
+ final ProcessingTimeService timeService,
+ final Object checkpointLock,
+ final StreamStatusMaintainer streamStatusMaintainer,
+ final long idleTimeout) {
+
+ super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
- this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
- this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
@@ -136,63 +163,62 @@ public class StreamSourceContexts {
this.reuse = new StreamRecord<>(null);
+ this.lastRecordTime = Long.MIN_VALUE;
+
long now = this.timeService.getCurrentProcessingTime();
this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
- new WatermarkEmittingTask(this.timeService, lock, output));
+ new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}
@Override
- public void collect(T element) {
- synchronized (lock) {
- final long currentTime = this.timeService.getCurrentProcessingTime();
- output.collect(reuse.replace(element, currentTime));
-
- // this is to avoid lock contention in the lockingObject by
- // sending the watermark before the firing of the watermark
- // emission task.
-
- if (currentTime > nextWatermarkTime) {
- // in case we jumped some watermarks, recompute the next watermark time
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
- nextWatermarkTime = watermarkTime + watermarkInterval;
- output.emitWatermark(new Watermark(watermarkTime));
-
- // we do not need to register another timer here
- // because the emitting task will do so.
- }
+ protected void processAndCollect(T element) {
+ lastRecordTime = this.timeService.getCurrentProcessingTime();
+ output.collect(reuse.replace(element, lastRecordTime));
+
+ // this is to avoid lock contention in the lockingObject by
+ // sending the watermark before the firing of the watermark
+ // emission task.
+ if (lastRecordTime > nextWatermarkTime) {
+ // in case we jumped some watermarks, recompute the next watermark time
+ final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval);
+ nextWatermarkTime = watermarkTime + watermarkInterval;
+ output.emitWatermark(new Watermark(watermarkTime));
+
+ // we do not need to register another timer here
+ // because the emitting task will do so.
}
}
@Override
- public void collectWithTimestamp(T element, long timestamp) {
- collect(element);
+ protected void processAndCollectWithTimestamp(T element, long timestamp) {
+ processAndCollect(element);
}
@Override
- public void emitWatermark(Watermark mark) {
-
- if (mark.getTimestamp() == Long.MAX_VALUE) {
- // allow it since this is the special end-watermark that for example the Kafka source emits
- synchronized (lock) {
- nextWatermarkTime = Long.MAX_VALUE;
- output.emitWatermark(mark);
- }
-
- // we can shutdown the timer now, no watermarks will be needed any more
- final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
- if (nextWatermarkTimer != null) {
- nextWatermarkTimer.cancel(true);
- }
- }
+ protected boolean allowWatermark(Watermark mark) {
+ // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
+ return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE;
}
+ /** This will only be called if allowWatermark returned {@code true} */
@Override
- public Object getCheckpointLock() {
- return lock;
+ protected void processAndEmitWatermark(Watermark mark) {
+ nextWatermarkTime = Long.MAX_VALUE;
+ output.emitWatermark(mark);
+
+ // we can shutdown the watermark timer now, no watermarks will be needed any more.
+ // Note that this procedure actually doesn't need to be synchronized with the lock,
+ // but since it's only a one-time thing, doesn't hurt either
+ final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
+ if (nextWatermarkTimer != null) {
+ nextWatermarkTimer.cancel(true);
+ }
}
@Override
public void close() {
+ super.close();
+
final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
if (nextWatermarkTimer != null) {
nextWatermarkTimer.cancel(true);
@@ -218,14 +244,23 @@ public class StreamSourceContexts {
public void onProcessingTime(long timestamp) {
final long currentTime = timeService.getCurrentProcessingTime();
- if (currentTime > nextWatermarkTime) {
- // align the watermarks across all machines. this will ensure that we
- // don't have watermarks that creep along at different intervals because
- // the machine clocks are out of sync
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+ synchronized (lock) {
+ // we should continue to automatically emit watermarks if we are active
+ if (streamStatusMaintainer.getStreamStatus().isActive()) {
+ if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) {
+ // if we are configured to detect idleness, piggy-back the idle detection check on the
+ // watermark interval, so that we may possibly discover idle sources faster before waiting
+ // for the next idle check to fire
+ markAsTemporarilyIdle();
+
+ // no need to finish the next check, as we are now idle.
+ cancelNextIdleDetectionTask();
+ } else if (currentTime > nextWatermarkTime) {
+ // align the watermarks across all machines. this will ensure that we
+ // don't have watermarks that creep along at different intervals because
+ // the machine clocks are out of sync
+ final long watermarkTime = currentTime - (currentTime % watermarkInterval);
- synchronized (lock) {
- if (currentTime > nextWatermarkTime) {
output.emitWatermark(new Watermark(watermarkTime));
nextWatermarkTime = watermarkTime + watermarkInterval;
}
@@ -247,45 +282,220 @@ public class StreamSourceContexts {
* Streaming topologies can use timestamp assigner functions to override the timestamps
* assigned here.
*/
- private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+ private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
- private final Object lock;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
- this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+ private ManualWatermarkContext(
+ final Output<StreamRecord<T>> output,
+ final ProcessingTimeService timeService,
+ final Object checkpointLock,
+ final StreamStatusMaintainer streamStatusMaintainer,
+ final long idleTimeout) {
+
+ super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
+
this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
this.reuse = new StreamRecord<>(null);
}
@Override
+ protected void processAndCollect(T element) {
+ output.collect(reuse.replace(element));
+ }
+
+ @Override
+ protected void processAndCollectWithTimestamp(T element, long timestamp) {
+ output.collect(reuse.replace(element, timestamp));
+ }
+
+ @Override
+ protected void processAndEmitWatermark(Watermark mark) {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ protected boolean allowWatermark(Watermark mark) {
+ return true;
+ }
+ }
+
+ /**
+ * An abstract {@link SourceFunction.SourceContext} that should be used as the base for
+ * stream source contexts that are relevant with {@link Watermark}s.
+ *
+ * Stream source contexts that are relevant with watermarks are responsible for manipulating
+ * the current {@link StreamStatus}, so that stream status can be correctly propagated
+ * downstream. Please refer to the class-level documentation of {@link StreamStatus} for
+ * information on how stream status affects watermark advancement at downstream tasks.
+ *
+ * This class implements the logic of idleness detection. It fires idleness detection
+ * tasks at a given interval; if no records or watermarks were collected by the source context
+ * between 2 consecutive checks, it determines the source to be IDLE and correspondingly
+ * toggles the status. ACTIVE status resumes as soon as some record or watermark is collected
+ * again.
+ */
+ private static abstract class WatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+ protected final ProcessingTimeService timeService;
+ protected final Object checkpointLock;
+ protected final StreamStatusMaintainer streamStatusMaintainer;
+ protected final long idleTimeout;
+
+ private ScheduledFuture<?> nextCheck;
+
+ /**
+ * This flag will be reset to {@code true} every time the next check is scheduled.
+ * Whenever a record or watermark is collected, the flag will be set to {@code false}.
+ *
+ * When the scheduled check is fired, if the flag is still {@code true}, the check will fail,
+ * and our current status will be determined to be IDLE.
+ */
+ private volatile boolean failOnNextCheck;
+
+ /**
+ * Create a watermark context.
+ *
+ * @param timeService the time service to schedule idleness detection tasks
+ * @param checkpointLock the checkpoint lock
+ * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status
+ * @param idleTimeout (-1 if idleness checking is disabled)
+ */
+ public WatermarkContext(
+ final ProcessingTimeService timeService,
+ final Object checkpointLock,
+ final StreamStatusMaintainer streamStatusMaintainer,
+ final long idleTimeout) {
+
+ this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+ this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
+ this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null.");
+
+ if (idleTimeout != -1) {
+ Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms.");
+ }
+ this.idleTimeout = idleTimeout;
+
+ scheduleNextIdleDetectionTask();
+ }
+
+ @Override
public void collect(T element) {
- synchronized (lock) {
- output.collect(reuse.replace(element));
+ synchronized (checkpointLock) {
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+ if (nextCheck != null) {
+ this.failOnNextCheck = false;
+ } else {
+ scheduleNextIdleDetectionTask();
+ }
+
+ processAndCollect(element);
}
}
@Override
public void collectWithTimestamp(T element, long timestamp) {
- synchronized (lock) {
- output.collect(reuse.replace(element, timestamp));
+ synchronized (checkpointLock) {
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+ if (nextCheck != null) {
+ this.failOnNextCheck = false;
+ } else {
+ scheduleNextIdleDetectionTask();
+ }
+
+ processAndCollectWithTimestamp(element, timestamp);
}
}
@Override
public void emitWatermark(Watermark mark) {
- synchronized (lock) {
- output.emitWatermark(mark);
+ if (allowWatermark(mark)) {
+ synchronized (checkpointLock) {
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+ if (nextCheck != null) {
+ this.failOnNextCheck = false;
+ } else {
+ scheduleNextIdleDetectionTask();
+ }
+
+ processAndEmitWatermark(mark);
+ }
+ }
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ synchronized (checkpointLock) {
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
}
}
@Override
public Object getCheckpointLock() {
- return lock;
+ return checkpointLock;
}
@Override
- public void close() {}
+ public void close() {
+ cancelNextIdleDetectionTask();
+ }
+
+ private class IdlenessDetectionTask implements ProcessingTimeCallback {
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ synchronized (checkpointLock) {
+ // set this to null now;
+ // the next idleness detection will be scheduled again
+ // depending on the below failOnNextCheck condition
+ nextCheck = null;
+
+ if (failOnNextCheck) {
+ markAsTemporarilyIdle();
+ } else {
+ scheduleNextIdleDetectionTask();
+ }
+ }
+ }
+ }
+
+ private void scheduleNextIdleDetectionTask() {
+ if (idleTimeout != -1) {
+ // reset flag; if it remains true when task fires, we have detected idleness
+ failOnNextCheck = true;
+ nextCheck = this.timeService.registerTimer(
+ this.timeService.getCurrentProcessingTime() + idleTimeout,
+ new IdlenessDetectionTask());
+ }
+ }
+
+ protected void cancelNextIdleDetectionTask() {
+ final ScheduledFuture<?> nextCheck = this.nextCheck;
+ if (nextCheck != null) {
+ nextCheck.cancel(true);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Abstract methods for concrete subclasses to implement.
+ // These methods are guaranteed to be synchronized on the checkpoint lock,
+ // so implementations don't need to do so.
+ // ------------------------------------------------------------------------
+
+ /** Process and collect record. */
+ protected abstract void processAndCollect(T element);
+
+ /** Process and collect record with timestamp. */
+ protected abstract void processAndCollectWithTimestamp(T element, long timestamp);
+
+ /** Whether or not a watermark should be allowed */
+ protected abstract boolean allowWatermark(Watermark mark);
+
+ /** Process and emit watermark. Only called if {@link WatermarkContext#allowWatermark(Watermark)} returns {@code true} */
+ protected abstract void processAndEmitWatermark(Watermark mark);
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index e2061c3..3feaa52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -49,7 +49,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,7 +95,7 @@ public class StreamInputProcessor<IN> {
*/
private int currentChannel = -1;
- private final OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain;
+ private final StreamStatusMaintainer streamStatusMaintainer;
private final OneInputStreamOperator<IN, ?> streamOperator;
@@ -115,7 +115,7 @@ public class StreamInputProcessor<IN> {
Object lock,
IOManager ioManager,
Configuration taskManagerConfig,
- OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain,
+ StreamStatusMaintainer streamStatusMaintainer,
OneInputStreamOperator<IN, ?> streamOperator) throws IOException {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
@@ -157,7 +157,7 @@ public class StreamInputProcessor<IN> {
this.lastEmittedWatermark = Long.MIN_VALUE;
- this.operatorChain = checkNotNull(operatorChain);
+ this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
this.streamOperator = checkNotNull(streamOperator);
this.statusWatermarkValve = new StatusWatermarkValve(
@@ -297,7 +297,7 @@ public class StreamInputProcessor<IN> {
public void handleStreamStatus(StreamStatus streamStatus) {
try {
synchronized (lock) {
- operatorChain.setStreamStatus(streamStatus);
+ streamStatusMaintainer.toggleStreamStatus(streamStatus);
}
} catch (Exception e) {
throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a295395..a8ec972 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -45,7 +45,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import java.io.IOException;
import java.util.Collection;
@@ -107,7 +107,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
*/
private int currentChannel = -1;
- private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain;
+ private final StreamStatusMaintainer streamStatusMaintainer;
private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
@@ -129,7 +129,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
Object lock,
IOManager ioManager,
Configuration taskManagerConfig,
- OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain,
+ StreamStatusMaintainer streamStatusMaintainer,
TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException {
final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
@@ -185,7 +185,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
this.firstStatus = StreamStatus.ACTIVE;
this.secondStatus = StreamStatus.ACTIVE;
- this.operatorChain = checkNotNull(operatorChain);
+ this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
this.streamOperator = checkNotNull(streamOperator);
this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock));
@@ -355,13 +355,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
firstStatus = streamStatus;
// check if we need to toggle the task's stream status
- if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+ if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
if (streamStatus.isActive()) {
// we're no longer idle if at least one input has become active
- operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
} else if (secondStatus.isIdle()) {
// we're idle once both inputs are idle
- operatorChain.setStreamStatus(StreamStatus.IDLE);
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
}
}
}
@@ -399,13 +399,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
secondStatus = streamStatus;
// check if we need to toggle the task's stream status
- if (!streamStatus.equals(operatorChain.getStreamStatus())) {
+ if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
if (streamStatus.isActive()) {
// we're no longer idle if at least one input has become active
- operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
} else if (firstStatus.isIdle()) {
// we're idle once both inputs are idle
- operatorChain.setStreamStatus(StreamStatus.IDLE);
+ streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
new file mode 100644
index 0000000..d964cef
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface that allows toggling the current {@link StreamStatus} as well as retrieving it.
+ */
+@Internal
+public interface StreamStatusMaintainer extends StreamStatusProvider {
+
+ /**
+ * Toggles the current stream status. This method should only have effect
+ * if the supplied stream status is different from the current status.
+ *
+ * @param streamStatus the new status to toggle to
+ */
+ void toggleStreamStatus(StreamStatus streamStatus);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index e559ad0..e04d316 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -42,10 +42,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
- @SuppressWarnings("unchecked")
- OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain =
- (OperatorChain) this.operatorChain;
-
inputProcessor = new StreamInputProcessor<>(
inputGates,
inSerializer,
@@ -54,7 +50,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
- operatorChain,
+ getStreamStatusMaintainer(),
this.headOperator);
// make sure that stream tasks report their I/O statistics
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 591ed3c..4f07182 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
@@ -63,7 +64,7 @@ import java.util.Random;
* head operator.
*/
@Internal
-public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusProvider {
+public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
@@ -151,7 +152,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
return streamStatus;
}
- public void setStreamStatus(StreamStatus status) throws IOException {
+ @Override
+ public void toggleStreamStatus(StreamStatus status) {
if (!status.equals(this.streamStatus)) {
this.streamStatus = status;
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 7ae99f6..63b40ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
@Override
protected void run() throws Exception {
- headOperator.run(getCheckpointLock());
+ headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 60afd60..62cfb8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
@@ -497,6 +498,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return accumulatorMap;
}
+ public StreamStatusMaintainer getStreamStatusMaintainer() {
+ return operatorChain;
+ }
+
Output<StreamRecord<OUT>> getHeadOutput() {
return operatorChain.getChainEntryPoint();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 175bd76..71346b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -65,10 +65,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
}
}
- @SuppressWarnings("unchecked")
- OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain =
- (OperatorChain) this.operatorChain;
-
this.inputProcessor = new StreamTwoInputProcessor<>(
inputList1, inputList2,
inputDeserializer1, inputDeserializer2,
@@ -77,7 +73,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
- operatorChain,
+ getStreamStatusMaintainer(),
this.headOperator);
// make sure that stream tasks report their I/O statistics
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e4dadf0..a4c1bea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -67,7 +67,12 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
@Override
public void emitWatermark(Watermark mark) {
- // don't do anything
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index 8332cb3..9030e9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -228,6 +228,12 @@ public class StatefulSequenceSourceTest {
@Override
public void emitWatermark(Watermark mark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
index 6b36419..d81b440 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -58,10 +58,13 @@ public class FileMonitoringFunctionTest {
public void emitWatermark(Watermark mark) {}
@Override
+ public void markAsTemporarilyIdle() {}
+
+ @Override
public Object getCheckpointLock() { return null; }
@Override
public void close() {}
});
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index d1131b4..bb80228 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -225,6 +225,11 @@ public class InputFormatSourceFunctionTest {
}
@Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Object getCheckpointLock() {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 3e274cf..87376e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -283,7 +283,14 @@ public class SocketTextStreamFunctionTest {
}
@Override
- public void emitWatermark(Watermark mark) {}
+ public void emitWatermark(Watermark mark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
@Override
public Object getCheckpointLock() {
@@ -346,4 +353,4 @@ public class SocketTextStreamFunctionTest {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 357163c..c4ddea8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
@@ -219,9 +220,11 @@ public class AbstractUdfStreamOperatorLifecycleTest {
}
@Override
- public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception {
+ public void run(Object lockingObject,
+ StreamStatusMaintainer streamStatusMaintainer,
+ Output<StreamRecord<OUT>> collector) throws Exception {
ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
- super.run(lockingObject, collector);
+ super.run(lockingObject, streamStatusMaintainer, collector);
runStarted.trigger();
runFinish.await();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
new file mode 100644
index 0000000..3695120
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class StreamSourceContextIdleDetectionTests {
+
+ /** The tests in this class will be parameterized with these enumerations. */
+ private enum TestMethod {
+
+ /** test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method */
+ COLLECT,
+
+ /** test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */
+ COLLECT_WITH_TIMESTAMP,
+
+ /** test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method */
+ EMIT_WATERMARK
+ }
+
+ private TestMethod testMethod;
+
+ public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
+ this.testMethod = testMethod;
+ }
+
+ /**
+ * Test scenario (idleTimeout = 100):
+ * (1) Start from 0 as initial time.
+ * (2) As soon as time reaches 100, status should have been toggled to IDLE.
+ * (3) After some arbitrary time (until 300), the status should remain IDLE.
+ * (4) Emit a record at 310. Status should become ACTIVE. This should fire an idleness detection at 410.
+ * (5) Emit another record at 320 (which is before the next check). This should make the idleness check pass.
+ * (6) Advance time to 410 and trigger idleness detection.
+ * The status should still be ACTIVE due to step (5). Another idleness detection should be fired at 510.
+ * (7) Advance time to 510 and trigger idleness detection. Since no records were collected in-between the two
+ * idleness detections, status should have been toggled back to IDLE.
+ *
+ * Inline comments will refer to the corresponding tested steps in the scenario.
+ */
+ @Test
+ public void testManualWatermarkContext() throws Exception {
+ long idleTimeout = 100;
+
+ long initialTime = 0;
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ processingTimeService.setCurrentTime(initialTime);
+
+ final List<StreamElement> output = new ArrayList<>();
+
+ MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer();
+
+ SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
+ TimeCharacteristic.EventTime,
+ processingTimeService,
+ new Object(),
+ mockStreamStatusMaintainer,
+ new CollectorOutput<String>(output),
+ 0,
+ idleTimeout);
+
+ // -------------------------- begin test scenario --------------------------
+
+ // corresponds to step (2) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + idleTimeout);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+ // corresponds to step (3) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+ // corresponds to step (4) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+ switch (testMethod) {
+ case COLLECT:
+ context.collect("msg");
+ break;
+ case COLLECT_WITH_TIMESTAMP:
+ context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+ break;
+ case EMIT_WATERMARK:
+ context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+ break;
+ }
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+ // corresponds to step (5) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 2*idleTimeout/10);
+ switch (testMethod) {
+ case COLLECT:
+ context.collect("msg");
+ break;
+ case COLLECT_WITH_TIMESTAMP:
+ context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+ break;
+ case EMIT_WATERMARK:
+ context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+ break;
+ }
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+ // corresponds to step (6) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+ // corresponds to step (7) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 5*idleTimeout + idleTimeout/10);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ }
+
+ /**
+ * Test scenario (idleTimeout = 100, watermarkInterval = 40):
+ * (1) Start from 20 as initial time.
+ * (2) As soon as time reaches 120, status should have been toggled to IDLE.
+ * (3) After some arbitrary time (until 320), the status should remain IDLE, and no watermarks should have been emitted.
+ * (4) Emit a record at 330. Status should become ACTIVE. This should schedule an idleness detection to be fired at 430.
+ * (5) Emit another record at 350 (which is before the next check). This should make the idleness check pass.
+ * (6) Advance time to 430 and trigger idleness detection. The status should still be ACTIVE due to step (5).
+ * This should schedule an idleness detection to be fired at 530.
+ * (7) Advance time to 460, in which a watermark emission task should be fired. Idleness detection
+ * should have been "piggy-backed" in the task, allowing the status to be toggled to IDLE before the next
+ * actual idle detection task at 530.
+ *
+ * Inline comments will refer to the corresponding tested steps in the scenario.
+ */
+ @Test
+ public void testAutomaticWatermarkContext() throws Exception {
+ long watermarkInterval = 40;
+ long idleTimeout = 100;
+ long initialTime = 20;
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ processingTimeService.setCurrentTime(initialTime);
+
+ MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer();
+
+ final List<StreamElement> output = new ArrayList<>();
+ final List<StreamElement> expectedOutput = new ArrayList<>();
+
+ SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
+ TimeCharacteristic.IngestionTime,
+ processingTimeService,
+ new Object(),
+ mockStreamStatusMaintainer,
+ new CollectorOutput<String>(output),
+ watermarkInterval,
+ idleTimeout);
+
+ // -------------------------- begin test scenario --------------------------
+
+ // corresponds to step (2) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + watermarkInterval);
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ processingTimeService.setCurrentTime(initialTime + 2*watermarkInterval);
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ processingTimeService.setCurrentTime(initialTime + idleTimeout);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+
+ // corresponds to step (3) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 3*watermarkInterval);
+ processingTimeService.setCurrentTime(initialTime + 4*watermarkInterval);
+ processingTimeService.setCurrentTime(initialTime + 2*idleTimeout);
+ processingTimeService.setCurrentTime(initialTime + 6*watermarkInterval);
+ processingTimeService.setCurrentTime(initialTime + 7*watermarkInterval);
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+
+ // corresponds to step (4) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10);
+ switch (testMethod) {
+ case COLLECT:
+ context.collect("msg");
+ expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case COLLECT_WITH_TIMESTAMP:
+ context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+ expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case EMIT_WATERMARK:
+ // for emitWatermark, since the watermark will be blocked,
+ // it should not make the status become active;
+ // from here on, the status should remain idle for the emitWatermark variant test
+ context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ // corresponds to step (5) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 8*watermarkInterval);
+ processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 3*idleTimeout/10);
+ switch (testMethod) {
+ case COLLECT:
+ context.collect("msg");
+ expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case COLLECT_WITH_TIMESTAMP:
+ context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime());
+ expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime()));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case EMIT_WATERMARK:
+ context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ processingTimeService.setCurrentTime(initialTime + 9 * watermarkInterval);
+ switch (testMethod) {
+ case COLLECT:
+ case COLLECT_WITH_TIMESTAMP:
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case EMIT_WATERMARK:
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ processingTimeService.setCurrentTime(initialTime + 10*watermarkInterval);
+ switch (testMethod) {
+ case COLLECT:
+ case COLLECT_WITH_TIMESTAMP:
+ expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case EMIT_WATERMARK:
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ // corresponds to step (6) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10);
+ switch (testMethod) {
+ case COLLECT:
+ case COLLECT_WITH_TIMESTAMP:
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+ assertEquals(expectedOutput, output);
+ break;
+ case EMIT_WATERMARK:
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ // corresponds to step (7) of scenario (please see method-level Javadoc comment)
+ processingTimeService.setCurrentTime(initialTime + 11*watermarkInterval);
+ assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+ assertEquals(expectedOutput, output);
+ }
+
+ private static class MockStreamStatusMaintainer implements StreamStatusMaintainer {
+ StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+ @Override
+ public void toggleStreamStatus(StreamStatus streamStatus) {
+ if (!currentStreamStatus.equals(streamStatus)) {
+ currentStreamStatus = streamStatus;
+ }
+ }
+
+ @Override
+ public StreamStatus getStreamStatus() {
+ return currentStreamStatus;
+ }
+ }
+
+ @Parameterized.Parameters(name = "TestMethod = {0}")
+ @SuppressWarnings("unchecked")
+ public static Collection<TestMethod[]> timeCharacteristic(){
+ return Arrays.asList(
+ new TestMethod[]{TestMethod.COLLECT},
+ new TestMethod[]{TestMethod.COLLECT_WITH_TIMESTAMP},
+ new TestMethod[]{TestMethod.EMIT_WATERMARK});
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index b153de9..ae74c9c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -69,7 +72,7 @@ public class StreamSourceOperatorTest {
final List<StreamElement> output = new ArrayList<>();
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
- operator.run(new Object(), new CollectorOutput<String>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
assertEquals(1, output.size());
assertEquals(Watermark.MAX_WATERMARK, output.get(0));
@@ -89,7 +92,7 @@ public class StreamSourceOperatorTest {
operator.cancel();
// run and exit
- operator.run(new Object(), new CollectorOutput<String>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
@@ -121,7 +124,7 @@ public class StreamSourceOperatorTest {
// run and wait to be canceled
try {
- operator.run(new Object(), new CollectorOutput<String>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
}
catch (InterruptedException ignored) {}
@@ -142,7 +145,7 @@ public class StreamSourceOperatorTest {
operator.stop();
// run and stop
- operator.run(new Object(), new CollectorOutput<String>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
@@ -171,7 +174,7 @@ public class StreamSourceOperatorTest {
}.start();
// run and wait to be stopped
- operator.run(new Object(), new CollectorOutput<String>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
assertTrue(output.isEmpty());
}
@@ -198,7 +201,7 @@ public class StreamSourceOperatorTest {
setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
// run and wait to be stopped
- operator.run(new Object(), new CollectorOutput<Long>(output));
+ operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
@@ -224,11 +227,6 @@ public class StreamSourceOperatorTest {
}
@Test
- public void testLatencyMarksEmitterLifecycleIntegration() {
-
- }
-
- @Test
public void testAutomaticWatermarkContext() throws Exception {
// regular stream source operator
@@ -246,8 +244,10 @@ public class StreamSourceOperatorTest {
StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
operator.getContainingTask().getProcessingTimeService(),
operator.getContainingTask().getCheckpointLock(),
+ operator.getContainingTask().getStreamStatusMaintainer(),
new CollectorOutput<String>(output),
- operator.getExecutionConfig().getAutoWatermarkInterval());
+ operator.getExecutionConfig().getAutoWatermarkInterval(),
+ -1);
// periodically emit the watermarks
// even though we start from 1 the watermark are still
@@ -302,6 +302,7 @@ public class StreamSourceOperatorTest {
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+ when(mockTask.getStreamStatusMaintainer()).thenReturn(mock(StreamStatusMaintainer.class));
doAnswer(new Answer<ProcessingTimeService>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d33d1b6..1e74c3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -84,6 +84,7 @@ import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
@@ -759,7 +760,9 @@ public class StreamTaskTest extends TestLogger {
}
@Override
- public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
+ public void run(Object lockingObject,
+ StreamStatusMaintainer streamStatusMaintainer,
+ Output<StreamRecord<Long>> collector) throws Exception {
while (!canceled) {
try {
Thread.sleep(500);
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2df4efd..01afec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -54,6 +54,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -157,6 +159,22 @@ public class AbstractStreamOperatorTestHarness<OUT> {
processingTimeService = new TestProcessingTimeService();
processingTimeService.setCurrentTime(0);
+ StreamStatusMaintainer mockStreamStatusMaintainer = new StreamStatusMaintainer() {
+ StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+ @Override
+ public void toggleStreamStatus(StreamStatus streamStatus) {
+ if (!currentStreamStatus.equals(streamStatus)) {
+ currentStreamStatus = streamStatus;
+ }
+ }
+
+ @Override
+ public StreamStatus getStreamStatus() {
+ return currentStreamStatus;
+ }
+ };
+
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
@@ -165,6 +183,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+ when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
doAnswer(new Answer<Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index fe2b03e..d9ad24d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -56,6 +56,11 @@ public class CollectingSourceContext<T extends Serializable> implements SourceFu
}
@Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Object getCheckpointLock() {
return lock;
}