You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2015/09/15 17:17:35 UTC

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/1133

    [FLINK-2675] [streaming] Add utilities for scheduled triggers.

    These utilities are used by processing time triggers to schedule evaluations for the future. They are the first part of reworking the streaming windows to make them robust and faster.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink triggers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1133
    
----
commit f57d0d68a691a518482935fed9290efad6f30dbd
Author: Stephan Ewen <se...@apache.org>
Date:   2015-09-15T12:55:41Z

    [FLINK-2675] [streaming] Add utilities for scheduled triggers.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1133#discussion_r39936382
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
    + * which means that never more than one trigger will be executed at the same time.
    + * <p>
    + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
    + * different times, the one scheduled for the later time will be executed after the one scheduled for the
    + * earlier time.
    + */
    +public class TriggerTimer {
    +	
    +	/** The thread group that holds all trigger timer threads */
    +	public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers");
    +	
    +	/** The executor service that */
    +	private final ScheduledExecutorService scheduler;
    +
    +
    +	/**
    +	 * Creates a new trigger timer, where the timer thread has the default name "TriggerTimer Thread".
    +	 */
    +	public TriggerTimer() {
    +		this("TriggerTimer Thread");
    +	}
    +
    +	/**
    +	 * Creates a new trigger timer, where the timer thread has the given name.
    +	 * 
    +	 * @param triggerName The name for the trigger thread.
    +	 */
    +	public TriggerTimer(String triggerName) {
    +		this.scheduler = Executors.newSingleThreadScheduledExecutor(
    +				new DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName));
    +	}
    +
    +	/**
    +	 * Schedules a new trigger event. The trigger event will occur roughly at the given timestamp.
    +	 * If the timestamp is in the past (or now), the trigger will be queued for immediate execution. Note that other
    +	 * triggers that are to be executed now will be executed before this trigger.
    +	 * 
    +	 * @param target The target to be triggered.
    +	 * @param timestamp The timestamp when the trigger should occur, and the timestamp given
    +	 *                  to the trigger-able target.
    +	 */
    +	public void scheduleTriggerAt(Triggerable target, long timestamp) {
    +		long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
    +		
    +		scheduler.schedule(
    +				new TriggerTask(target, timestamp),
    +				delay,
    +				TimeUnit.MILLISECONDS);
    +	}
    +
    +	/**
    +	 * Shuts down the trigger timer, canceling all pending triggers and stopping the trigger thread.
    +	 */
    +	public void shutdown() {
    +		scheduler.shutdownNow();
    +	}
    +
    +	/**
    +	 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
    +	 * shutdown method was never called.
    +	 * <p>
    +	 * This should not be relied upon! It will cause shutdown to happen much later than if manual
    +	 * shutdown is attempted, and cause threads to linger for longer than needed.
    +	 */
    +	@Override
    +	@SuppressWarnings("FinalizeDoesntCallSuperFinalize")
    +	protected void finalize() {
    +		shutdown();
    --- End diff --
    
    I think a warning would be nice if the the scheduler is not shutdown when this runs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1133#discussion_r39526103
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
    + * which means that never more than one trigger will be executed at the same time.
    + * <p>
    + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
    + * different times, the one scheduled for the later time will be executed after the one scheduled for the
    + * earlier time.
    + */
    +public class TriggerTimer {
    +	
    +	/** The thread group that holds all trigger timer threads */
    +	public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers");
    +	
    +	/** The executor service that */
    --- End diff --
    
    ah, right ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1133#discussion_r39955086
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
    + * which means that never more than one trigger will be executed at the same time.
    + * <p>
    + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
    + * different times, the one scheduled for the later time will be executed after the one scheduled for the
    + * earlier time.
    + */
    +public class TriggerTimer {
    +	
    +	/** The thread group that holds all trigger timer threads */
    +	public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers");
    +	
    +	/** The executor service that */
    +	private final ScheduledExecutorService scheduler;
    +
    +
    +	/**
    +	 * Creates a new trigger timer, where the timer thread has the default name "TriggerTimer Thread".
    +	 */
    +	public TriggerTimer() {
    +		this("TriggerTimer Thread");
    +	}
    +
    +	/**
    +	 * Creates a new trigger timer, where the timer thread has the given name.
    +	 * 
    +	 * @param triggerName The name for the trigger thread.
    +	 */
    +	public TriggerTimer(String triggerName) {
    +		this.scheduler = Executors.newSingleThreadScheduledExecutor(
    +				new DispatcherThreadFactory(TRIGGER_THREADS_GROUP, triggerName));
    +	}
    +
    +	/**
    +	 * Schedules a new trigger event. The trigger event will occur roughly at the given timestamp.
    +	 * If the timestamp is in the past (or now), the trigger will be queued for immediate execution. Note that other
    +	 * triggers that are to be executed now will be executed before this trigger.
    +	 * 
    +	 * @param target The target to be triggered.
    +	 * @param timestamp The timestamp when the trigger should occur, and the timestamp given
    +	 *                  to the trigger-able target.
    +	 */
    +	public void scheduleTriggerAt(Triggerable target, long timestamp) {
    +		long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
    +		
    +		scheduler.schedule(
    +				new TriggerTask(target, timestamp),
    +				delay,
    +				TimeUnit.MILLISECONDS);
    +	}
    +
    +	/**
    +	 * Shuts down the trigger timer, canceling all pending triggers and stopping the trigger thread.
    +	 */
    +	public void shutdown() {
    +		scheduler.shutdownNow();
    +	}
    +
    +	/**
    +	 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
    +	 * shutdown method was never called.
    +	 * <p>
    +	 * This should not be relied upon! It will cause shutdown to happen much later than if manual
    +	 * shutdown is attempted, and cause threads to linger for longer than needed.
    +	 */
    +	@Override
    +	@SuppressWarnings("FinalizeDoesntCallSuperFinalize")
    +	protected void finalize() {
    +		shutdown();
    --- End diff --
    
    Makes sense, will add this when merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1133#issuecomment-141928885
  
    @aljoscha Is it possible that I merge the TimeWindow operator pull request and once you merge you changes, we swap this one with your timer service?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1133#issuecomment-141929298
  
    Should be doable. I have it ready now, with tests. But please, go ahead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1133#issuecomment-142704647
  
    Merged and subsumed by #1147


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1133#discussion_r39525474
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/TriggerTimer.java ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.operators;
    +
    +import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A timer that triggers targets at a specific point in the future. This timer executes single-threaded,
    + * which means that never more than one trigger will be executed at the same time.
    + * <p>
    + * This timer generally maintains order of trigger events. This means that for two triggers scheduled at
    + * different times, the one scheduled for the later time will be executed after the one scheduled for the
    + * earlier time.
    + */
    +public class TriggerTimer {
    +	
    +	/** The thread group that holds all trigger timer threads */
    +	public static final ThreadGroup TRIGGER_THREADS_GROUP = new ThreadGroup("Triggers");
    +	
    +	/** The executor service that */
    --- End diff --
    
    some text missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1133#issuecomment-141905444
  
    I'm currently working on integrating this with the StreamTask and adding tests. We have to integrate it with StreamTask to ensure that timer callbacks are not called concurrently with the other processing methods of StreamOperator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2675] [streaming] Add utilities for sch...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/1133


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---