Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/02/17 14:15:08 UTC

[GitHub] flink pull request #3347: [FLINK-5716] [streaming] Make StreamSourceContexts...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3347

    [FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

    This PR is the second part of adding `StreamStatus` to the streaming runtime to facilitate idle sources in Flink.
    
    It allows the `AutomaticWatermarkContext` (ingestion time) and the `ManualWatermarkContext` (event time) to detect source idleness. Detection is based on a fixed-interval check: at each check, the context determines whether it has collected any records or watermarks from the source UDF since the previous check. If nothing was collected between two consecutive checks, the `SourceStreamTask` is toggled to `IDLE`. As soon as a record or watermark is collected afterwards, it is toggled back to `ACTIVE`.
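
    A minimal sketch of the fixed-interval check described above, in plain Java. The class `IdleDetector`, the `Status` enum, and the convention that a non-positive timeout disables detection are hypothetical and not taken from this PR. The sketch treats construction as the first check, so one full interval without any record or watermark is enough to go idle.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for the ACTIVE / IDLE stream status.
    enum Status { ACTIVE, IDLE }

    // Sketch only: tracks whether anything was collected since the last check.
    class IdleDetector {
        private final long idleTimeout;          // assumption: <= 0 disables detection
        private boolean activitySinceLastCheck = false;
        private Status status = Status.ACTIVE;
        private final List<Status> toggles = new ArrayList<>(); // every status change, in order

        IdleDetector(long idleTimeout) {
            this.idleTimeout = idleTimeout;
        }

        // Called whenever the source UDF collects a record or emits a watermark.
        synchronized void onRecordOrWatermark() {
            activitySinceLastCheck = true;
            if (status == Status.IDLE) {
                toggle(Status.ACTIVE);           // resume as soon as there is new activity
            }
        }

        // Called once per interval (by a timer in a real task).
        synchronized void checkIdle() {
            if (idleTimeout <= 0) {
                return;                          // detection disabled
            }
            if (!activitySinceLastCheck && status == Status.ACTIVE) {
                toggle(Status.IDLE);             // nothing collected since the previous check
            }
            activitySinceLastCheck = false;      // start a fresh interval
        }

        private void toggle(Status newStatus) {
            status = newStatus;
            toggles.add(newStatus);
        }

        synchronized Status currentStatus() { return status; }

        synchronized List<Status> toggles() { return new ArrayList<>(toggles); }
    }
    ```

    In a running task, `checkIdle()` would be driven by a timer; the contexts in this PR receive a `ProcessingTimeService`, presumably for that purpose. The sketch leaves scheduling to the caller and uses `synchronized` methods because the timer thread and the source thread would touch the same state.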
    
    This idleness check is disabled by default. It is not yet configurable with this PR; that will be addressed in a separate issue, [FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).
    
    It also introduces a new user-exposed method to the `SourceFunction.SourceContext` interface:
    ```
    interface SourceFunction.SourceContext {
        ...
        void markAsTemporarilyIdle();
        ...
    }
    ```
    
    The purpose of this method is to let the source UDF proactively mark the source as `IDLE` without waiting for the interval-based checks. UDFs should make a best effort to call it. For example, Kafka and Kinesis consumer source instances can call it as soon as they determine that they have no subscribed partitions on startup.
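
    The following sketch illustrates the startup case described above: a parallel source instance that finds it has no assigned partitions marks itself idle immediately. `MiniSourceContext`, `PartitionedSource`, and `RecordingContext` are hypothetical stand-ins, not Flink or connector APIs; only the method name `markAsTemporarilyIdle()` comes from this PR.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical, trimmed-down stand-in for SourceFunction.SourceContext,
    // with only the two methods this example needs.
    interface MiniSourceContext<T> {
        void collect(T element);
        void markAsTemporarilyIdle();
    }

    // Hypothetical partitioned source: each parallel instance reads only the
    // partitions assigned to it, which may be none.
    class PartitionedSource {
        private final List<List<String>> assignedPartitions;

        PartitionedSource(List<List<String>> assignedPartitions) {
            this.assignedPartitions = assignedPartitions;
        }

        void run(MiniSourceContext<String> ctx) {
            if (assignedPartitions.isEmpty()) {
                // Nothing will be emitted: mark idle right away instead of
                // waiting for the interval-based check to notice. A real consumer
                // would typically keep running until cancelled rather than return;
                // returning keeps this sketch short.
                ctx.markAsTemporarilyIdle();
                return;
            }
            for (List<String> partition : assignedPartitions) {
                for (String record : partition) {
                    ctx.collect(record);
                }
            }
        }
    }

    // Records what the source did; collecting a record makes it active again.
    class RecordingContext implements MiniSourceContext<String> {
        final List<String> collected = new ArrayList<>();
        boolean idle = false;

        public void collect(String element) {
            collected.add(element);
            idle = false;
        }

        public void markAsTemporarilyIdle() {
            idle = true;
        }
    }
    ```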
    
    ## Others
    
    This PR also introduces an internal interface, `StreamStatusMaintainer`:
    ```
    interface StreamStatusMaintainer extends StreamStatusProvider {
        void toggleStreamStatus(StreamStatus streamStatus);
    }
    ```
    
    The main reason for this is to keep the `StreamSource` operator inaccessible to the `OperatorChain`.
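
    A minimal sketch of how a narrow interface like `StreamStatusMaintainer` can decouple components: the code that changes the status depends only on the interface, while the implementing class tracks the current status and forwards only actual changes. `StreamStatus` is modelled here as a plain enum, `StreamStatusProvider.getStreamStatus()` is an assumed accessor (the PR does not show that interface), and `ForwardingStatusMaintainer` and `SourceSide` are hypothetical.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Simplified stand-in for the stream status (assumption: an enum suffices here).
    enum StreamStatus { ACTIVE, IDLE }

    // Assumed shape of the provider interface.
    interface StreamStatusProvider {
        StreamStatus getStreamStatus();
    }

    // As quoted in the PR description.
    interface StreamStatusMaintainer extends StreamStatusProvider {
        void toggleStreamStatus(StreamStatus streamStatus);
    }

    // Hypothetical implementation: remembers the current status and
    // forwards only real changes downstream.
    class ForwardingStatusMaintainer implements StreamStatusMaintainer {
        private StreamStatus current = StreamStatus.ACTIVE;
        final List<StreamStatus> emitted = new ArrayList<>(); // stand-in for downstream outputs

        @Override
        public StreamStatus getStreamStatus() {
            return current;
        }

        @Override
        public void toggleStreamStatus(StreamStatus status) {
            if (status != current) {
                current = status;
                emitted.add(status);
            }
        }
    }

    // The status-changing side sees only the narrow interface.
    class SourceSide {
        private final StreamStatusMaintainer maintainer;

        SourceSide(StreamStatusMaintainer maintainer) {
            this.maintainer = maintainer;
        }

        void goIdle() { maintainer.toggleStreamStatus(StreamStatus.IDLE); }

        void resume() { maintainer.toggleStreamStatus(StreamStatus.ACTIVE); }
    }
    ```

    Because `SourceSide` holds only a `StreamStatusMaintainer`, it cannot reach any other methods of the class that implements it.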
    
    ## Tests
    
    Added `StreamSourceContextIdleDetectionTests` to test the proposed functionality. I have not yet added task-level tests using `StreamTaskTestHarness`, because the idle timeout is not yet configurable beyond the `SourceContext` interfaces. I propose adding tests with task harnesses together with [FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5716

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3347.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3347
    
----
commit 66851ad987dedceb74475472d13a267101a95f61
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-02-16T18:43:44Z

    [FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3347: [FLINK-5716] [streaming] Make StreamSourceContexts aware ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3347
  
    Thanks @aljoscha for the review! Will fix the typo and merge this to `master`.


[GitHub] flink pull request #3347: [FLINK-5716] [streaming] Make StreamSourceContexts...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3347#discussion_r102431062
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---
    @@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
     	 * Streaming topologies can use timestamp assigner functions to override the timestamps
     	 * assigned here.
     	 */
    -	private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
    +	private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
     
    -		private final Object lock;
     		private final Output<StreamRecord<T>> output;
     		private final StreamRecord<T> reuse;
     
    -		private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
    -			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
    +		private ManualWatermarkContext(
    +				final Output<StreamRecord<T>> output,
    +				final ProcessingTimeService timeService,
    +				final Object checkpointLock,
    +				final StreamStatusMaintainer streamStatusMaintainer,
    +				final long idleTimeout) {
    +
    +			super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
    +
     			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
     			this.reuse = new StreamRecord<>(null);
     		}
     
     		@Override
    +		protected void processAndCollect(T element) {
    +			output.collect(reuse.replace(element));
    +		}
    +
    +		@Override
    +		protected void processAndCollectWithTimestamp(T element, long timestamp) {
    +			output.collect(reuse.replace(element, timestamp));
    +		}
    +
    +		@Override
    +		protected void processAndEmitWatermark(Watermark mark) {
    +			output.emitWatermark(mark);
    +		}
    +
    +		@Override
    +		protected boolean allowWatermark(Watermark mark) {
    +			return true;
    +		}
    +	}
    +
    +	/**
    +	 * An asbtract {@link SourceFunction.SourceContext} that should be used as the base for
    --- End diff --
    
    Typo: `asbtract`


[GitHub] flink pull request #3347: [FLINK-5716] [streaming] Make StreamSourceContexts...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3347


[GitHub] flink issue #3347: [FLINK-5716] [streaming] Make StreamSourceContexts aware ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3347
  
    @tzulitai Could you please go ahead and merge?

