Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/06/17 12:26:53 UTC

[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2124

    [FLINK-3647] Change StreamSource to use Processing-Time Clock Service

    This PR changes the AutomaticWatermarkContext to use a pluggable processing-time clock. This
    makes the code easier to test.
    
    This PR also contains the solution to FLINK-3646.
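A minimal sketch of what a pluggable processing-time clock buys for testing. All names here (`ProcessingTimeClock`, `ManualProcessingTimeClock`, `WatermarkDueChecker`) are hypothetical and are not the classes introduced by this PR; the point is only that code which reads time through an injected clock can be driven deterministically by a test instead of depending on the wall clock.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: the single source of "now" for components that need processing time.
interface ProcessingTimeClock {
    long currentProcessingTime();
}

// Production clock: the system wall clock.
final class SystemProcessingTimeClock implements ProcessingTimeClock {
    @Override
    public long currentProcessingTime() {
        return System.currentTimeMillis();
    }
}

// Test clock: time moves only when the test calls advance().
final class ManualProcessingTimeClock implements ProcessingTimeClock {
    private final AtomicLong now;

    ManualProcessingTimeClock(long start) {
        this.now = new AtomicLong(start);
    }

    void advance(long millis) {
        now.addAndGet(millis);
    }

    @Override
    public long currentProcessingTime() {
        return now.get();
    }
}

// Hypothetical stand-in for periodic watermark logic: decides whether a watermark
// is due, reading time only from the injected clock.
final class WatermarkDueChecker {
    private final ProcessingTimeClock clock;
    private final long intervalMillis;
    private long lastEmission;

    WatermarkDueChecker(ProcessingTimeClock clock, long intervalMillis) {
        this.clock = clock;
        this.intervalMillis = intervalMillis;
        this.lastEmission = clock.currentProcessingTime();
    }

    // Returns true, and records the emission, once a full interval has elapsed.
    boolean tryEmit() {
        long now = clock.currentProcessingTime();
        if (now - lastEmission >= intervalMillis) {
            lastEmission = now;
            return true;
        }
        return false;
    }
}
```

In a unit test, `ManualProcessingTimeClock` replaces the wall clock, so the test can advance time by exactly one interval instead of sleeping and hoping the timing works out.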

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink FLINK-3646

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2124
    
----
commit 88c01d2ffbb38ad83343c730e38c6a20ccdb8089
Author: kl0u <kk...@gmail.com>
Date:   2016-05-12T12:16:14Z

    [FLINK-3464] Use Processing-Time Clock in Window Assigners/Triggers
    
    Introduces a custom TimeServiceProvider to the StreamTask.
    This is responsible for defining and updating the current
    processing time for a task and handling all related actions,
    such as registering timers for actions to be executed in
    the future.

commit 97dba2bd165fc33b52350c81f09f38dbf8f60e72
Author: kl0u <kk...@gmail.com>
Date:   2016-06-09T19:23:38Z

    [FLINK-3647] Change StreamSource to use Processing-Time Clock Service

----
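The first commit above describes a per-task service that owns the current processing time and the registration of future timers. A minimal sketch of that shape follows, assuming a simplified `Triggerable` callback and a default implementation backed by `System.currentTimeMillis()` and a single-threaded `ScheduledExecutorService`. `TimeServiceProvider`, `getCurrentProcessingTime` and `registerTimer(long, Triggerable)` echo names used in this thread, but `DefaultTimeServiceProvider`, `shutdownService` and all method bodies are hypothetical, not Flink's actual code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for Flink's Triggerable callback (assumed shape).
interface Triggerable {
    void trigger(long timestamp) throws Exception;
}

// Hypothetical sketch of a per-task processing-time service.
abstract class TimeServiceProvider {
    // Returns the current processing time in milliseconds.
    abstract long getCurrentProcessingTime();

    // Schedules target to fire once processing time reaches the ABSOLUTE timestamp.
    abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);

    // Releases the threads held by the service (hypothetical method).
    abstract void shutdownService();
}

// Hypothetical default implementation: wall clock plus a single timer thread.
final class DefaultTimeServiceProvider extends TimeServiceProvider {
    private final ScheduledExecutorService timerService =
            Executors.newSingleThreadScheduledExecutor();

    @Override
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    @Override
    ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
        // Callers pass an absolute timestamp; the delay is derived here, exactly once.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
        return timerService.schedule(() -> {
            try {
                target.trigger(timestamp);
            } catch (Exception e) {
                // A real task would fail itself here; the sketch only prints the error.
                e.printStackTrace();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    void shutdownService() {
        timerService.shutdownNow();
    }
}
```

Keeping the delay computation inside the service means every caller works in one unit, absolute processing time, which is also what makes a test implementation with a manually controlled clock a drop-in replacement.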


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2124: [FLINK-3647] Change StreamSource to use Processing-Time C...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2124
  
    This looks very good! I just have a few minor comments:
    
    * It's very good that you fixed typos and added comments/Javadoc to previously undocumented code. I would like to see those changes in a separate PR, though, and keep this PR limited to the feature described in the JIRA issues. 
    
    * You added a `TestTimeServiceProvider` to two different tests. I think it makes sense to move it to the same package as `DefaultTimerService` and clearly mark it as being for testing only. On a related note, it might make sense to normalize the names: right now there are both `*TimeServiceProvider` and `*TimerService`.
    
    * You added the additional method `registerTimer(final long timestamp, final Runnable target)`. I imagine it is because the sources register their watermark timers as a `Runnable`. I think it would be better to keep having only `Triggerable` here. The watermark emitters should also work as a `Triggerable`.
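The second and third review points can be illustrated together. The sketch below assumes a hypothetical `ManualTimeServiceProvider`, in the spirit of the `TestTimeServiceProvider` mentioned above but not its actual code, whose time moves only when a test sets it. It also assumes a hypothetical periodic watermark emitter written as a `Triggerable` that re-registers itself, so no separate `Runnable` overload of `registerTimer` is needed. `Triggerable` is a simplified local stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for Flink's Triggerable callback (assumed shape).
interface Triggerable {
    void trigger(long timestamp) throws Exception;
}

// Hypothetical test-only time service: time advances only via setCurrentTime(),
// and every timer whose timestamp is <= the new time fires, in timestamp order.
final class ManualTimeServiceProvider {
    private long currentTime = 0L;
    private final TreeMap<Long, List<Triggerable>> timers = new TreeMap<>();

    long getCurrentProcessingTime() {
        return currentTime;
    }

    // Timers are keyed by absolute timestamp, never by delay.
    void registerTimer(long timestamp, Triggerable target) {
        timers.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(target);
    }

    void setCurrentTime(long time) {
        currentTime = time;
        // A callback may register further timers, so re-check the earliest key each round.
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<Triggerable>> due = timers.pollFirstEntry();
            for (Triggerable t : due.getValue()) {
                try {
                    t.trigger(due.getKey());
                } catch (Exception e) {
                    throw new RuntimeException("timer callback failed", e);
                }
            }
        }
    }
}

// Hypothetical periodic watermark emitter implemented as a Triggerable, not a Runnable:
// each firing records a watermark and re-registers itself one interval later.
final class PeriodicWatermarkEmitter implements Triggerable {
    private final ManualTimeServiceProvider timeService;
    private final long interval;
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkEmitter(ManualTimeServiceProvider timeService, long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.timeService = timeService;
        this.interval = interval;
    }

    void start() {
        timeService.registerTimer(timeService.getCurrentProcessingTime() + interval, this);
    }

    @Override
    public void trigger(long timestamp) {
        emitted.add(timestamp); // stand-in for emitting a watermark downstream
        timeService.registerTimer(timestamp + interval, this);
    }
}
```

With this shape, a test that jumps processing time from 99 to 250 sees watermarks at exactly 100 and 200, without any sleeping. Whether a real emitter should "catch up" on missed intervals like this is a design choice the sketch does not settle.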




[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/2124



[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2124#discussion_r68226912
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---
    @@ -186,16 +199,19 @@ public DistributedCache getDistributedCache() {
     	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
     		throw new UnsupportedOperationException();
     	}
    -	
    +
    +	public long getCurrentProcessingTime() {
    +		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
    +		return timerService.getCurrentProcessingTime();
    +	}
    +
     	@Override
     	public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
    -		if (timer == null) {
    -			timer = Executors.newSingleThreadScheduledExecutor();
    -		}
    +		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
     		
    -		final long delay = Math.max(time - System.currentTimeMillis(), 0);
    +		final long delay = Math.max(time - timerService.getCurrentProcessingTime(), 0);
    --- End diff --
    
    `timerService.registerTimer()` now takes an absolute timestamp, not a delay.
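To illustrate the reviewer's point: once `registerTimer()` expects an absolute timestamp, a delegating mock should forward the caller's time unchanged. If a delay computed as in the diff were passed on instead, the service would read it as an absolute time that is typically already in the past, so the timer would fire immediately. The classes below are hypothetical and reduce the timer service to a recorder so the difference is observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical minimal time service; registerTimer expects an ABSOLUTE timestamp.
// It only records what it receives so the two call patterns can be compared.
final class RecordingTimeService {
    private final long now;
    final List<Long> registeredTimestamps = new ArrayList<>();

    RecordingTimeService(long now) {
        this.now = now;
    }

    long getCurrentProcessingTime() {
        return now;
    }

    void registerTimer(long timestamp) {
        registeredTimestamps.add(timestamp);
    }
}

// Hypothetical mock-context delegate, loosely modeled on the diff above.
final class MockContextTimers {
    private final RecordingTimeService timerService;

    MockContextTimers(RecordingTimeService timerService) {
        this.timerService = Objects.requireNonNull(
                timerService, "The processing time timer has not been initialized.");
    }

    // Correct: the caller's absolute time is forwarded unchanged.
    void registerTimer(long time) {
        timerService.registerTimer(time);
    }

    // Anti-pattern: converting to a delay first. The service then treats the delay
    // as an absolute timestamp, which is usually earlier than the current time.
    void registerTimerAsDelay(long time) {
        long delay = Math.max(time - timerService.getCurrentProcessingTime(), 0);
        timerService.registerTimer(delay);
    }
}
```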



[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2124#discussion_r68227049
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +
    +/**
    + * A context provided to the {@link WindowAssigner} that allows it to query the
    + * current processing time. This is provided to the assigner by its containing
    + * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
    + * which, in turn, gets it from the containing
    + * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
    + */
    +public abstract class WindowAssignerContext {
    --- End diff --
    
    This should be moved to `WindowAssigner`, similarly to how `TriggerContext` is an inner class of `Trigger`.
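A sketch of the suggested layout, assuming simplified, hypothetical signatures: a stripped-down `WindowAssigner`, a local `TimeWindow`, and a tumbling processing-time assigner written only for illustration. The context becomes a nested class of the assigner, the way `TriggerContext` is nested in `Trigger`, and the assigner reads the current time from it rather than from the system clock.

```java
import java.util.Collection;
import java.util.Collections;

// Simplified, hypothetical time window covering [start, end).
final class TimeWindow {
    final long start;
    final long end;

    TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

// Hypothetical, trimmed-down WindowAssigner with its context as a nested class.
abstract class WindowAssigner<T, W> {
    abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    // Mirrors the Trigger/TriggerContext layout: the context lives inside the assigner.
    abstract static class WindowAssignerContext {
        // Current processing time, supplied by the enclosing operator.
        abstract long getCurrentProcessingTime();
    }
}

// Tumbling processing-time windows that ask the context for "now".
final class TumblingProcessingTimeAssigner<T> extends WindowAssigner<T, TimeWindow> {
    private final long size;

    TumblingProcessingTimeAssigner(long size) {
        this.size = size;
    }

    @Override
    Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        // The element timestamp is ignored: processing-time windows depend only on "now".
        long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }
}
```

Because the operator hands in the context, a test can pass an anonymous `WindowAssignerContext` returning a fixed time and check the assigned window exactly.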



[GitHub] flink pull request #2124: [FLINK-3647] Change StreamSource to use Processing...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2124#discussion_r68227329
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---
    @@ -27,10 +27,12 @@
     import org.apache.flink.api.common.typeutils.base.LongSerializer;
     import org.apache.flink.streaming.api.windowing.time.Time;
     import org.apache.flink.streaming.api.windowing.windows.Window;
    +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
     
     /**
    - * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
    - * system time.
    + * A {@link Trigger} that continuously fires based on a given time interval. The current (processing)
    + * time is provided by the {@link TimeServiceProvider}
    --- End diff --
    
    I think this is an implementation detail that should not necessarily be mentioned here.
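As a hedged illustration of the interval logic described in that Javadoc, the hypothetical helper below computes the next firing time as the first interval boundary after the current processing time. The clock is passed in as a `LongSupplier`, so the trigger logic does not depend on which service supplies the time. This is an assumption about how such a trigger may align its firings, not a copy of Flink's implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical helper for a continuous processing-time trigger.
final class ContinuousFireSchedule {
    private final long interval;
    private final LongSupplier processingTimeClock;

    ContinuousFireSchedule(long interval, LongSupplier processingTimeClock) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
        this.processingTimeClock = processingTimeClock;
    }

    // Next interval boundary strictly after the current processing time.
    long nextFireTimestamp() {
        long now = processingTimeClock.getAsLong();
        long start = now - (now % interval);
        return start + interval;
    }
}
```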



[GitHub] flink issue #2124: [FLINK-3647] Change StreamSource to use Processing-Time C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2124
  
    @aljoscha I have addressed the comments.
    Let me know if there are any follow-ups.



[GitHub] flink issue #2124: [FLINK-3647] Change StreamSource to use Processing-Time C...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2124
  
    @kl0u could you please close this?

