Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2016/09/29 15:21:28 UTC

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/2570

    [FLINK-3674] Add an interface for Time aware User Functions

    This moves the event-time/processing-time trigger code from
    `WindowOperator` behind a well-defined interface that can be used by
    operators (and user functions).
    
    `InternalTimerService` is the new interface that has the same
    functionality that `WindowOperator` used to have. `TimerService` is the
    user-facing interface that does not allow dealing with namespaces/payloads
    and also does not allow deleting timers. There is a default
    implementation in `HeapInternalTimerService` that can checkpoint timers to
    a stream and also restore from a stream. Right now, this is managed in
    `AbstractStreamOperator` and operators can ask for an
    `InternalTimerService`.
    
    This also adds tests for `HeapInternalTimerService`.
    
    This adds two new user functions:
     - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also
       allows querying time and setting timers
     - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction`
    
    There are two new `StreamOperator` implementations for these that use the
    `InternalTimerService` interface.
    
    This also adds tests for the two new operators.
    
    This also adds the new interface `KeyContext` that is used for
    setting/querying the current key context for state and timers. Timers
    are always scoped to a key, for now.
    
    Also, this moves the handling of watermarks for both one-input and
    two-input operators to `AbstractStreamOperator` so that we have a single,
    central ground truth.
    
    There were also a number of small changes that I had to make to keep the main change clean. I would like to keep these as separate commits because they clearly document what was going on.
    
    ## Note for Reviewers
    You should probably start from the tests, i.e. `HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`. Then, the other interesting bits are `AbstractStreamOperator` that now deals with watermarks and checkpointing the timers and the `HeapInternalTimerService` as well. Keep in mind that this is just moving the code from `WindowOperator` to `HeapInternalTimerService` with some generalizations. I didn't try to optimize any of the data structures that are used.
    
    R: @StephanEwen @StefanRRichter @kl0u for review, please 😃
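The pull request describes a heap-based timer service in which timers are scoped to a key (plus a namespace) and fire when time advances; the quoted diffs below keep each kind of timer in both a `Set` and a `PriorityQueue`. A minimal sketch of that idea for event-time timers follows. `HeapTimerSketch`, `Timer` and `EventTimeTimers` are hypothetical, simplified classes, not Flink's `HeapInternalTimerService`: the set de-duplicates registrations and the queue yields due timers in timestamp order when the watermark advances.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical, simplified sketch; not Flink's actual HeapInternalTimerService.
public class HeapTimerSketch {

    /** A timer is scoped to a key and a namespace and fires at a timestamp. */
    static final class Timer<K, N> implements Comparable<Timer<K, N>> {
        final long timestamp;
        final K key;
        final N namespace;

        Timer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public int compareTo(Timer<K, N> other) {
            return Long.compare(timestamp, other.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer<?, ?> t = (Timer<?, ?>) o;
            return timestamp == t.timestamp
                    && Objects.equals(key, t.key)
                    && Objects.equals(namespace, t.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    /** Event-time timers: a set for de-duplication, a queue for ordering. */
    static final class EventTimeTimers<K, N> {
        private final Set<Timer<K, N>> timers = new HashSet<>();
        private final PriorityQueue<Timer<K, N>> queue = new PriorityQueue<>();

        void registerEventTimeTimer(K key, N namespace, long time) {
            Timer<K, N> timer = new Timer<>(time, key, namespace);
            // Only enqueue timers that are not already registered.
            if (timers.add(timer)) {
                queue.add(timer);
            }
        }

        /** Advances event time and returns the timers that fire, in timestamp order. */
        List<Timer<K, N>> advanceWatermark(long watermark) {
            List<Timer<K, N>> fired = new ArrayList<>();
            while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
                Timer<K, N> timer = queue.poll();
                timers.remove(timer);
                fired.add(timer);
            }
            return fired;
        }
    }

    public static void main(String[] args) {
        EventTimeTimers<String, String> service = new EventTimeTimers<>();
        service.registerEventTimeTimer("a", "w", 10L);
        service.registerEventTimeTimer("a", "w", 10L); // duplicate, ignored
        service.registerEventTimeTimer("b", "w", 5L);
        service.registerEventTimeTimer("c", "w", 20L);
        // prints b@5 then a@10; the timer at 20 stays queued
        for (Timer<String, String> t : service.advanceWatermark(15L)) {
            System.out.println(t.key + "@" + t.timestamp);
        }
    }
}
```

In an operator, each fired timer would presumably be handed to a trigger target after setting the timer's key as the current key, which is what makes timers "scoped to a key".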

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink timely-function

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2570
    
----
commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-25T18:58:16Z

    Rename TimeServiceProvider to ProcessingTimeService
    
    The name is clashing with the soon-to-be-added
    TimerService/InternalTimerService which is meant as an interface for
    dealing with both processing time and event time.
    
    TimeServiceProvider is renamed to ProcessingTimeService to reflect the
    fact that it is a low-level utility that only deals with "physical"
    processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-28T13:10:35Z

    Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-28T14:35:33Z

    Use Processing-Time Service of TestHarness in WindowOperatorTest
    
    Before, this was manually creating a TestProcessingTimeService. Now
    we use the one that is there by default in
    OneInputStreamOperatorTestHarness.

commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-28T14:43:40Z

    Refactor OperatorTestHarness to always use TestProcessingTimeService
    
    Before, this would allow handing in a custom ProcessingTimeService but
    this was in reality always TestProcessingTimeService.
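The commits above make the test harnesses always use a test processing-time service that tests advance by hand. A minimal sketch of such a manual clock follows, assuming a hypothetical `ManualProcessingTimeSketch` class rather than Flink's `TestProcessingTimeService`: setting the time synchronously fires every timer that has become due, which keeps tests deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongConsumer;

// Hypothetical manual clock in the spirit of a test processing-time service; not Flink's class.
public class ManualProcessingTimeSketch {

    private long currentTime = 0;
    // Timers grouped by timestamp; TreeMap keeps them sorted.
    private final TreeMap<Long, List<LongConsumer>> timers = new TreeMap<>();

    long getCurrentProcessingTime() {
        return currentTime;
    }

    void registerTimer(long timestamp, LongConsumer callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    /** Moves the clock and synchronously fires every timer that is now due. */
    void setCurrentTime(long time) {
        currentTime = time;
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<LongConsumer>> due = timers.pollFirstEntry();
            for (LongConsumer callback : due.getValue()) {
                callback.accept(due.getKey());
            }
        }
    }

    public static void main(String[] args) {
        ManualProcessingTimeSketch clock = new ManualProcessingTimeSketch();
        List<Long> fired = new ArrayList<>();
        clock.registerTimer(100, fired::add);
        clock.registerTimer(50, fired::add);
        clock.setCurrentTime(60);   // fires the timer at 50
        clock.setCurrentTime(200);  // fires the timer at 100
        System.out.println(fired);  // prints [50, 100]
    }
}
```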

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-28T13:12:26Z

    Use TestHarness Processing-time Facility in BucketingSinkTest
    
    Before, this was manually creating a TestProcessingTimeService. Now we
    use the one that is there by default in
    OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-28T13:32:24Z

    Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-29T14:04:29Z

    Refactor Operator TestHarnesses to use Common Base Class
    
    This also introduces KeyedTwoInputStreamOperatorTestHarness which
    is similar to KeyedOneInputStreamOperatorTestHarness

commit 9b5b07ce97b31661ac5917c51e449ab0a85dbb58
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-09-26T14:21:51Z

    [FLINK-3674] Add an interface for Time aware User Functions
    
----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    This PR disables `RescalingITCase` because the timers are checkpointed using the legacy/deprecated `StreamCheckpointedOperator` interface (`AbstractStreamOperator` implements that interface because it keeps the timers). The tests can be reactivated once @kl0u's work on making the timers key-group aware is merged.
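"Key-group aware" timers means snapshotting timers per key group rather than per parallel subtask, so that on rescaling each key group, together with its timers, can move as a unit to whichever subtask now owns it. A minimal sketch follows; `KeyGroupTimersSketch`, `keyGroupFor` and `subtaskFor` are hypothetical names, and the plain `hashCode()`-modulo assignment is an assumption standing in for Flink's own (murmur-hash based) key-group assignment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of key-group-partitioned timer snapshots; names are illustrative.
public class KeyGroupTimersSketch {

    /** Assumed stand-in hash: Flink's real assignment applies a murmur hash to key.hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the subtask that owns it under a given parallelism. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // Snapshot: timer timestamps grouped by key group instead of by subtask.
        Map<Integer, List<Long>> snapshot = new TreeMap<>();
        long ts = 100L;
        for (String key : new String[] {"a", "b", "c", "d"}) {
            snapshot.computeIfAbsent(keyGroupFor(key, maxParallelism), g -> new ArrayList<>()).add(ts++);
        }
        // On restore with a different parallelism, whole key groups are reassigned.
        for (int parallelism : new int[] {2, 4}) {
            for (Map.Entry<Integer, List<Long>> e : snapshot.entrySet()) {
                System.out.println("p=" + parallelism + " group=" + e.getKey()
                        + " -> subtask " + subtaskFor(e.getKey(), maxParallelism, parallelism)
                        + " timers=" + e.getValue());
            }
        }
    }
}
```

Because the number of key groups (the maximum parallelism) stays fixed, the same snapshot can be split for any parallelism up to that maximum.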


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83859446
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    --- End diff --
    
    `RestoredTimers` are serialized with their type serializers. It could make sense to add an equality or compatibility check here against the type serializers that are passed in. I also wonder whether they need to be serialized in the first place.
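The suggested check could fail fast when the serializers stored with the restored timers differ from the ones the operator passes in. A minimal sketch follows, assuming a hypothetical `Serializer` stand-in whose `equals()` means "same binary format" and a hypothetical `RestoredTimers` holder; it is not Flink's `TypeSerializer` API.

```java
import java.util.Objects;

// Hypothetical sketch of a serializer compatibility check on restore; all names are illustrative.
public class SerializerCheckSketch {

    /** Hypothetical stand-in for a type serializer; equality means "same format". */
    static final class Serializer {
        final String format;

        Serializer(String format) {
            this.format = format;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Serializer && ((Serializer) o).format.equals(format);
        }

        @Override
        public int hashCode() {
            return format.hashCode();
        }

        @Override
        public String toString() {
            return format;
        }
    }

    /** Hypothetical restored state that remembers which serializers wrote it. */
    static final class RestoredTimers {
        final Serializer keySerializer;
        final Serializer namespaceSerializer;

        RestoredTimers(Serializer keySerializer, Serializer namespaceSerializer) {
            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
        }
    }

    static void checkCompatible(String what, Serializer restored, Serializer requested) {
        if (!Objects.equals(restored, requested)) {
            throw new IllegalStateException(what + " serializer mismatch: snapshot used "
                    + restored + ", operator requested " + requested);
        }
    }

    /** Validates both serializers before the restored timers are used. */
    static void validate(RestoredTimers restored, Serializer keySer, Serializer namespaceSer) {
        checkCompatible("Key", restored.keySerializer, keySer);
        checkCompatible("Namespace", restored.namespaceSerializer, namespaceSer);
    }

    public static void main(String[] args) {
        RestoredTimers restored = new RestoredTimers(new Serializer("long"), new Serializer("void"));
        validate(restored, new Serializer("long"), new Serializer("void")); // passes
        try {
            validate(restored, new Serializer("string"), new Serializer("void"));
        } catch (IllegalStateException e) {
            // prints "Key serializer mismatch: snapshot used long, operator requested string"
            System.out.println(e.getMessage());
        }
    }
}
```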


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84255151
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
    @@ -35,10 +34,10 @@
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
    + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
      * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
      */
    -public class DefaultTimeServiceProvider extends TimeServiceProvider {
    +public class DefaultProcessingTimeService extends ProcessingTimeService {
     
    --- End diff --
    
    Done


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84039288
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,325 @@
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    --- End diff --
    
    That one's a bit tricky since I only get the `TypeSerializers` once the user requests a `TimerService`. Right now, restoring is a two-step process: first, the timers are read; then, when a user requests the timer service, they are actually put into a `TimerService` and we re-register the physical processing-time timer.
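The two-step restore described here can be sketched as follows: raw timers are parked under the service name at restore time, and only when a service is first requested are they moved into it and a single physical timer re-registered for the earliest timestamp. `LazyRestoreSketch` and its methods are hypothetical; timers are reduced to plain timestamps, and the serializers that would arrive in step 2 are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a two-step (lazy) timer restore; names are illustrative.
public class LazyRestoreSketch {

    /** Step 1 output: timestamps read from the checkpoint, keyed by service name. */
    private final Map<String, List<Long>> restoredTimers = new HashMap<>();

    /** Live services, created on first request. */
    private final Map<String, List<Long>> services = new HashMap<>();

    /** Records physical processing-time timers that were re-registered. */
    final List<Long> physicalRegistrations = new ArrayList<>();

    /** Step 1: called during restore, before any serializer is known. */
    void restore(String name, List<Long> timestamps) {
        restoredTimers.put(name, new ArrayList<>(timestamps));
    }

    /** Step 2: called when user code first asks for a timer service by name. */
    List<Long> getTimerService(String name) {
        return services.computeIfAbsent(name, n -> {
            List<Long> timers = restoredTimers.remove(n);
            if (timers == null) {
                timers = new ArrayList<>();
            }
            // Re-register one physical timer for the earliest restored timestamp.
            timers.stream().min(Long::compare).ifPresent(physicalRegistrations::add);
            return timers;
        });
    }

    public static void main(String[] args) {
        LazyRestoreSketch operator = new LazyRestoreSketch();
        operator.restore("window-timers", Arrays.asList(30L, 10L, 20L));
        List<Long> first = operator.getTimerService("window-timers");
        List<Long> second = operator.getTimerService("window-timers");
        System.out.println(first == second);                 // prints true
        System.out.println(operator.physicalRegistrations);  // prints [10]
    }
}
```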


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    Thanks a lot for the review, @StefanRRichter! I addressed most comments by writing code; for some I only gave an answer that describes the rationale.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84235862
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    +						(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    +
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService(),
    +						restoredService);
    +
    +			} else {
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService());
    +			}
    +			timerServices.put(name, service);
    +		}
    +
    +		return service;
    +	}
    +
    +	public void processWatermark(Watermark mark) throws Exception {
    +		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
    +			service.advanceWatermark(mark.getTimestamp());
    +		}
    +		output.emitWatermark(mark);
    +	}
    +
    +	public void processWatermark1(Watermark mark) throws Exception {
    +		input1Watermark = mark.getTimestamp();
    +		long newMin = Math.min(input1Watermark, input2Watermark);
    --- End diff --
    
    I agree with @StefanRRichter's comment below, and I would add that much of the code in `processWatermark1` and `processWatermark2` is repeated, so the common part could become a private method that both of them call.
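The suggested refactoring can be sketched as follows: each input only records its own watermark, and one shared private method computes the minimum of both inputs and acts only when that combined watermark advances. `TwoInputWatermarkSketch` is hypothetical, watermarks are plain `long` timestamps, and the "only advance when the minimum grows" rule is an assumption, since the quoted diff is cut off before that point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested refactoring; not the code from the pull request.
public class TwoInputWatermarkSketch {

    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Stand-in for advancing timer services and emitting the watermark downstream. */
    final List<Long> emitted = new ArrayList<>();

    public void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advanceCombinedWatermark();
    }

    public void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advanceCombinedWatermark();
    }

    /** Shared logic: the operator's event time is the minimum over both inputs. */
    private void advanceCombinedWatermark() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            // In an operator, timer services would be advanced and the watermark forwarded here.
            emitted.add(newMin);
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        op.processWatermark1(10);  // input 2 still at Long.MIN_VALUE: nothing emitted
        op.processWatermark2(5);   // min(10, 5) = 5: emit 5
        op.processWatermark2(20);  // min(10, 20) = 10: emit 10
        op.processWatermark1(15);  // min(15, 20) = 15: emit 15
        System.out.println(op.emitted); // prints [5, 10, 15]
    }
}
```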


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84237459
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    --- End diff --
    
    I am wondering if it would be better to rename this to `EventTimeTimers`. That plays well with `processingTimeTimers` and also indicates what we are talking about: we have event time, whose "clock ticks" are the watermarks, and processing time, whose clock ticks are those of the wall clock.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84031706
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java ---
    @@ -15,38 +15,26 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.streaming.api.operators;
     
    -package org.apache.flink.streaming.runtime.operators.windowing;
    +import org.apache.flink.annotation.Internal;
     
    -import org.apache.flink.streaming.runtime.operators.Triggerable;
    -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
    -
    -import java.util.concurrent.ScheduledFuture;
    -
    -class NoOpTimerService extends TimeServiceProvider {
    -
    -	private volatile boolean terminated;
    -
    -	@Override
    -	public long getCurrentProcessingTime() {
    -		return System.currentTimeMillis();
    -	}
    -
    -	@Override
    -	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
    -		return null;
    -	}
    -
    -	@Override
    -	public boolean isTerminated() {
    -		return terminated;
    -	}
    -
    -	@Override
    -	public void quiesceAndAwaitPending() {}
    -
    -	@Override
    -	public void shutdownService() {
    -		terminated = true;
    -	}
    +/**
    + * Interface for things that can be called by {@link InternalTimerService}.
    + *
    + * @param <K> Type of the keys to which timers are scoped.
    + * @param <N> Type of the namespace to which timers are scoped.
    + */
    +@Internal
    +public interface Triggerable<K, N> {
    --- End diff --
    
    Done, changed it to `ProcessingTimeCallback`, which has a method `onProcessingTime()`.
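A single-method callback such as `ProcessingTimeCallback` with `onProcessingTime()` can be backed by a scheduled executor. A minimal sketch follows; the `onProcessingTime(long timestamp) throws Exception` signature is an assumption about the interface's shape, and `SimpleProcessingTimeService` is a hypothetical class, not Flink's `DefaultProcessingTimeService`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ProcessingTimeCallbackSketch {

    /** Assumed shape of the callback: one method, invoked with the timer's timestamp. */
    interface ProcessingTimeCallback {
        void onProcessingTime(long timestamp) throws Exception;
    }

    /** Hypothetical minimal processing-time service backed by a scheduled executor. */
    static final class SimpleProcessingTimeService {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        long getCurrentProcessingTime() {
            return System.currentTimeMillis();
        }

        ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
            long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
            return executor.schedule(() -> {
                try {
                    callback.onProcessingTime(timestamp);
                } catch (Exception e) {
                    // A production service would fail the task; this sketch only reports it.
                    e.printStackTrace();
                }
            }, delay, TimeUnit.MILLISECONDS);
        }

        void shutdown() {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleProcessingTimeService service = new SimpleProcessingTimeService();
        CountDownLatch fired = new CountDownLatch(1);
        long target = service.getCurrentProcessingTime() + 50;
        service.registerTimer(target, ts -> {
            System.out.println("fired for " + ts);
            fired.countDown();
        });
        fired.await(5, TimeUnit.SECONDS);
        service.shutdown();
    }
}
```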


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83862277
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    +						(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    +
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService(),
    +						restoredService);
    +
    +			} else {
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService());
    +			}
    +			timerServices.put(name, service);
    +		}
    +
    +		return service;
    +	}
    +
    +	public void processWatermark(Watermark mark) throws Exception {
    +		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
    +			service.advanceWatermark(mark.getTimestamp());
    +		}
    +		output.emitWatermark(mark);
    +	}
    +
    +	public void processWatermark1(Watermark mark) throws Exception {
    +		input1Watermark = mark.getTimestamp();
    +		long newMin = Math.min(input1Watermark, input2Watermark);
    +		if (newMin > combinedWatermark) {
    +			combinedWatermark = newMin;
    +			processWatermark(new Watermark(combinedWatermark));
    +		}
    +	}
    +
    +	public void processWatermark2(Watermark mark) throws Exception {
    --- End diff --
    
    As a general comment, I don't like how the two cases (one and two inputs) are hardcoded here.
    
    As a minimal improvement, could we have a single method that takes an input index and updates an array of longs (even though the array would only ever have size 1 or 2)? At the very least, half of the method duplicates its sibling and could be moved into a helper method.
    
    Also, I wonder whether the combined watermark should be a member field or just computed on the fly. As it stands, I think it is only ever queried when one of the input watermarks changes.
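A minimal sketch of the index-based variant suggested above, with the combined watermark computed on the fly as the minimum over a `long[]`. The class and method names are hypothetical. Comparing the minimum before and after an update only avoids duplicate emissions if each input's watermarks never decrease; keeping the last emitted value in a field, as the diff does with `combinedWatermark`, also guards against an input whose watermark regresses.

```java
import java.util.Arrays;

/** Hypothetical helper: per-input watermarks in an array, combined as their minimum. */
final class InputWatermarks {
    private final long[] watermarks;

    InputWatermarks(int numInputs) {
        watermarks = new long[numInputs];
        // No input has sent a watermark yet.
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Combined watermark, computed on the fly as the minimum over all inputs. */
    long combined() {
        long min = Long.MAX_VALUE;
        for (long wm : watermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /**
     * Records a watermark for input {@code index} (0-based) and returns true
     * if the combined watermark advanced as a result.
     */
    boolean update(int index, long timestamp) {
        long before = combined();
        watermarks[index] = timestamp;
        return combined() > before;
    }
}
```

One- and two-input operators could then share one code path along the lines of `if (tracker.update(inputIndex, mark.getTimestamp())) { processWatermark(new Watermark(tracker.combined())); }`, where `tracker` and `inputIndex` are hypothetical.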



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84038706
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    +						(HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    +
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService(),
    +						restoredService);
    +
    +			} else {
    +				service = new HeapInternalTimerService<>(
    +						keySerializer,
    +						namespaceSerializer,
    +						triggerable,
    +						this,
    +						getRuntimeContext().getProcessingTimeService());
    +			}
    +			timerServices.put(name, service);
    +		}
    +
    +		return service;
    +	}
    +
    +	public void processWatermark(Watermark mark) throws Exception {
    +		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
    +			service.advanceWatermark(mark.getTimestamp());
    +		}
    +		output.emitWatermark(mark);
    +	}
    +
    +	public void processWatermark1(Watermark mark) throws Exception {
    +		input1Watermark = mark.getTimestamp();
    +		long newMin = Math.min(input1Watermark, input2Watermark);
    +		if (newMin > combinedWatermark) {
    +			combinedWatermark = newMin;
    +			processWatermark(new Watermark(combinedWatermark));
    +		}
    +	}
    +
    +	public void processWatermark2(Watermark mark) throws Exception {
    --- End diff --
    
    Yep, I'm already trying to address that in another PR that changes the operator hierarchy. 😄



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83861268
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    --- End diff --
    
    `containsKey()` followed by `remove()` seems a bit redundant here. I would just always call `remove()` and check the return value for null.



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84038623
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -390,4 +425,141 @@ public void close() {
     			output.close();
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Watermark handling
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Returns a {@link InternalTimerService} that can be used to query current processing time
    +	 * and event time and to set timers. An operator can have several timer services, where
    +	 * each has its own namespace serializer. Timer services are differentiated by the string
    +	 * key that is given when requesting them, if you call this method with the same key
    +	 * multiple times you will get the same timer service instance in subsequent requests.
    +	 *
    +	 * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
    +	 * When a timer fires, this key will also be set as the currently active key.
    +	 *
    +	 * <p>Each timer has attached metadata, the namespace. Different timer services
    +	 * can have a different namespace type. If you don't need namespace differentiation you
    +	 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
    +	 *
    +	 * @param name The name of the requested timer service. If no service exists under the given
    +	 *             name a new one will be created and returned.
    +	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
    +	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
    +	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
    +	 *
    +	 * @param <K> The type of the timer keys.
    +	 * @param <N> The type of the timer namespace.
    +	 */
    +	public <K, N> InternalTimerService<N> getInternalTimerService(
    +			String name,
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			Triggerable<K, N> triggerable) {
    +
    +		@SuppressWarnings("unchecked")
    +		HeapInternalTimerService<K, N> service = (HeapInternalTimerService<K, N>) timerServices.get(name);
    +
    +		if (service == null) {
    +			if (restoredServices != null && restoredServices.containsKey(name)) {
    +				@SuppressWarnings("unchecked")
    +				HeapInternalTimerService.RestoredTimers<K, N> restoredService =
    --- End diff --
    
    I can replace it with this, but I'm not sure it's more readable:
    ```
    @SuppressWarnings("unchecked")
    HeapInternalTimerService.RestoredTimers<K, N> restoredService =
      restoredServices == null ? null : (HeapInternalTimerService.RestoredTimers<K, N>) restoredServices.remove(name);
    
    if (restoredService != null) {
      ...
    }
    ```
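A possible middle ground, sketched under assumptions: move the null handling and the single `remove()` call into a small helper, so the call site reads as one assignment followed by `if (restoredService != null)`. The helper name `takeRestored` is hypothetical, and the sketch assumes the restored-services map never stores null values; otherwise a null return could not distinguish "absent" from "mapped to null".

```java
import java.util.Map;

/** Hypothetical utility for the single-lookup pattern. */
final class RestoredLookup {
    private RestoredLookup() {}

    /**
     * Removes and returns the entry for {@code name}, or returns null if
     * {@code restored} is null or has no such entry. This is one map lookup
     * instead of containsKey() followed by remove().
     */
    static <V> V takeRestored(Map<String, V> restored, String name) {
        return restored == null ? null : restored.remove(name);
    }
}
```

Depending on how the map is typed, the unchecked cast to `HeapInternalTimerService.RestoredTimers<K, N>` may still be needed, either at the call site or inside such a helper.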



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251732
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    --- End diff --
    
    I'm renaming
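The fields in this diff pair a `Set` with a `PriorityQueue` for each kind of timer. A minimal sketch of why, assuming simplified timers that carry only a key and a timestamp (no namespace) and using hypothetical class names: the set rejects duplicate registrations, and the queue hands out timers in timestamp order, so in this sketch advancing the watermark fires every timer whose timestamp is at or below it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical simplified timer: a key and a timestamp, no namespace. */
final class SimpleTimer {
    final String key;
    final long timestamp;

    SimpleTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTimer)) {
            return false;
        }
        SimpleTimer other = (SimpleTimer) o;
        return timestamp == other.timestamp && key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, timestamp);
    }
}

/** Hypothetical event-time timer store: Set for deduplication, PriorityQueue for ordering. */
final class EventTimeTimers {
    private final Set<SimpleTimer> timers = new HashSet<>();
    private final PriorityQueue<SimpleTimer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    /** Registers a timer; a second registration with the same key and timestamp is ignored. */
    void register(String key, long timestamp) {
        SimpleTimer timer = new SimpleTimer(key, timestamp);
        if (timers.add(timer)) {
            queue.add(timer);
        }
    }

    /** Removes and returns, in timestamp order, all timers with timestamp <= watermark. */
    List<SimpleTimer> advanceWatermark(long watermark) {
        List<SimpleTimer> fired = new ArrayList<>();
        SimpleTimer next;
        while ((next = queue.peek()) != null && next.timestamp <= watermark) {
            queue.poll();
            timers.remove(next);
            fired.add(next);
        }
        return fired;
    }

    int size() {
        return queue.size();
    }
}
```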



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83855317
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---
    @@ -0,0 +1,442 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests {@link StreamTimelyFlatMap}.
    + */
    +public class TimelyFlatMapTest extends TestLogger {
    +
    +	@Test
    +	public void testCurrentEventTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(17));
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		testHarness.processWatermark(new Watermark(42));
    +		testHarness.processElement(new StreamRecord<>(6, 13L));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(17L));
    +		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
    +		expectedOutput.add(new Watermark(42L));
    +		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testCurrentProcessingTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(17);
    +		testHarness.processElement(new StreamRecord<>(5));
    +
    +		testHarness.setProcessingTime(42);
    +		testHarness.processElement(new StreamRecord<>(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("5PT:17"));
    +		expectedOutput.add(new StreamRecord<>("6PT:42"));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testEventTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17, 42L));
    +
    +		testHarness.processWatermark(new Watermark(5));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17, 42L));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +		expectedOutput.add(new Watermark(5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testProcessingTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17));
    +
    +		testHarness.setProcessingTime(5);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testEventTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(1));
    +		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
    +
    +		testHarness.processWatermark(new Watermark(2));
    +		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
    +
    +		testHarness.processWatermark(new Watermark(6));
    +		testHarness.processWatermark(new Watermark(7));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(1L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
    +		expectedOutput.add(new Watermark(2L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new Watermark(6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +		expectedOutput.add(new Watermark(7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testProcessingTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(1);
    +		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
    +
    +		testHarness.setProcessingTime(2);
    +		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
    +
    +		testHarness.setProcessingTime(6);
    +		testHarness.setProcessingTime(7);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("INPUT:17"));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42"));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testSnapshotAndRestore() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		// snapshot and restore from scratch
    +		StreamStateHandle snapshot = testHarness.snapshot(0, 0);
    +
    +		testHarness.close();
    +
    +		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.restore(snapshot);
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(5);
    +		testHarness.processWatermark(new Watermark(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
    +		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
    +		expectedOutput.add(new Watermark(6));
    +
    +		System.out.println("GOT: " + testHarness.getOutput());
    --- End diff --
    
    This should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83860732
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---
    @@ -0,0 +1,509 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
    +import org.junit.Test;
    +import org.mockito.Matchers;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.InputStream;
    +
    +import static org.hamcrest.Matchers.containsInAnyOrder;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Matchers.contains;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.*;
    +
    +/**
    + * Tests for {@link HeapInternalTimerService}.
    + */
    +public class HeapInternalTimerServiceTest {
    +
    +	private static InternalTimer<Integer, String> anyInternalTimer() {
    +		return any();
    +	}
    +
    +	/**
    +	 * Verify that we only ever have one processing-time task registered at the
    +	 * {@link ProcessingTimeService}.
    +	 */
    +	@Test
    +	public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
    +		@SuppressWarnings("unchecked")
    +		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
    +
    +		TestKeyContext keyContext = new TestKeyContext();
    +
    +		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
    +
    +		HeapInternalTimerService<Integer, String> timerService =
    +				createTimerService(mockTriggerable, keyContext, processingTimeService);
    +
    +		keyContext.setCurrentKey(0);
    +
    +		timerService.registerProcessingTimeTimer("ciao", 10);
    +		timerService.registerProcessingTimeTimer("ciao", 20);
    +		timerService.registerProcessingTimeTimer("ciao", 30);
    +		timerService.registerProcessingTimeTimer("hello", 10);
    +		timerService.registerProcessingTimeTimer("hello", 20);
    +
    +		assertEquals(5, timerService.numProcessingTimeTimers());
    +		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
    +		assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
    +
    +		assertEquals(1, processingTimeService.getNumRegisteredTimers());
    +		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
    +
    +		processingTimeService.setCurrentTime(10);
    +
    +		assertEquals(3, timerService.numProcessingTimeTimers());
    +		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
    +		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
    +
    +		assertEquals(1, processingTimeService.getNumRegisteredTimers());
    +		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
    +
    +		processingTimeService.setCurrentTime(20);
    +
    +		assertEquals(1, timerService.numProcessingTimeTimers());
    +		assertEquals(0, timerService.numProcessingTimeTimers("hello"));
    +		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
    +
    +		assertEquals(1, processingTimeService.getNumRegisteredTimers());
    +		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
    +
    +		processingTimeService.setCurrentTime(30);
    +
    +		assertEquals(0, timerService.numProcessingTimeTimers());
    +
    +		assertEquals(0, processingTimeService.getNumRegisteredTimers());
    +	}
    --- End diff --
    
    Could add a further check that `getNumRegisteredTimers()` goes back up to 1 once we start registering timers again.
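
The real `HeapInternalTimerService` and `TestProcessingTimeService` need a Flink build, so this is only a minimal sketch of the extra check suggested above, using a hypothetical, heavily simplified model. `FakeClock` stands in for the test processing-time service. `SimpleTimerService` mirrors only the single-physical-timer logic from the diff. Logical timers here are plain timestamps, not (timestamp, key, namespace) triples. The point it illustrates is this: after all timers have fired, registering a new timer should bring the physical count back to 1, and registering an earlier timer should reschedule the existing one instead of adding a second.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical stand-in for TestProcessingTimeService: holds "physical" timers
// and fires the due ones when the test advances the current time.
class FakeClock {
    static final class PhysicalTimer {
        final long timestamp;
        final LongConsumer callback;

        PhysicalTimer(long timestamp, LongConsumer callback) {
            this.timestamp = timestamp;
            this.callback = callback;
        }
    }

    private final PriorityQueue<PhysicalTimer> timers =
            new PriorityQueue<>(Comparator.comparingLong((PhysicalTimer t) -> t.timestamp));

    PhysicalTimer registerTimer(long timestamp, LongConsumer callback) {
        PhysicalTimer timer = new PhysicalTimer(timestamp, callback);
        timers.add(timer);
        return timer;
    }

    void cancel(PhysicalTimer timer) {
        timers.remove(timer);
    }

    int getNumRegisteredTimers() {
        return timers.size();
    }

    // Fire every physical timer that is due, passing the new current time.
    void setCurrentTime(long now) {
        PhysicalTimer timer;
        while ((timer = timers.peek()) != null && timer.timestamp <= now) {
            timers.poll();
            timer.callback.accept(now);
        }
    }
}

// Simplified model of the "one physical timer" logic in HeapInternalTimerService.
class SimpleTimerService {
    private final FakeClock clock;
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private final Set<Long> timers = new HashSet<>();
    private FakeClock.PhysicalTimer nextTimer; // the single physical timer, or null
    final List<Long> fired = new ArrayList<>();

    SimpleTimerService(FakeClock clock) {
        this.clock = clock;
    }

    void registerProcessingTimeTimer(long time) {
        if (!timers.add(time)) {
            return; // already registered
        }
        Long oldHead = queue.peek();
        queue.add(time);
        // Only (re)schedule the physical timer if the new timer is now the earliest one.
        if (oldHead == null || time < oldHead) {
            if (nextTimer != null) {
                clock.cancel(nextTimer);
            }
            nextTimer = clock.registerTimer(time, this::onProcessingTime);
        }
    }

    void onProcessingTime(long now) {
        nextTimer = null;
        Long head;
        while ((head = queue.peek()) != null && head <= now) {
            queue.poll();
            timers.remove(head);
            fired.add(head);
        }
        // Schedule one physical timer for the next pending logical timer, if any.
        if (head != null && nextTimer == null) {
            nextTimer = clock.registerTimer(head, this::onProcessingTime);
        }
    }

    int numProcessingTimeTimers() {
        return queue.size();
    }
}
```

In this model, after `setCurrentTime(30)` the physical count is 0. Registering a timer at 40 brings it back to 1. Registering a timer at 35 afterwards keeps it at 1, because the physical timer for 40 is cancelled and replaced.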


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83856028
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---
    @@ -0,0 +1,442 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests {@link StreamTimelyFlatMap}.
    + */
    +public class TimelyFlatMapTest extends TestLogger {
    +
    +	@Test
    +	public void testCurrentEventTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(17));
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		testHarness.processWatermark(new Watermark(42));
    +		testHarness.processElement(new StreamRecord<>(6, 13L));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(17L));
    +		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
    +		expectedOutput.add(new Watermark(42L));
    +		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testCurrentProcessingTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(17);
    +		testHarness.processElement(new StreamRecord<>(5));
    +
    +		testHarness.setProcessingTime(42);
    +		testHarness.processElement(new StreamRecord<>(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("5PT:17"));
    +		expectedOutput.add(new StreamRecord<>("6PT:42"));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testEventTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17, 42L));
    +
    +		testHarness.processWatermark(new Watermark(5));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17, 42L));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +		expectedOutput.add(new Watermark(5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testProcessingTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17));
    +
    +		testHarness.setProcessingTime(5);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testEventTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(1));
    +		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
    +
    +		testHarness.processWatermark(new Watermark(2));
    +		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
    +
    +		testHarness.processWatermark(new Watermark(6));
    +		testHarness.processWatermark(new Watermark(7));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(1L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
    +		expectedOutput.add(new Watermark(2L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new Watermark(6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +		expectedOutput.add(new Watermark(7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testProcessingTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(1);
    +		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
    +
    +		testHarness.setProcessingTime(2);
    +		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
    +
    +		testHarness.setProcessingTime(6);
    +		testHarness.setProcessingTime(7);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("INPUT:17"));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42"));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testSnapshotAndRestore() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		// snapshot and restore from scratch
    +		StreamStateHandle snapshot = testHarness.snapshot(0, 0);
    +
    +		testHarness.close();
    +
    +		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.restore(snapshot);
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(5);
    +		testHarness.processWatermark(new Watermark(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
    +		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
    +		expectedOutput.add(new Watermark(6));
    +
    +		System.out.println("GOT: " + testHarness.getOutput());
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public T getKey(T value) throws Exception {
    +			return value;
    +		}
    +	}
    +
    +	private static class WatermarkQueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
    +			out.collect(value + "WM:" + timerService.currentEventTime());
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<String> out) throws Exception {
    +		}
    +	}
    +
    +	private static class EventTimeTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<Integer> out) throws Exception {
    +			out.collect(value);
    +			timerService.registerEventTimeTimer(5);
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<Integer> out) throws Exception {
    +
    +			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
    +			out.collect(1777);
    +		}
    +	}
    +
    +	private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		private final ValueStateDescriptor<Integer> state =
    +				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
    +			out.collect("INPUT:" + value);
    +			getRuntimeContext().getState(state).update(value);
    +			timerService.registerEventTimeTimer(timerService.currentEventTime() + 5);
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<String> out) throws Exception {
    +			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
    +			out.collect("STATE:" + getRuntimeContext().getState(state).value());
    +		}
    +	}
    +
    +	private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
    --- End diff --
    
    Aren't the mapper classes for processing time and event time almost identical? I think some of the duplicate code could be removed by making the time domain a constructor parameter.
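
Here is a minimal sketch of the refactoring suggested above. So that it compiles without Flink, it uses hypothetical stand-ins for `TimeDomain`, `TimerService` and `Collector`. These stand-ins contain only the methods the test functions use. In the real test, the class would implement `TimelyFlatMapFunction<Integer, Integer>` and keep its `serialVersionUID`. The name `TimeDomainTriggeringFlatMapFunction` is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the PR's real types are org.apache.flink.streaming.api.TimeDomain,
// org.apache.flink.streaming.api.TimerService and org.apache.flink.util.Collector.
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface Collector<T> {
    void collect(T record);
}

interface TimerService {
    void registerEventTimeTimer(long time);
    void registerProcessingTimeTimer(long time);
}

// One test function replacing the near-identical event-time and processing-time
// variants: the time domain is passed in through the constructor.
class TimeDomainTriggeringFlatMapFunction {
    private final TimeDomain timeDomain;

    TimeDomainTriggeringFlatMapFunction(TimeDomain timeDomain) {
        this.timeDomain = timeDomain;
    }

    void flatMap(Integer value, TimerService timerService, Collector<Integer> out) {
        out.collect(value);
        if (timeDomain == TimeDomain.EVENT_TIME) {
            timerService.registerEventTimeTimer(5);
        } else {
            timerService.registerProcessingTimeTimer(5);
        }
    }

    void onTimer(long timestamp, TimeDomain firedDomain, TimerService timerService, Collector<Integer> out) {
        // Replaces the per-class assertEquals(TimeDomain.X, timeDomain) checks.
        if (firedDomain != timeDomain) {
            throw new AssertionError("expected " + timeDomain + " but timer fired in " + firedDomain);
        }
        out.collect(1777);
    }
}

// Records registrations so the behaviour can be checked without a test harness.
class RecordingTimerService implements TimerService {
    final List<Long> eventTimeTimers = new ArrayList<>();
    final List<Long> processingTimeTimers = new ArrayList<>();

    @Override
    public void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        processingTimeTimers.add(time);
    }
}
```

With this shape, `testEventTimeTimers` and `testProcessingTimeTimers` could construct the same class with `TimeDomain.EVENT_TIME` or `TimeDomain.PROCESSING_TIME`. The stateful variants could be merged the same way.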


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84236659
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = restoredTimers.watermarkTimers;
    +		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
    +
    +		processingTimeTimers = restoredTimers.processingTimeTimers;
    +		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
    +
    +		// re-register the restored timers (if any)
    +		if (processingTimeTimersQueue.size() > 0) {
    +			nextTimer =
    +					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
    +		}
    +	}
    +
    +
    +	@Override
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
    +
    +	@Override
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
    +
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		// make sure we only put one timer per key into the queue
    +		if (processingTimeTimers.add(timer)) {
    +
    +			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
    +
    +			processingTimeTimersQueue.add(timer);
    +
    +			// check if we need to re-schedule our timer to earlier
    +			if (time < nextTriggerTime) {
    +				if (nextTimer != null) {
    +					nextTimer.cancel(false);
    +				}
    +				nextTimer = processingTimeService.registerTimer(time, this);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.add(timer)) {
    +			watermarkTimersQueue.add(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		if (processingTimeTimers.remove(timer)) {
    +			processingTimeTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.remove(timer)) {
    +			watermarkTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long time) throws Exception {
    +		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    +		// inside the callback.
    +		nextTimer = null;
    +
    +		InternalTimer<K, N> timer;
    +
    +		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    +
    +			processingTimeTimers.remove(timer);
    +			processingTimeTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onProcessingTime(timer);
    +		}
    +
    +		if (timer != null) {
    +			if (nextTimer == null) {
    +				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    +			}
    +		}
    +	}
    +
    +	public void advanceWatermark(long time) throws Exception {
    +		currentWatermark = time;
    +
    +		InternalTimer<K, N> timer = watermarkTimersQueue.peek();
    +
    +		while (timer != null && timer.getTimestamp() <= time) {
    --- End diff --
    
    Here you could `peek()` inside the `while (...)` condition, as in the `onProcessingTime()` method above, so that you avoid having to peek once before the loop and again at the end of the loop body.
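
The loop shape suggested above can be sketched with a hypothetical, simplified model of the event-time half of `HeapInternalTimerService`. Timers here are plain timestamps rather than `InternalTimer` objects. The key context and trigger-target callback are replaced by appending to a `fired` list, and the class name `EventTimeTimers` is illustrative only. The `while` condition mirrors the one in `onProcessingTime()` in the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical simplified model: timers are timestamps, not (timestamp, key, namespace).
class EventTimeTimers {
    private final Set<Long> watermarkTimers = new HashSet<>();
    private final PriorityQueue<Long> watermarkTimersQueue = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long time) {
        // The set de-duplicates; the queue orders timers by timestamp.
        if (watermarkTimers.add(time)) {
            watermarkTimersQueue.add(time);
        }
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        Long timer;
        // peek() lives in the loop condition, so there is no peek() before the
        // loop and no second peek() at the end of the loop body.
        while ((timer = watermarkTimersQueue.peek()) != null && timer <= time) {
            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();
            // The real service would set the key context and invoke its trigger target here.
            fired.add(timer);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```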


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251918
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = restoredTimers.watermarkTimers;
    +		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
    +
    +		processingTimeTimers = restoredTimers.processingTimeTimers;
    +		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
    +
    +		// re-register the restored timers (if any)
    +		if (processingTimeTimersQueue.size() > 0) {
    +			nextTimer =
    +					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
    +		}
    +	}
    +
    +
    +	@Override
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
    +
    +	@Override
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
    +
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		// make sure we only put one timer per key into the queue
    +		if (processingTimeTimers.add(timer)) {
    +
    +			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
    +
    +			processingTimeTimersQueue.add(timer);
    +
    +			// check if we need to re-schedule our timer to earlier
    +			if (time < nextTriggerTime) {
    +				if (nextTimer != null) {
    +					nextTimer.cancel(false);
    +				}
    +				nextTimer = processingTimeService.registerTimer(time, this);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.add(timer)) {
    +			watermarkTimersQueue.add(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		if (processingTimeTimers.remove(timer)) {
    +			processingTimeTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.remove(timer)) {
    +			watermarkTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long time) throws Exception {
    +		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    +		// inside the callback.
    +		nextTimer = null;
    +
    +		InternalTimer<K, N> timer;
    +
    +		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    +
    +			processingTimeTimers.remove(timer);
    +			processingTimeTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onProcessingTime(timer);
    +		}
    +
    +		if (timer != null) {
    +			if (nextTimer == null) {
    +				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    +			}
    +		}
    +	}
    +
    +	public void advanceWatermark(long time) throws Exception {
    +		currentWatermark = time;
    +
    +		InternalTimer<K, N> timer = watermarkTimersQueue.peek();
    +
    +		while (timer != null && timer.getTimestamp() <= time) {
    --- End diff --
    
    Done


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84036274
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.annotation.Internal;
    +
    +/**
    + * Interface for working with time and timers.
    + *
    + * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
    + * that allows to specify a key and a namespace to which timers should be scoped.
    + *
    + * @param <N> Type of the namespace to which timers are scoped.
    + */
    +@Internal
    +public interface InternalTimerService<N> {
    +
    +	/** Returns the current processing time. */
    +	long currentProcessingTime();
    +
    +	/** Returns the current event time. */
    +	long currentWatermark();
    --- End diff --
    
    Yep, I will rename everything to `currentWatermark()`, because that is also the name we already use in other places.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83860194
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.operators.Triggerable;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = restoredTimers.watermarkTimers;
    +		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
    +
    +		processingTimeTimers = restoredTimers.processingTimeTimers;
    +		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
    +
    +		// re-register the restored timers (if any)
    +		if (processingTimeTimersQueue.size() > 0) {
    +			nextTimer =
    +					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
    +		}
    +	}
    +
    +
    +	@Override
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
    +
    +	@Override
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
    +
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		// make sure we only put one timer per key into the queue
    +		if (processingTimeTimers.add(timer)) {
    +
    +			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
    +
    +			processingTimeTimersQueue.add(timer);
    +
    +			// check if we need to re-schedule our timer to earlier
    +			if (time < nextTriggerTime) {
    +				if (nextTimer != null) {
    +					nextTimer.cancel(false);
    +				}
    +				nextTimer = processingTimeService.registerTimer(time, this);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.add(timer)) {
    +			watermarkTimersQueue.add(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		if (processingTimeTimers.remove(timer)) {
    +			processingTimeTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.remove(timer)) {
    +			watermarkTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void trigger(long time) throws Exception {
    +		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    +		// inside the callback.
    +		nextTimer = null;
    +
    +		InternalTimer<K, N> timer;
    +
    +		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    +			timer = processingTimeTimersQueue.peek();
    --- End diff --
    
    Redundant `peek()`: the loop condition has already assigned `processingTimeTimersQueue.peek()` to `timer`.
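A minimal sketch of the processing-time loop without the second `peek()`. It is a simplification under stated assumptions: `ProcessingTimeLoopSketch` is hypothetical, timers are bare timestamps (no key or namespace), and the `nextTimer` future is replaced by a `Long` field. It also shows that, once the loop exits, `timer` already holds the earliest not-yet-due timer (or `null`), which is what the code in the diff uses to re-arm the next callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class ProcessingTimeLoopSketch {

    // Hypothetical simplification: a timer is just its timestamp.
    private final PriorityQueue<Long> processingTimeTimersQueue = new PriorityQueue<>();

    // Timestamps in the order they fired; stands in for triggerTarget.onProcessingTime().
    final List<Long> fired = new ArrayList<>();

    // Stand-in for the nextTimer future; null means nothing is scheduled.
    Long nextScheduled = null;

    void registerProcessingTimeTimer(long time) {
        processingTimeTimersQueue.add(time);
    }

    void onProcessingTime(long time) {
        nextScheduled = null;
        Long timer;
        // The only peek() is in the condition; the body just removes and fires the head.
        while ((timer = processingTimeTimersQueue.peek()) != null && timer <= time) {
            processingTimeTimersQueue.remove();
            fired.add(timer);
        }
        // 'timer' is now the earliest timer that is not due yet (or null),
        // so it can be used directly to schedule the next callback.
        if (timer != null) {
            nextScheduled = timer;
        }
    }
}
```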


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/2570


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84251996
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = restoredTimers.watermarkTimers;
    +		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
    +
    +		processingTimeTimers = restoredTimers.processingTimeTimers;
    +		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
    +
    +		// re-register the restored timers (if any)
    +		if (processingTimeTimersQueue.size() > 0) {
    +			nextTimer =
    +					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
    +		}
    +	}
    +
    +
    +	@Override
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
    +
    +	@Override
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
    +
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		// make sure we only put one timer per key into the queue
    +		if (processingTimeTimers.add(timer)) {
    +
    +			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
    +
    +			processingTimeTimersQueue.add(timer);
    +
    +			// check if we need to re-schedule our timer to earlier
    +			if (time < nextTriggerTime) {
    +				if (nextTimer != null) {
    +					nextTimer.cancel(false);
    +				}
    +				nextTimer = processingTimeService.registerTimer(time, this);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.add(timer)) {
    +			watermarkTimersQueue.add(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		if (processingTimeTimers.remove(timer)) {
    +			processingTimeTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.remove(timer)) {
    +			watermarkTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long time) throws Exception {
    +		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    +		// inside the callback.
    +		nextTimer = null;
    +
    +		InternalTimer<K, N> timer;
    +
    +		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    +
    +			processingTimeTimers.remove(timer);
    +			processingTimeTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onProcessingTime(timer);
    +		}
    +
    +		if (timer != null) {
    +			if (nextTimer == null) {
    +				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    +			}
    +		}
    +	}
    +
    +	public void advanceWatermark(long time) throws Exception {
    +		currentWatermark = time;
    +
    +		InternalTimer<K, N> timer = watermarkTimersQueue.peek();
    +
    +		while (timer != null && timer.getTimestamp() <= time) {
    +			watermarkTimers.remove(timer);
    +			watermarkTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onEventTime(timer);
    +
    +			timer = watermarkTimersQueue.peek();
    +		}
    +	}
    +
    +	public void snapshotTimers(OutputStream outStream) throws IOException {
    +		InstantiationUtil.serializeObject(outStream, keySerializer);
    +		InstantiationUtil.serializeObject(outStream, namespaceSerializer);
    +
    +		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(outStream);
    +
    --- End diff --
    
    Done



[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83857663
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
    + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
    + * and arrays.
    + *
    + * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
    + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
    + * to them firing.
    + *
    + * <pre>{@code
    + * DataStream<X> input = ...;
    + *
    + * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
    + * }</pre>
    + *
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the returned elements.
    + */
    +@PublicEvolving
    +public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
    +
    +	/**
    +	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
    +	 * it into zero, one, or more elements.
    +	 *
    +	 * @param value The input value.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
    +
    +	/**
    +	 * Called when a timer set using {@link TimerService} fires.
    +	 *
    +	 * @param timestamp The timestamp of the firing timer.
    +	 * @param timeDomain The {@link TimeDomain} of the firing timer.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
    --- End diff --
    
    I wonder whether `TimeDomain` and `TimerService` should really be parameters of the methods in this interface. I assume both remain stable for the lifetime of the UDF, so they could be passed once in some init method that could also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there is a good reason against this, but I like to keep the number of parameters small when possible.
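A rough sketch of the "pass it once" alternative, under loudly stated assumptions: every type here (`SketchTimerService`, `SketchTimeDomain`, `SketchCollector`, `RichTimelyFlatMapSketch`, `DelayedEcho`) is a hypothetical stand-in, not Flink API, and `setTimerService()` is an invented init hook. The sketch only moves the timer service out of the method signatures; it keeps the time domain on `onTimer()`, because the Javadoc above describes it as the domain "of the firing timer", i.e. it can differ per call.

```java
public class InitOnceSketch {

    // Hypothetical stand-ins for TimerService, TimeDomain and Collector.
    interface SketchTimerService {
        long currentWatermark();

        void registerEventTimeTimer(long time);
    }

    enum SketchTimeDomain { EVENT_TIME, PROCESSING_TIME }

    interface SketchCollector<T> {
        void collect(T record);
    }

    // Hypothetical "rich" base class: the operator would call setTimerService() once before use.
    abstract static class RichTimelyFlatMapSketch<I, O> {
        private SketchTimerService timerService;

        final void setTimerService(SketchTimerService timerService) {
            this.timerService = timerService;
        }

        protected final SketchTimerService getTimerService() {
            return timerService;
        }

        abstract void flatMap(I value, SketchCollector<O> out) throws Exception;

        abstract void onTimer(long timestamp, SketchTimeDomain domain, SketchCollector<O> out) throws Exception;
    }

    // Example user function: echoes the input and asks for an event-time callback 10 ms later.
    static final class DelayedEcho extends RichTimelyFlatMapSketch<String, String> {
        @Override
        void flatMap(String value, SketchCollector<String> out) {
            out.collect(value);
            getTimerService().registerEventTimeTimer(getTimerService().currentWatermark() + 10);
        }

        @Override
        void onTimer(long timestamp, SketchTimeDomain domain, SketchCollector<String> out) {
            out.collect("timer@" + timestamp + "/" + domain);
        }
    }
}
```

The trade-off is the usual one: fewer parameters per call, in exchange for mutable state on the function object that must be initialized before `flatMap()` is invoked.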


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    R: @StefanRRichter @kl0u could you please review again? I pushed some changes and rebased on top of master.
    
    Each commit stands on its own and its purpose is documented in its commit message, so it is probably easiest to review this PR commit by commit.


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    Thanks for your comments @StefanRRichter and @kl0u. I incorporated most of them by now.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84035970
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
    + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
    + * and arrays.
    + *
    + * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
    + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
    + * to them firing.
    + *
    + * <pre>{@code
    + * DataStream<X> input = ...;
    + *
    + * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
    + * }</pre>
    + *
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the returned elements.
    + */
    +@PublicEvolving
    +public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
    +
    +	/**
    +	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
    +	 * it into zero, one, or more elements.
    +	 *
    +	 * @param value The input value.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
    +
    +	/**
    +	 * Called when a timer set using {@link TimerService} fires.
    +	 *
    +	 * @param timestamp The timestamp of the firing timer.
    +	 * @param timeDomain The {@link TimeDomain} of the firing timer.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
    --- End diff --
    
    I initially had an initialisation method that received the `TimerService`, but then switched to this version so that users who don't use a `RichFunction` don't need an extra field to store the timer service.
    
    Another alternative would be to bundle all the parameters (timestamp, time domain, timer service) into a single context parameter.
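The context-parameter alternative might look like the following sketch. As with any illustration here, all types are hypothetical stand-ins defined in the block, not Flink API: `flatMap` receives a `SketchContext` exposing the timer service, and `onTimer` receives a `SketchOnTimerContext` that additionally carries the firing timestamp and time domain.

```java
// Hypothetical stand-ins for Flink's TimeDomain, TimerService and Collector.
enum SketchTimeDomain { EVENT_TIME, PROCESSING_TIME }

interface SketchTimerService {
    long currentWatermark();

    void registerEventTimeTimer(long time);
}

interface SketchCollector<T> {
    void collect(T record);
}

// Everything flatMap needs besides the element and the collector.
interface SketchContext {
    SketchTimerService timerService();
}

// onTimer additionally learns which timer fired.
interface SketchOnTimerContext extends SketchContext {
    long timestamp();

    SketchTimeDomain timeDomain();
}

// One context parameter instead of several loose ones.
interface ContextTimelyFlatMapFunction<I, O> {
    void flatMap(I value, SketchContext ctx, SketchCollector<O> out) throws Exception;

    void onTimer(SketchOnTimerContext ctx, SketchCollector<O> out) throws Exception;
}

// Example function: echoes the input and asks for a callback 5 units after the current watermark.
class DelayedEcho implements ContextTimelyFlatMapFunction<Integer, String> {
    @Override
    public void flatMap(Integer value, SketchContext ctx, SketchCollector<String> out) {
        out.collect("IN:" + value);
        SketchTimerService timers = ctx.timerService();
        timers.registerEventTimeTimer(timers.currentWatermark() + 5);
    }

    @Override
    public void onTimer(SketchOnTimerContext ctx, SketchCollector<String> out) {
        out.collect(ctx.timeDomain() + "@" + ctx.timestamp());
    }
}
```

A context object also leaves room to add accessors later without changing method signatures. For comparison, Flink's later `ProcessFunction` passes a `Context` to `processElement` and an `OnTimerContext` to `onTimer`, though it keeps the timer timestamp as a separate `onTimer` argument.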


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84031809
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---
    @@ -0,0 +1,442 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests {@link StreamTimelyFlatMap}.
    + */
    +public class TimelyFlatMapTest extends TestLogger {
    +
    +	@Test
    +	public void testCurrentEventTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(17));
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		testHarness.processWatermark(new Watermark(42));
    +		testHarness.processElement(new StreamRecord<>(6, 13L));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(17L));
    +		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
    +		expectedOutput.add(new Watermark(42L));
    +		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testCurrentProcessingTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(17);
    +		testHarness.processElement(new StreamRecord<>(5));
    +
    +		testHarness.setProcessingTime(42);
    +		testHarness.processElement(new StreamRecord<>(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("5PT:17"));
    +		expectedOutput.add(new StreamRecord<>("6PT:42"));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testEventTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17, 42L));
    +
    +		testHarness.processWatermark(new Watermark(5));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17, 42L));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +		expectedOutput.add(new Watermark(5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testProcessingTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17));
    +
    +		testHarness.setProcessingTime(5);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testEventTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(1));
    +		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
    +
    +		testHarness.processWatermark(new Watermark(2));
    +		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
    +
    +		testHarness.processWatermark(new Watermark(6));
    +		testHarness.processWatermark(new Watermark(7));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(1L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
    +		expectedOutput.add(new Watermark(2L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new Watermark(6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +		expectedOutput.add(new Watermark(7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testProcessingTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(1);
    +		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
    +
    +		testHarness.setProcessingTime(2);
    +		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
    +
    +		testHarness.setProcessingTime(6);
    +		testHarness.setProcessingTime(7);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("INPUT:17"));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42"));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testSnapshotAndRestore() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		// snapshot and restore from scratch
    +		StreamStateHandle snapshot = testHarness.snapshot(0, 0);
    +
    +		testHarness.close();
    +
    +		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.restore(snapshot);
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(5);
    +		testHarness.processWatermark(new Watermark(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
    +		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
    +		expectedOutput.add(new Watermark(6));
    +
    +		System.out.println("GOT: " + testHarness.getOutput());
    --- End diff --
    
    done.
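The expected outputs in `testEventTimeTimers` and `testEventTimeTimerWithState` encode two properties: timers are scoped to a key, and a due timer fires (emitting a record stamped with the timer's timestamp) before the watermark that triggered it is forwarded. The self-contained sketch below reproduces the output order of `testEventTimeTimerWithState`. It is modeled loosely on `HeapInternalTimerService` (a `HashSet` for de-duplication plus a `PriorityQueue` ordered by timestamp), with simplifications that are assumptions: integer keys, no namespaces, a per-key `String` standing in for keyed state, and a hypothetical "+5 after the current watermark" rule inferred from the test's "should set timer for 6/7" comments. `SketchTimer` and `SketchTimelyOperator` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// A timer is identified by (timestamp, key); equals/hashCode let a HashSet reject duplicates.
final class SketchTimer implements Comparable<SketchTimer> {
    final long timestamp;
    final int key;

    SketchTimer(long timestamp, int key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public int compareTo(SketchTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SketchTimer
                && timestamp == ((SketchTimer) o).timestamp && key == ((SketchTimer) o).key;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key);
    }
}

// Simplified keyed operator: int keys, no namespaces, event-time timers only.
final class SketchTimelyOperator {
    private final Set<SketchTimer> timers = new HashSet<>();
    private final PriorityQueue<SketchTimer> queue = new PriorityQueue<>();
    private final Map<Integer, String> state = new HashMap<>(); // stands in for keyed state
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    void processElement(int key, long timestamp) {
        state.put(key, "STATE:" + key);
        output.add("INPUT:" + key + "@" + timestamp);
        // Hypothetical rule matching the test comments: fire 5 units after the current watermark.
        SketchTimer timer = new SketchTimer(currentWatermark + 5, key);
        if (timers.add(timer)) { // enqueue a given (timestamp, key) only once
            queue.add(timer);
        }
    }

    void processWatermark(long watermark) {
        currentWatermark = watermark;
        SketchTimer timer;
        // Fire all due timers in timestamp order, reading the state of each timer's own key,
        // and only then forward the watermark.
        while ((timer = queue.peek()) != null && timer.timestamp <= watermark) {
            queue.remove();
            timers.remove(timer);
            output.add(state.get(timer.key) + "@" + timer.timestamp);
        }
        output.add("WM:" + watermark);
    }
}
```

Driving it with watermark 1, element 17, watermark 2, element 42, then watermarks 6 and 7 yields `WM:1, INPUT:17@0, WM:2, INPUT:42@1, STATE:17@6, WM:6, STATE:42@7, WM:7`, the same order as the test's `expectedOutput`.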


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    I updated this on top of the latest master with @StefanRRichter's state changes.
    
    Please take another look, @StefanRRichter.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84237685
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java ---
    @@ -35,10 +34,10 @@
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
    - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
    + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
      * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
      */
    -public class DefaultTimeServiceProvider extends TimeServiceProvider {
    +public class DefaultProcessingTimeService extends ProcessingTimeService {
     
    --- End diff --
    
    We could also rename it to `SystemProcessingTimeService`, or something else that indicates where the clock ticks come from.
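Per the Javadoc in the diff, this service takes the current processing time from `System.currentTimeMillis()` and registers timers on a `ScheduledThreadPoolExecutor`, so its clock source is its defining trait. A minimal sketch of that behavior follows; `SystemProcessingTimeSketch` is a hypothetical name and `LongConsumer` stands in for `ProcessingTimeCallback` (whose `onProcessingTime(long)` appears in `HeapInternalTimerService`), so this is not Flink's implementation.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical class named after the reviewer's suggestion; not Flink's implementation.
final class SystemProcessingTimeSketch {
    // One daemon thread, so a forgotten shutdown() does not keep the JVM alive.
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, runnable -> {
        Thread thread = new Thread(runnable, "processing-time-sketch");
        thread.setDaemon(true);
        return thread;
    });

    // The clock ticks come straight from the system wall clock.
    long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    // Invokes callback.accept(timestamp) once the wall clock has reached timestamp,
    // or as soon as possible if that time is already in the past.
    ScheduledFuture<?> registerTimer(long timestamp, LongConsumer callback) {
        long delayMillis = Math.max(timestamp - getCurrentProcessingTime(), 0L);
        return executor.schedule(() -> callback.accept(timestamp), delayMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

A test-oriented counterpart would replace `System.currentTimeMillis()` with a manually advanced field, which is what makes the clock source worth putting in the class name.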


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84236439
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashSet;
    +import java.util.PriorityQueue;
    +import java.util.Set;
    +import java.util.concurrent.ScheduledFuture;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * {@link InternalTimerService} that stores timers on the Java heap.
    + */
    +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
    +
    +	private final TypeSerializer<K> keySerializer;
    +
    +	private final TypeSerializer<N> namespaceSerializer;
    +
    +	private final ProcessingTimeService processingTimeService;
    +
    +	private long currentWatermark = Long.MIN_VALUE;
    +
    +	private final org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget;
    +
    +	private final KeyContext keyContext;
    +
    +	/**
    +	 * Processing time timers that are currently in-flight.
    +	 */
    +	private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;
    +	private final Set<InternalTimer<K, N>> processingTimeTimers;
    +
    +	protected ScheduledFuture<?> nextTimer = null;
    +
    +	/**
    +	 * Currently waiting watermark callbacks.
    +	 */
    +	private final Set<InternalTimer<K, N>> watermarkTimers;
    +	private final PriorityQueue<InternalTimer<K, N>> watermarkTimersQueue;
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService) {
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = new HashSet<>();
    +		watermarkTimersQueue = new PriorityQueue<>(100);
    +
    +		processingTimeTimers = new HashSet<>();
    +		processingTimeTimersQueue = new PriorityQueue<>(100);
    +	}
    +
    +	public HeapInternalTimerService(
    +			TypeSerializer<K> keySerializer,
    +			TypeSerializer<N> namespaceSerializer,
    +			org.apache.flink.streaming.api.operators.Triggerable<K, N> triggerTarget,
    +			KeyContext keyContext,
    +			ProcessingTimeService processingTimeService,
    +			RestoredTimers<K, N> restoredTimers) {
    +
    +		this.keySerializer = checkNotNull(keySerializer);
    +		this.namespaceSerializer = checkNotNull(namespaceSerializer);
    +		this.triggerTarget = checkNotNull(triggerTarget);
    +		this.keyContext = keyContext;
    +		this.processingTimeService = checkNotNull(processingTimeService);
    +
    +		watermarkTimers = restoredTimers.watermarkTimers;
    +		watermarkTimersQueue = restoredTimers.watermarkTimersQueue;
    +
    +		processingTimeTimers = restoredTimers.processingTimeTimers;
    +		processingTimeTimersQueue = restoredTimers.processingTimeTimersQueue;
    +
    +		// re-register the restored timers (if any)
    +		if (processingTimeTimersQueue.size() > 0) {
    +			nextTimer =
    +					processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this);
    +		}
    +	}
    +
    +
    +	@Override
    +	public long currentProcessingTime() {
    +		return processingTimeService.getCurrentProcessingTime();
    +	}
    +
    +	@Override
    +	public long currentWatermark() {
    +		return currentWatermark;
    +	}
    +
    +	@Override
    +	public void registerProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		// make sure we only put one timer per key into the queue
    +		if (processingTimeTimers.add(timer)) {
    +
    +			InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
    +
    +			processingTimeTimersQueue.add(timer);
    +
    +			// check if we need to re-schedule our timer to earlier
    +			if (time < nextTriggerTime) {
    +				if (nextTimer != null) {
    +					nextTimer.cancel(false);
    +				}
    +				nextTimer = processingTimeService.registerTimer(time, this);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void registerEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.add(timer)) {
    +			watermarkTimersQueue.add(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteProcessingTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +
    +		if (processingTimeTimers.remove(timer)) {
    +			processingTimeTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void deleteEventTimeTimer(N namespace, long time) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    +		if (watermarkTimers.remove(timer)) {
    +			watermarkTimersQueue.remove(timer);
    +		}
    +	}
    +
    +	@Override
    +	public void onProcessingTime(long time) throws Exception {
    +		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    +		// inside the callback.
    +		nextTimer = null;
    +
    +		InternalTimer<K, N> timer;
    +
    +		while ((timer  = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    +
    +			processingTimeTimers.remove(timer);
    +			processingTimeTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onProcessingTime(timer);
    +		}
    +
    +		if (timer != null) {
    +			if (nextTimer == null) {
    +				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    +			}
    +		}
    +	}
    +
    +	public void advanceWatermark(long time) throws Exception {
    +		currentWatermark = time;
    +
    +		InternalTimer<K, N> timer = watermarkTimersQueue.peek();
    +
    +		while (timer != null && timer.getTimestamp() <= time) {
    +			watermarkTimers.remove(timer);
    +			watermarkTimersQueue.remove();
    +
    +			keyContext.setCurrentKey(timer.getKey());
    +			triggerTarget.onEventTime(timer);
    +
    +			timer = watermarkTimersQueue.peek();
    +		}
    +	}
    +
    +	public void snapshotTimers(OutputStream outStream) throws IOException {
    +		InstantiationUtil.serializeObject(outStream, keySerializer);
    +		InstantiationUtil.serializeObject(outStream, namespaceSerializer);
    +
    +		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(outStream);
    +
    --- End diff --
    
    Just for uniformity, you could iterate over the Sets for both processing-time and event-time timers when checkpointing them.
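The suggestion can be sketched as follows: both kinds of timers are written by iterating their Sets, and restoring rebuilds each Set together with its PriorityQueue, which re-derives timestamp order regardless of the Set's iteration order. This is a simplified, self-contained sketch, not the PR's code. `CkptTimer` and `TimerSnapshotSketch` are hypothetical names, a `long` key and a `String` namespace stand in for the `TypeSerializer`-based `K`/`N` that `HeapInternalTimerService` serializes, and plain `DataOutputStream`/`DataInputStream` replace `DataOutputViewStreamWrapper`. It also omits re-registering the earliest processing-time timer after restore, which the restoring constructor quoted above does via `processingTimeService.registerTimer(...)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Consumer;

// Simplified timer: long key and String namespace stand in for serializer-based K and N.
final class CkptTimer implements Comparable<CkptTimer> {
    final long timestamp;
    final long key;
    final String namespace;

    CkptTimer(long timestamp, long key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public int compareTo(CkptTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CkptTimer && timestamp == ((CkptTimer) o).timestamp
                && key == ((CkptTimer) o).key && namespace.equals(((CkptTimer) o).namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

final class TimerSnapshotSketch {
    final Set<CkptTimer> eventTimeTimers = new HashSet<>();
    final PriorityQueue<CkptTimer> eventTimeQueue = new PriorityQueue<>();
    final Set<CkptTimer> processingTimeTimers = new HashSet<>();
    final PriorityQueue<CkptTimer> processingTimeQueue = new PriorityQueue<>();

    void registerEventTimeTimer(CkptTimer timer) {
        if (eventTimeTimers.add(timer)) { eventTimeQueue.add(timer); }
    }

    void registerProcessingTimeTimer(CkptTimer timer) {
        if (processingTimeTimers.add(timer)) { processingTimeQueue.add(timer); }
    }

    // Both kinds of timers are written the same way: by iterating their Sets.
    byte[] snapshot() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            writeTimers(out, eventTimeTimers);
            writeTimers(out, processingTimeTimers);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Re-registering rebuilds each Set and its queue in one pass.
    static TimerSnapshotSketch restore(byte[] snapshot) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
            TimerSnapshotSketch restored = new TimerSnapshotSketch();
            readTimers(in, restored::registerEventTimeTimer);
            readTimers(in, restored::registerProcessingTimeTimer);
            return restored;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeTimers(DataOutputStream out, Set<CkptTimer> timers) throws IOException {
        out.writeInt(timers.size());
        for (CkptTimer timer : timers) {
            out.writeLong(timer.timestamp);
            out.writeLong(timer.key);
            out.writeUTF(timer.namespace);
        }
    }

    private static void readTimers(DataInputStream in, Consumer<CkptTimer> register) throws IOException {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            long timestamp = in.readLong();
            long key = in.readLong();
            register.accept(new CkptTimer(timestamp, key, in.readUTF()));
        }
    }
}
```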


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2570
  
    Rebased version looks good to me.


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84035592
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---
    @@ -0,0 +1,442 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeutils.base.IntSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.runtime.state.StreamStateHandle;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.TestHarnessUtil;
    +import org.apache.flink.util.Collector;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests {@link StreamTimelyFlatMap}.
    + */
    +public class TimelyFlatMapTest extends TestLogger {
    +
    +	@Test
    +	public void testCurrentEventTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(17));
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		testHarness.processWatermark(new Watermark(42));
    +		testHarness.processElement(new StreamRecord<>(6, 13L));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(17L));
    +		expectedOutput.add(new StreamRecord<>("5WM:17", 12L));
    +		expectedOutput.add(new Watermark(42L));
    +		expectedOutput.add(new StreamRecord<>("6WM:42", 13L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testCurrentProcessingTime() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(17);
    +		testHarness.processElement(new StreamRecord<>(5));
    +
    +		testHarness.setProcessingTime(42);
    +		testHarness.processElement(new StreamRecord<>(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("5PT:17"));
    +		expectedOutput.add(new StreamRecord<>("6PT:42"));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testEventTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17, 42L));
    +
    +		testHarness.processWatermark(new Watermark(5));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17, 42L));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +		expectedOutput.add(new Watermark(5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testProcessingTimeTimers() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(17));
    +
    +		testHarness.setProcessingTime(5);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>(17));
    +		expectedOutput.add(new StreamRecord<>(1777, 5L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testEventTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processWatermark(new Watermark(1));
    +		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
    +
    +		testHarness.processWatermark(new Watermark(2));
    +		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
    +
    +		testHarness.processWatermark(new Watermark(6));
    +		testHarness.processWatermark(new Watermark(7));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new Watermark(1L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
    +		expectedOutput.add(new Watermark(2L));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new Watermark(6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +		expectedOutput.add(new Watermark(7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	/**
    +	 * Verifies that we don't have leakage between different keys.
    +	 */
    +	@Test
    +	public void testProcessingTimeTimerWithState() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(1);
    +		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
    +
    +		testHarness.setProcessingTime(2);
    +		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
    +
    +		testHarness.setProcessingTime(6);
    +		testHarness.setProcessingTime(7);
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("INPUT:17"));
    +		expectedOutput.add(new StreamRecord<>("INPUT:42"));
    +		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
    +		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testSnapshotAndRestore() throws Exception {
    +
    +		StreamTimelyFlatMap<Integer, Integer, String> operator =
    +				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
    +				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>(5, 12L));
    +
    +		// snapshot and restore from scratch
    +		StreamStateHandle snapshot = testHarness.snapshot(0, 0);
    +
    +		testHarness.close();
    +
    +		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
    +
    +		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
    +
    +		testHarness.setup();
    +		testHarness.restore(snapshot);
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(5);
    +		testHarness.processWatermark(new Watermark(6));
    +
    +		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    +
    +		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
    +		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
    +		expectedOutput.add(new Watermark(6));
    +
    +		System.out.println("GOT: " + testHarness.getOutput());
    +
    +		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    +
    +		testHarness.close();
    +	}
    +
    +	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public T getKey(T value) throws Exception {
    +			return value;
    +		}
    +	}
    +
    +	private static class WatermarkQueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
    +			out.collect(value + "WM:" + timerService.currentEventTime());
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<String> out) throws Exception {
    +		}
    +	}
    +
    +	private static class EventTimeTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<Integer> out) throws Exception {
    +			out.collect(value);
    +			timerService.registerEventTimeTimer(5);
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<Integer> out) throws Exception {
    +
    +			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
    +			out.collect(1777);
    +		}
    +	}
    +
    +	private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
    +
    +		private static final long serialVersionUID = 1L;
    +
    +		private final ValueStateDescriptor<Integer> state =
    +				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
    +
    +		@Override
    +		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
    +			out.collect("INPUT:" + value);
    +			getRuntimeContext().getState(state).update(value);
    +			timerService.registerEventTimeTimer(timerService.currentEventTime() + 5);
    +		}
    +
    +		@Override
    +		public void onTimer(
    +				long timestamp,
    +				TimeDomain timeDomain,
    +				TimerService timerService,
    +				Collector<String> out) throws Exception {
    +			assertEquals(TimeDomain.EVENT_TIME, timeDomain);
    +			out.collect("STATE:" + getRuntimeContext().getState(state).value());
    +		}
    +	}
    +
    +	private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
    --- End diff --
    
    Done. I consolidated the testing functions.
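The ordering that `testEventTimeTimerWithState` asserts can be reproduced without Flink. Each timer's output is emitted, stamped with the timer's timestamp, before the watermark that fired it is forwarded, and each timer reads only the state of its own key. The sketch below is a simplified model for illustration only. `EventTimeReplay`, its `Map`-based keyed state, and its `PriorityQueue` of timers are hypothetical stand-ins, not `StreamTimelyFlatMap` or `HeapInternalTimerService`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * Hypothetical, Flink-free model of what testEventTimeTimerWithState asserts:
 * per-key state, event-time timers scoped to a key, and timers firing before
 * the triggering watermark is forwarded.
 */
class EventTimeReplay {

    /** A pending timer: fires at 'timestamp' in the context of 'key'. */
    private static final class Timer {
        final long timestamp;
        final int key;

        Timer(long timestamp, int key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final Map<Integer, Integer> seenElement = new HashMap<>(); // stand-in for keyed ValueState
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final List<String> output = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Mirrors flatMap(): emit the input, store it for its key, set a timer 5 after the watermark. */
    void processElement(int value, long timestamp) {
        int key = value; // identity key selector, as in the test
        output.add("INPUT:" + value + "@" + timestamp);
        seenElement.put(key, value);
        timers.add(new Timer(currentWatermark + 5, key));
    }

    /** Fires every timer with timestamp <= watermark, then forwards the watermark. */
    void processWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            // Mirrors onTimer(): reads only the state of the timer's own key.
            output.add("STATE:" + seenElement.get(t.key) + "@" + t.timestamp);
        }
        output.add("WM:" + watermark);
    }

    /** Replays the inputs of testEventTimeTimerWithState and returns the emitted sequence. */
    static List<String> replay() {
        EventTimeReplay op = new EventTimeReplay();
        op.processWatermark(1);
        op.processElement(17, 0L); // timer for 1 + 5 = 6
        op.processWatermark(2);
        op.processElement(42, 1L); // timer for 2 + 5 = 7
        op.processWatermark(6);
        op.processWatermark(7);
        return op.output;
    }
}
```

`EventTimeReplay.replay()` yields the same order as the test's `expectedOutput` queue, with record timestamps written as `@ts` and watermarks as `WM:n`.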


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83858268
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.annotation.Internal;
    +
    +/**
    + * Internal class for keeping track of in-flight timers.
    + *
    + * @param <K> Type of the keys to which timers are scoped.
    + * @param <N> Type of the namespace to which timers are scoped.
    + */
    +@Internal
    +public class InternalTimer<K, N> implements Comparable<InternalTimer<K, N>> {
    +	private final long timestamp;
    +	private final K key;
    +	private final N namespace;
    +
    +	public InternalTimer(long timestamp, K key, N namespace) {
    +		this.timestamp = timestamp;
    +		this.key = key;
    +		this.namespace = namespace;
    +	}
    +
    +	public long getTimestamp() {
    +		return timestamp;
    +	}
    +
    +	public K getKey() {
    +		return key;
    +	}
    +
    +	public N getNamespace() {
    +		return namespace;
    +	}
    +
    +	@Override
    +	public int compareTo(InternalTimer<K, N> o) {
    +		return Long.compare(this.timestamp, o.timestamp);
    --- End diff --
    
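    `compareTo` is not consistent with `equals`: it compares only timestamps, so two timers with the same timestamp but different keys or namespaces compare as equal. That is allowed, but the documentation of `Comparable` strongly recommends that natural orderings be consistent with `equals`.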
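To make the concern concrete: because the reviewed `compareTo` looks only at the timestamp, any sorted collection that uses the ordering for identity (for example `java.util.TreeSet`) treats two timers for different keys with the same timestamp as duplicates. The sketch below uses a simplified, hypothetical `SimpleTimer` class with `String` key and namespace, since the real `InternalTimer<K, N>` is generic and its keys need not be `Comparable`. It shows the symptom and one possible fix: break ties on the remaining fields so that the ordering returns 0 exactly when `equals` is true. Whether `HeapInternalTimerService` stores timers in such a collection is not visible in this diff.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.TreeSet;

/** Hypothetical simplified timer; String key/namespace stand in for the generic K and N. */
final class SimpleTimer {
    final long timestamp;
    final String key;
    final String namespace;

    SimpleTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleTimer)) return false;
        SimpleTimer t = (SimpleTimer) o;
        return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

class TimerOrderings {
    /** Same ordering as the reviewed compareTo: timestamp only, inconsistent with equals. */
    static final Comparator<SimpleTimer> BY_TIMESTAMP_ONLY =
            Comparator.comparingLong(t -> t.timestamp);

    /** Timestamp first, ties broken by key and then namespace: consistent with equals. */
    static final Comparator<SimpleTimer> CONSISTENT_WITH_EQUALS =
            Comparator.<SimpleTimer>comparingLong(t -> t.timestamp)
                    .thenComparing(t -> t.key)
                    .thenComparing(t -> t.namespace);

    /** Inserts two distinct timers that share a timestamp and returns how many the set keeps. */
    static int distinctTimersKept(Comparator<SimpleTimer> ordering) {
        TreeSet<SimpleTimer> timers = new TreeSet<>(ordering);
        timers.add(new SimpleTimer(5L, "key-a", "window-1"));
        timers.add(new SimpleTimer(5L, "key-b", "window-1"));
        return timers.size();
    }
}
```

A `PriorityQueue` only uses the ordering for priority and tolerates ties (its `remove(Object)` and `contains` use `equals`), so the inconsistency mainly bites in sorted sets and maps.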


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83858860
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.operators;
    +
    +import org.apache.flink.annotation.Internal;
    +
    +/**
    + * Interface for working with time and timers.
    + *
    + * <p>This is the internal version of {@link org.apache.flink.streaming.api.TimerService}
    + * that allows to specify a key and a namespace to which timers should be scoped.
    + *
    + * @param <N> Type of the namespace to which timers are scoped.
    + */
    +@Internal
    +public interface InternalTimerService<N> {
    +
    +	/** Returns the current processing time. */
    +	long currentProcessingTime();
    +
    +	/** Returns the current event time. */
    +	long currentWatermark();
    --- End diff --
    
    The corresponding method in the public interface is called `currentEventTime`. Does it make sense to keep both method names in sync?
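If both names are kept, the mismatch can be confined to the adapter that hands the internal service to user functions. A minimal sketch, assuming such a delegating wrapper: `InternalTimeQueries` and `UserTimeQueries` are trimmed, locally declared stand-ins that carry only the method names from this PR (`currentWatermark()` internally, `currentEventTime()` for users), and `UserFacingTimerService` is a hypothetical class name.

```java
/** Trimmed stand-in for the internal interface in this PR (namespace type parameter omitted). */
interface InternalTimeQueries {
    long currentProcessingTime();

    long currentWatermark();
}

/** Trimmed stand-in for the user-facing TimerService. */
interface UserTimeQueries {
    long currentProcessingTime();

    long currentEventTime();
}

/** Hypothetical adapter: the only place where the two names are mapped onto each other. */
final class UserFacingTimerService implements UserTimeQueries {
    private final InternalTimeQueries internal;

    UserFacingTimerService(InternalTimeQueries internal) {
        this.internal = internal;
    }

    @Override
    public long currentProcessingTime() {
        return internal.currentProcessingTime();
    }

    @Override
    public long currentEventTime() {
        // The user's "current event time" is the operator's current watermark internally.
        return internal.currentWatermark();
    }
}
```

Renaming either side later would then touch only this delegation plus the callers of the renamed method.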


---

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r84234419
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
    + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
    + * and arrays.
    + *
    + * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
    + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
    + * to them firing.
    + *
    + * <pre>{@code
    + * DataStream<X> input = ...;
    + *
    + * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
    + * }</pre>
    + *
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the returned elements.
    + */
    +@PublicEvolving
    +public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
    +
    +	/**
    +	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
    +	 * it into zero, one, or more elements.
    +	 *
    +	 * @param value The input value.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
    +
    +	/**
    +	 * Called when a timer set using {@link TimerService} fires.
    +	 *
    +	 * @param timestamp The timestamp of the firing timer.
    +	 * @param timeDomain The {@link TimeDomain} of the firing timer.
    +	 * @param timerService A {@link TimerService} that allows setting timers and querying the
    +	 *                        current time.
    +	 * @param out The collector for returning result values.
    +	 *
    +	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +	 *                   to fail and may trigger recovery.
    +	 */
    +	void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
    --- End diff --
    
    Why not have two methods, `onEventTime` and `onProcessingTime`? That way we avoid the `TimeDomain` argument, and we also make users think about what they are doing in each case.
    
    In addition, this would be similar to the API we expose for window `Trigger`s, and I think it is good to have uniform APIs. The alternative would be to change the `Trigger` API to this style, but that would break user code.
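For comparison, the proposed split could look roughly like this. The `TimeDomain` enum is a local stand-in mirroring the constants of Flink's `TimeDomain`; `SplitTimerCallbacks` and `TimerDispatch` are hypothetical names, and a plain `List` replaces Flink's `Collector` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in mirroring Flink's TimeDomain. */
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical alternative: one callback per time domain instead of a TimeDomain argument. */
interface SplitTimerCallbacks<O> {
    void onEventTime(long timestamp, List<O> out) throws Exception;

    void onProcessingTime(long timestamp, List<O> out) throws Exception;
}

final class TimerDispatch {
    private TimerDispatch() {}

    /** Routes a single onTimer(timestamp, timeDomain, ...) call to the split interface. */
    static <O> void onTimer(long timestamp, TimeDomain domain, SplitTimerCallbacks<O> fn, List<O> out)
            throws Exception {
        switch (domain) {
            case EVENT_TIME:
                fn.onEventTime(timestamp, out);
                break;
            case PROCESSING_TIME:
                fn.onProcessingTime(timestamp, out);
                break;
            default:
                throw new IllegalArgumentException("Unknown time domain: " + domain);
        }
    }

    /** Example user function: each callback handles only its own domain, no TimeDomain check. */
    static List<String> demo() throws Exception {
        SplitTimerCallbacks<String> fn = new SplitTimerCallbacks<String>() {
            @Override
            public void onEventTime(long timestamp, List<String> out) {
                out.add("EVENT:" + timestamp);
            }

            @Override
            public void onProcessingTime(long timestamp, List<String> out) {
                out.add("PROC:" + timestamp);
            }
        };
        List<String> out = new ArrayList<>();
        onTimer(6L, TimeDomain.EVENT_TIME, fn, out);
        onTimer(5L, TimeDomain.PROCESSING_TIME, fn, out);
        return out;
    }
}
```

The dispatch also shows that either shape can be adapted to the other, so the decision is mostly about which one the public API should expose.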


---

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83854971
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Triggerable.java ---
    @@ -15,38 +15,26 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.streaming.api.operators;
     
    -package org.apache.flink.streaming.runtime.operators.windowing;
    +import org.apache.flink.annotation.Internal;
     
    -import org.apache.flink.streaming.runtime.operators.Triggerable;
    -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
    -
    -import java.util.concurrent.ScheduledFuture;
    -
    -class NoOpTimerService extends TimeServiceProvider {
    -
    -	private volatile boolean terminated;
    -
    -	@Override
    -	public long getCurrentProcessingTime() {
    -		return System.currentTimeMillis();
    -	}
    -
    -	@Override
    -	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
    -		return null;
    -	}
    -
    -	@Override
    -	public boolean isTerminated() {
    -		return terminated;
    -	}
    -
    -	@Override
    -	public void quiesceAndAwaitPending() {}
    -
    -	@Override
    -	public void shutdownService() {
    -		terminated = true;
    -	}
    +/**
    + * Interface for things that can be called by {@link InternalTimerService}.
    + *
    + * @param <K> Type of the keys to which timers are scoped.
    + * @param <N> Type of the namespace to which timers are scoped.
    + */
    +@Internal
    +public interface Triggerable<K, N> {
    --- End diff --
    
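    I found this name a bit confusing; it could be mistaken for something related to the concept of Triggers. Furthermore, there is another class with the same simple name in a different package (`org.apache.flink.streaming.runtime.operators.Triggerable`, imported by the removed code above). Maybe this could be called `TimerCallback`?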
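A minimal sketch of how the proposed `TimerCallback` name would read where a timer service fires timers. The diff is cut off before the methods of `Triggerable<K, N>`, so the single `onProcessingTime(timestamp, key, namespace)` method below is an assumption made for brevity, as is the hypothetical `ProcessingTimeTimers` queue; neither is the PR's code. Timers due at or before the new processing time fire in timestamp order, mirroring how the tests advance time with `testHarness.setProcessingTime(...)`.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical shape of the renamed interface; the real method signatures are not shown in the diff. */
interface TimerCallback<K, N> {
    void onProcessingTime(long timestamp, K key, N namespace) throws Exception;
}

/** Hypothetical minimal processing-time timer queue that notifies a TimerCallback. */
final class ProcessingTimeTimers<K, N> {

    /** A registered timer, scoped to a key and a namespace. */
    private static final class Entry<K, N> {
        final long timestamp;
        final K key;
        final N namespace;

        Entry(long timestamp, K key, N namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    private final PriorityQueue<Entry<K, N>> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry<K, N> e) -> e.timestamp));
    private final TimerCallback<K, N> callback;

    ProcessingTimeTimers(TimerCallback<K, N> callback) {
        this.callback = callback;
    }

    void register(long timestamp, K key, N namespace) {
        queue.add(new Entry<>(timestamp, key, namespace));
    }

    /** Fires, in timestamp order, every timer due at or before 'now'. */
    void advanceTo(long now) throws Exception {
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            Entry<K, N> e = queue.poll();
            callback.onProcessingTime(e.timestamp, e.key, e.namespace);
        }
    }
}
```

The operator would implement `TimerCallback` and pass itself to the service, so the name describes the role ("called when a timer fires") rather than echoing window triggers.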


---