Posted to issues@flink.apache.org by azagrebin <gi...@git.apache.org> on 2018/06/20 10:15:47 UTC

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

GitHub user azagrebin opened a pull request:

    https://github.com/apache/flink/pull/6186

    [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

    ## What is the purpose of the change
    
    This PR introduces TTL logic wrappers for state objects.
    
    ## Brief change log
    
    Added
      - sketch of TtlConfig
      - AbstractTtlWrapper and AbstractTtlState
      - concrete TTL wrappers for state objects
    
    ## Verifying this change
    
    Unit tests for the TTL wrappers of state objects
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not applicable at this step)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9515

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6186
    
----
commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-04T15:28:40Z

    [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

----


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196784275
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    --- End diff --
    
    This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL value, adding it to the current timestamp overflows. The user expects such a value to never expire, but with the current code it expires immediately.
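
A minimal, self-contained sketch of the overflow described above, assuming the expiration timestamp is computed as current time plus TTL, both in milliseconds. The class and method names (`TtlOverflowDemo`, `naiveExpiration`, `saturatingExpiration`) are hypothetical. The saturating variant uses `Math.addExact` and clamps to `Long.MAX_VALUE`; this is one possible fix, not necessarily the one adopted in the PR.

```java
/** Hypothetical demo class; not part of Flink. */
final class TtlOverflowDemo {

    /** Naive computation: silently wraps around on overflow. */
    static long naiveExpiration(long currentTs, long ttl) {
        return currentTs + ttl;
    }

    /** Saturating computation: clamps instead of wrapping around. */
    static long saturatingExpiration(long currentTs, long ttl) {
        try {
            return Math.addExact(currentTs, ttl);
        } catch (ArithmeticException e) {
            // Overflow only happens when both operands have the same sign,
            // so a positive TTL means positive overflow: treat as "never expires".
            return ttl > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    /** Same comparison as expired() in the diff. */
    static boolean expired(long expirationTs, long now) {
        return expirationTs <= now;
    }

    public static void main(String[] args) {
        long now = 1_529_489_747_000L; // an arbitrary wall-clock time in millis
        long naive = naiveExpiration(now, Long.MAX_VALUE);
        long safe = saturatingExpiration(now, Long.MAX_VALUE);
        System.out.println(naive < 0);           // true: wrapped around
        System.out.println(expired(naive, now)); // true: expires immediately
        System.out.println(expired(safe, now));  // false: effectively never expires
    }
}
```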


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198436490
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option value configures when to prolong state TTL.
    + */
    +public enum TtlUpdateType {
    +	/** TTL is disabled. State does not expire. */
    +	Disabled,
    --- End diff --
    
    This is more of an internal configuration value, mainly for later use in the TTL wrapper factory to decide whether to wrap the state with TTL or not. An alternative would probably be to store a null or `Optional` config, or a negative TTL, in the state descriptor when it is configured without TTL by default. I found the enum value more explicit. Although it is user-facing, I expect that users will most likely just not configure TTL at all rather than use this option.
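
To illustrate the two options being weighed, here is a hypothetical sketch of a factory deciding whether to wrap state with TTL. The types are simplified stand-ins, not Flink's `TtlConfig` or wrapper factory: option A checks an explicit `DISABLED` update type, option B treats an absent (`Optional.empty()`) config as "no TTL".

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

/** Hypothetical, simplified types; not Flink's TtlConfig or wrapper factory. */
final class TtlWrapDecision {

    /** Illustrative constants only; the PR's enum is TtlUpdateType. */
    enum UpdateType { DISABLED, ON_READ_AND_WRITE }

    static final class Config {
        final UpdateType updateType;
        final long ttlMillis;

        Config(UpdateType updateType, long ttlMillis) {
            this.updateType = updateType;
            this.ttlMillis = ttlMillis;
        }
    }

    /** Option A: an explicit DISABLED value in the config decides. */
    static <S> S wrapIfEnabled(S state, Config config, UnaryOperator<S> ttlWrapper) {
        return config.updateType == UpdateType.DISABLED ? state : ttlWrapper.apply(state);
    }

    /** Option B: an absent config means "no TTL". */
    static <S> S wrapIfPresent(S state, Optional<Config> config, UnaryOperator<S> ttlWrapper) {
        return config.isPresent() ? ttlWrapper.apply(state) : state;
    }
}
```

With the enum, a disabled TTL is visible in the config itself; with `Optional`, every caller has to remember that an empty config means no TTL.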


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196998472
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    If we call `getInternal()` in `get()` and make `updateTs()` accept an `Iterable`, then this method could be removed (or at least we should check whether the `iterable` is a `List` and, if so, cast it to `List` and return it immediately).
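
A sketch of the fallback suggested above, assuming a helper with the same shape as the private `collect(Iterable<E>)` method in the diff (the class name `CollectSketch` is hypothetical): it returns the input unchanged when it already is a `List` and copies it otherwise. The shortcut returns the same instance, so callers should not mutate the result if the source list is shared.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper shaped like the private collect(Iterable<E>) in the diff. */
final class CollectSketch {

    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            // Already materialized: return it without copying.
            return (List<E>) iterable;
        }
        // Otherwise copy the elements into a new list.
        List<E> result = new ArrayList<>();
        iterable.forEach(result::add);
        return result;
    }
}
```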


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197482482
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +	final long ttl;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +		this.ttl = config.getTtl().toMilliseconds();
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    +		return wrapWithTs(value, newExpirationTimestamp());
    +	}
    +
    +	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +		return value == null ? null : new TtlValue<>(value, ts);
    +	}
    +
    +	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +		return wrapWithTs(ttlValue.getUserValue());
    +	}
    +
    +	private long newExpirationTimestamp() {
    +		long currentTs = timeProvider.currentTimestamp();
    +		long ttlWithoutOverflow = currentTs > 0 ? Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl;
    +		return currentTs + ttlWithoutOverflow;
    +	}
    +
    +	<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
    --- End diff --
    
    This method also seems to fit better into `AbstractTtlState`; for example, there you could call `clear()` directly.
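
A rough sketch of the suggested refactoring, using hypothetical simplified classes rather than Flink's `AbstractTtlState`: when the read-check-update logic lives in the state base class, it can call the state's own `clear()` when it finds an expired value. A `LongSupplier` stands in for `TtlTimeProvider`, and overflow handling is omitted.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified classes for illustration; not Flink's actual API. */
final class TtlCheckSketch {

    /** A user value paired with its expiration timestamp (millis). */
    static final class TtlValue<V> {
        final V userValue;
        final long expirationTs;

        TtlValue(V userValue, long expirationTs) {
            this.userValue = userValue;
            this.expirationTs = expirationTs;
        }
    }

    /** Base class owning the TTL check, so it can call its own clear(). */
    abstract static class AbstractTtlState<V> {
        final long ttl;
        final LongSupplier clock; // stand-in for a time provider
        final boolean updateTsOnRead;

        AbstractTtlState(long ttl, LongSupplier clock, boolean updateTsOnRead) {
            this.ttl = ttl;
            this.clock = clock;
            this.updateTsOnRead = updateTsOnRead;
        }

        abstract TtlValue<V> readRaw();

        abstract void writeRaw(TtlValue<V> value);

        abstract void clear();

        /** Returns the unexpired value; expired state is cleared as a side effect. */
        V getWithTtlCheckAndUpdate() {
            TtlValue<V> ttlValue = readRaw();
            if (ttlValue == null) {
                return null;
            }
            long now = clock.getAsLong();
            if (ttlValue.expirationTs <= now) {
                clear(); // direct call on this state
                return null;
            }
            if (updateTsOnRead) {
                // Overflow handling omitted for brevity.
                writeRaw(new TtlValue<>(ttlValue.userValue, now + ttl));
            }
            return ttlValue.userValue;
        }
    }

    /** In-memory single-value state used to exercise the base class. */
    static final class HeapValueState<V> extends AbstractTtlState<V> {
        private TtlValue<V> stored;

        HeapValueState(long ttl, LongSupplier clock, boolean updateTsOnRead) {
            super(ttl, clock, updateTsOnRead);
        }

        void update(V value) {
            stored = new TtlValue<>(value, clock.getAsLong() + ttl);
        }

        @Override
        TtlValue<V> readRaw() {
            return stored;
        }

        @Override
        void writeRaw(TtlValue<V> value) {
            stored = value;
        }

        @Override
        void clear() {
            stored = null;
        }
    }
}
```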


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996094
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    --- End diff --
    
    This looks a bit weird. My gut feeling is that we should call `getInternal()` in `get()` (just as this class calls `updateInternal()` in `update()`), but here it is the other way around.
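
A hypothetical, simplified illustration of the delegation direction suggested above: `getInternal()` does the actual read and drops expired entries, and `get()` only adapts its result, mirroring how `update()` delegates to `updateInternal()`. An in-memory list stands in for the wrapped `InternalListState`; all names are illustrative and overflow handling is omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical, simplified list state; names do not match Flink's API. */
final class ListGetSketch {

    static final class TtlValue<T> {
        final T userValue;
        final long expirationTs;

        TtlValue(T userValue, long expirationTs) {
            this.userValue = userValue;
            this.expirationTs = expirationTs;
        }
    }

    static final class TtlListState<T> {
        // Stands in for the wrapped InternalListState<K, N, TtlValue<T>>.
        private final List<TtlValue<T>> original = new ArrayList<>();
        private final long ttl;
        private final LongSupplier clock;

        TtlListState(long ttl, LongSupplier clock) {
            this.ttl = ttl;
            this.clock = clock;
        }

        void add(T value) {
            original.add(new TtlValue<>(value, clock.getAsLong() + ttl));
        }

        /** Does the actual read: materializes the unexpired user values. */
        List<T> getInternal() {
            long now = clock.getAsLong();
            List<T> result = new ArrayList<>();
            for (TtlValue<T> v : original) {
                if (v.expirationTs > now) {
                    result.add(v.userValue);
                }
            }
            return result;
        }

        /** Public read delegates to getInternal(), as update() delegates to updateInternal(). */
        Iterable<T> get() {
            return Collections.unmodifiableList(getInternal());
        }
    }
}
```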


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197475154
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures whether to return expired user value or not.
    + */
    +public enum TtlStateVisibility {
    --- End diff --
    
    Same here.


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    @azagrebin thanks for addressing the concerns, looks good from my side now. Let's wait for @StefanRRichter's review.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197080576
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    If we need to "update on read", this part is a bit confusing to me. Currently we attach a TTL to every list item, so "update on read" should be scoped to the list item, not to the whole list. That makes me think `updateTs` should rather accept an `Iterable`. What do you think?
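
A sketch of per-item "update on read", assuming a hypothetical `TtlValue` stand-in with millisecond timestamps: expired elements are dropped, each surviving element gets its own refreshed expiration timestamp, and the method accepts any `Iterable` rather than requiring a `List`. Overflow handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of per-item TTL refresh on read; not Flink code. */
final class PerItemRefreshSketch {

    static final class TtlValue<T> {
        final T userValue;
        final long expirationTs;

        TtlValue(T userValue, long expirationTs) {
            this.userValue = userValue;
            this.expirationTs = expirationTs;
        }
    }

    /**
     * Drops expired elements and gives each surviving element its own new
     * expiration timestamp. Accepts any Iterable, so no List has to be
     * materialized first just to call this method.
     */
    static <T> List<TtlValue<T>> refreshOnRead(Iterable<TtlValue<T>> items, long now, long ttl) {
        List<TtlValue<T>> refreshed = new ArrayList<>();
        for (TtlValue<T> item : items) {
            if (item.expirationTs > now) {
                // Overflow handling omitted for brevity.
                refreshed.add(new TtlValue<>(item.userValue, now + ttl));
            }
        }
        return refreshed;
    }
}
```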


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197102118
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    Well, I am not sure it matters at this point: it is private and used only once, with an already collected `List`. My thinking was that it is less verbose:
    `list.stream()` vs `StreamSupport.stream(iterable.spliterator(), false)`.
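
For reference, the two stream constructions compared above, in a small self-contained example (the class and method names are illustrative): `Collection.stream()` is available directly on a `List`, while a plain `Iterable` has to go through its `Spliterator`, with `false` requesting a sequential stream.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Illustrative comparison; not Flink code. */
final class StreamFromIterableSketch {

    // From a List: Collection provides stream() directly.
    static List<Integer> evensFromList(List<Integer> list) {
        return list.stream()
            .filter(x -> x % 2 == 0)
            .collect(Collectors.toList());
    }

    // From a plain Iterable: build the stream from its Spliterator ('false' = sequential).
    static List<Integer> evensFromIterable(Iterable<Integer> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false)
            .filter(x -> x % 2 == 0)
            .collect(Collectors.toList());
    }
}
```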


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    cc @StefanRRichter 


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197476710
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures whether to return expired user value or not.
    + */
    +public enum TtlStateVisibility {
    +	/** Return still available expired user value (not yet cleaned up). */
    +	Relaxed,
    --- End diff --
    
    I would explain this differently in the comment: it means that expired state can be returned if it has not been cleaned up yet.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r199082008
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,172 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    --- End diff --
    
    My idea was to keep this method lazy when possible. Currently this implementation does not assume that the underlying state returns a `List`; it treats the result as an `Iterable` and lazily wraps it with `IteratorWithCleanup`. Only when it has to update timestamps on read does it materialise the `Iterable` into a `List`.
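A minimal sketch of the lazy read path described above. `TimedValue`, `UnexpiredIterator` and the `LongSupplier` clock are hypothetical stand-ins for `TtlValue`, `IteratorWithCleanup` and `TtlTimeProvider`, not the classes from this PR. The sketch only skips expired entries and leaves out any cleanup. The backend iterator advances only as the caller iterates.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for TtlValue: a user value plus an absolute expiration timestamp. */
final class TimedValue<T> {
    final T userValue;
    final long expirationTimestamp;

    TimedValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Lazily skips expired entries; the backend iterator is advanced only on demand. */
final class UnexpiredIterator<T> implements Iterator<T> {
    private final Iterator<TimedValue<T>> original;
    private final LongSupplier clock;
    private TimedValue<T> lookahead; // next unexpired entry, or null if not fetched yet

    UnexpiredIterator(Iterator<TimedValue<T>> original, LongSupplier clock) {
        this.original = original;
        this.clock = clock;
    }

    @Override
    public boolean hasNext() {
        // Pull from the backend until an unexpired entry is found or the input is exhausted.
        while (lookahead == null && original.hasNext()) {
            TimedValue<T> candidate = original.next();
            if (candidate.expirationTimestamp > clock.getAsLong()) {
                lookahead = candidate;
            }
        }
        return lookahead != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = lookahead.userValue;
        lookahead = null;
        return result;
    }
}

class LazyGetDemo {
    public static void main(String[] args) {
        List<TimedValue<String>> stored = Arrays.asList(
            new TimedValue<>("a", 5L), new TimedValue<>("b", 20L), new TimedValue<>("c", 30L));
        // Like get(): nothing is read from `stored` until the caller starts iterating.
        Iterable<String> view = () -> new UnexpiredIterator<>(stored.iterator(), () -> 10L);
        for (String s : view) {
            System.out.println(s); // prints "b", then "c"; "a" expired at time 5
        }
    }
}
```

When timestamps must be updated on read, the input has to be fully materialised anyway. In that case wrapping an already collected `List` this way costs only one extra pass.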


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    Let me elaborate on the TTL checking condition in more detail. Overall, the condition has two parts and looks like `(current_ts - update_ts) - time_shift_offset >= TTL`.
    
    The `time_shift_offset` is the shift we should apply when checking the TTL.
    
    - For records whose `update_ts` > `checkpoint_ts`, we know they were created (or updated) after the last restore, so no shift is needed: the shift offset is `0`.
    
    - For records whose `update_ts` <= `checkpoint_ts`, we know they were created (or updated) before the last restore, so the shift must be applied: the shift offset is `recovery_ts - checkpoint_ts`.
    
    Our current code does not do this time alignment. It is equivalent to the special case of the above condition where `time_shift_offset` is always `0`.
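The proposed condition can be sketched as a pure function. `ShiftedTtlCheck` is a hypothetical helper that illustrates the proposal and is not code from the PR. It assumes `checkpointTs` and `recoveryTs` are available when the check runs and that all timestamps use the same unit.

```java
/** Hypothetical sketch of the shifted TTL check: (current_ts - update_ts) - time_shift_offset >= TTL. */
final class ShiftedTtlCheck {

    /** Records written after the checkpoint need no shift; older ones are shifted by the downtime. */
    static long timeShiftOffset(long updateTs, long checkpointTs, long recoveryTs) {
        return updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
    }

    static boolean expired(long currentTs, long updateTs, long checkpointTs, long recoveryTs, long ttl) {
        return (currentTs - updateTs) - timeShiftOffset(updateTs, checkpointTs, recoveryTs) >= ttl;
    }

    public static void main(String[] args) {
        long ttl = 10;
        long checkpointTs = 100;
        long recoveryTs = 111; // restored 11 time units after the checkpoint
        // Written at 95, read at 112: elapsed 17, minus shift 11 = 6 < 10, so still alive.
        System.out.println(expired(112, 95, checkpointTs, recoveryTs, ttl)); // false
        // Passing recoveryTs == checkpointTs makes the offset 0 (current behaviour): 17 >= 10.
        System.out.println(expired(112, 95, checkpointTs, checkpointTs, ttl)); // true
    }
}
```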


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196995820
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    --- End diff --
    
    Again, should we also apply the TTL check in `original.mergeNamespaces()`? Merging namespaces has to query the state anyway.
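Here is one possible answer to the question, sketched on a plain in-memory `Map` rather than a real state backend. `TtlMergeSketch`, its `Entry` type and the map layout are hypothetical. The idea is that merging already reads the source entries, so expired ones could be filtered out at that point instead of being copied into the target namespace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: merging list-state namespaces while dropping expired entries. */
final class TtlMergeSketch {

    /** Simplified entry: a value and its absolute expiration timestamp. */
    static final class Entry {
        final String value;
        final long expirationTs;

        Entry(String value, long expirationTs) {
            this.value = value;
            this.expirationTs = expirationTs;
        }
    }

    /** Moves unexpired entries of all source namespaces into the target; expired ones are discarded. */
    static void mergeNamespaces(Map<String, List<Entry>> state, String target,
                                Collection<String> sources, long now) {
        List<Entry> merged = new ArrayList<>();
        for (String source : sources) {
            List<Entry> entries = state.remove(source);
            if (entries == null) {
                continue; // nothing stored under this namespace
            }
            for (Entry e : entries) {
                if (e.expirationTs > now) { // keep only unexpired entries
                    merged.add(e);
                }
            }
        }
        if (!merged.isEmpty()) {
            state.computeIfAbsent(target, k -> new ArrayList<>()).addAll(merged);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Entry>> state = new HashMap<>();
        state.put("w1", new ArrayList<>(Arrays.asList(new Entry("x", 5), new Entry("y", 50))));
        state.put("w2", new ArrayList<>(Arrays.asList(new Entry("z", 60))));
        mergeNamespaces(state, "w3", Arrays.asList("w1", "w2"), 10);
        System.out.println(state.get("w3").size()); // 2: "x" was expired and dropped
    }
}
```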


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197476858
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures whether to return expired user value or not.
    + */
    +public enum TtlStateVisibility {
    +	/** Return still available expired user value (not yet cleaned up). */
    +	Relaxed,
    +	/** Hide expired user value and behave as if it does not exist any more. */
    +	Exact
    --- End diff --
    
    And here: "expired state is never returned to the user". I don't think it matters whether it is just hidden or actually deleted.
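The two suggested javadoc wordings can be illustrated with a small read helper. The enum mirrors `TtlStateVisibility` from the diff but uses the reworded comments. `VisibilitySketch.read` is hypothetical and assumes a value counts as expired once the current time reaches its expiration timestamp.

```java
/** Mirrors TtlStateVisibility from the diff, with the reworded comments. */
enum TtlStateVisibility {
    /** Expired state can be returned if it has not been cleaned up yet. */
    Relaxed,
    /** Expired state is never returned to the user. */
    Exact
}

/** Hypothetical sketch of how the visibility option could affect a single read. */
final class VisibilitySketch {

    static <T> T read(T storedValue, long expirationTs, long now, TtlStateVisibility visibility) {
        boolean expired = now >= expirationTs; // assumed expiry rule
        if (expired && visibility == TtlStateVisibility.Exact) {
            return null; // behave as if the value does not exist, whether hidden or deleted
        }
        return storedValue; // Relaxed: an expired value that is still stored remains visible
    }

    public static void main(String[] args) {
        System.out.println(read("v", 10, 15, TtlStateVisibility.Relaxed)); // v
        System.out.println(read("v", 10, 15, TtlStateVisibility.Exact));   // null
    }
}
```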


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197501902
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +	extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +	implements FoldFunction<T, TtlValue<ACC>> {
    +	TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +		super(original, config, timeProvider);
    +	}
    +
    +	@Override
    +	public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +		return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --
    
    I think that (similar to `TtlAggregateFunction`) you need to intercept `null` accumulator values here and replace them with the default value.
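A minimal sketch of the suggested null handling. `SimpleFoldFunction` has the same shape as Flink's `FoldFunction<T, O>` (`O fold(O accumulator, T value) throws Exception`), but it and the `Timed` / `TtlFoldSketch` classes are hypothetical simplifications, not the PR's code. The sketch also assumes that an expired accumulator should restart from the default value, as `getUnexpried` presumably does.

```java
import java.util.function.LongSupplier;

/** Hypothetical mirror of the shape of Flink's FoldFunction<T, O>. */
interface SimpleFoldFunction<T, ACC> {
    ACC fold(ACC accumulator, T value) throws Exception;
}

/** Simplified stand-in for TtlValue. */
final class Timed<V> {
    final V value;
    final long expirationTs;

    Timed(V value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

/** Wraps a fold function with TTL logic, replacing a null or expired accumulator by a default. */
final class TtlFoldSketch<T, ACC> implements SimpleFoldFunction<T, Timed<ACC>> {
    private final SimpleFoldFunction<T, ACC> original;
    private final ACC defaultAccumulator; // assumed immutable; a real wrapper would copy it
    private final long ttl;
    private final LongSupplier clock;

    TtlFoldSketch(SimpleFoldFunction<T, ACC> original, ACC defaultAccumulator, long ttl, LongSupplier clock) {
        this.original = original;
        this.defaultAccumulator = defaultAccumulator;
        this.ttl = ttl;
        this.clock = clock;
    }

    @Override
    public Timed<ACC> fold(Timed<ACC> accumulator, T value) throws Exception {
        long now = clock.getAsLong();
        // Intercept "no state yet" (null) and "expired" before calling the user function.
        ACC current = (accumulator == null || now >= accumulator.expirationTs)
            ? defaultAccumulator
            : accumulator.value;
        // Re-wrap the folded result with a fresh expiration timestamp.
        return new Timed<>(original.fold(current, value), now + ttl);
    }

    public static void main(String[] args) throws Exception {
        TtlFoldSketch<Integer, Integer> ttlSum =
            new TtlFoldSketch<>((acc, v) -> acc + v, 0, 10L, () -> 100L);
        Timed<Integer> first = ttlSum.fold(null, 5);   // null accumulator: 0 + 5
        Timed<Integer> second = ttlSum.fold(first, 3); // unexpired accumulator: 5 + 3
        System.out.println(second.value); // 8
    }
}
```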


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197475245
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeCharacteristic.java ---
    @@ -0,0 +1,27 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures time scale to use for ttl.
    + */
    +public enum TtlTimeCharacteristic {
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198430741
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,66 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    --- End diff --
    
    Yes, the config is only sketched for now. The builder is planned for a later step, when TTL is activated in the State Descriptor API.
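For illustration, here is a rough idea of what the planned builder might look like. `TtlConfigSketch`, its fields (TTL in milliseconds, an `updateTsOnRead` flag echoing the field used in `TtlListState`, and a visibility setting) and the default values are all hypothetical assumptions. They do not come from the PR's `TtlConfig`.

```java
/** Hypothetical builder sketch for a TTL config; fields and defaults are illustrative only. */
final class TtlConfigSketch {
    enum Visibility { Relaxed, Exact }

    private final long ttlMillis;
    private final boolean updateTsOnRead;
    private final Visibility visibility;

    private TtlConfigSketch(Builder b) {
        this.ttlMillis = b.ttlMillis;
        this.updateTsOnRead = b.updateTsOnRead;
        this.visibility = b.visibility;
    }

    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    long getTtlMillis() { return ttlMillis; }
    boolean isUpdateTsOnRead() { return updateTsOnRead; }
    Visibility getVisibility() { return visibility; }

    static final class Builder {
        private final long ttlMillis;
        private boolean updateTsOnRead = false;          // assumed default: refresh only on write
        private Visibility visibility = Visibility.Exact; // assumed default: hide expired state

        private Builder(long ttlMillis) {
            if (ttlMillis <= 0) {
                throw new IllegalArgumentException("TTL must be positive");
            }
            this.ttlMillis = ttlMillis;
        }

        Builder updateTsOnRead(boolean value) { this.updateTsOnRead = value; return this; }
        Builder visibility(Visibility value) { this.visibility = value; return this; }
        TtlConfigSketch build() { return new TtlConfigSketch(this); }
    }

    public static void main(String[] args) {
        TtlConfigSketch config = newBuilder(10 * 60 * 1000L).updateTsOnRead(true).build();
        System.out.println(config.getTtlMillis() + " " + config.isUpdateTsOnRead() + " " + config.getVisibility());
    }
}
```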


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196827863
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    --- End diff --
    
    Oh, sorry, my bad, I misunderstood...


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197077377
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    In general, the idea is to keep the `get()` method as lazy as possible: return an `Iterable` and avoid querying the backend when it is not needed. At the moment, list state internally stores a `List` according to the API, not an `Iterable` (something to think about in the future). So `getInternal()` returns a `List`, i.e. the collected `Iterable`. I agree it looks odd, but if `get()` called `getInternal()`, it would force collecting the `List`.
    
    If we do not need to update on read, everything can stay lazy in the final `IteratorWithCleanup`, which adds just one potential user iteration over the original `Iterable` from the backend.
    
    If we do need to update on read, we have to collect it fully anyway, which may query the backend. That is why it is collected and cached, to avoid iterating over the original `Iterable` from the backend again. In this case `updateTs` and `IteratorWithCleanup` perform slightly different iterations over `collected`, but it is already materialised.
    
    Currently both backends return a `List` object. I think it is a good idea for `collect` to check whether the `Iterable` is already a `List` and skip the actual collecting in that case, thanks. I will add it.
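The check agreed on above could look roughly like this. `CollectSketch` is a hypothetical standalone version of the private `collect` helper. For the non-`List` case it uses the `StreamSupport` route mentioned earlier in the thread. Returning the backend's own `List` avoids a copy and an extra iteration, but the caller then shares that instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class CollectSketch {

    /** Materialises an Iterable into a List, skipping the copy if it already is a List. */
    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            return (List<E>) iterable; // already materialised: no copy, no extra iteration
        }
        return StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fromBackend = new ArrayList<>(Arrays.asList("a", "b"));
        System.out.println(collect(fromBackend) == fromBackend); // true: same instance returned

        Iterable<String> lazy = () -> Arrays.asList("x", "y").iterator();
        System.out.println(collect(lazy)); // [x, y]
    }
}
```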


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    Thanks for the review, @sihuazhou. I think all concerns should now be addressed.
    The CI failure is unrelated; it works in [my CI for the same commit](https://travis-ci.org/azagrebin/flink/builds/395137069?utm_source=github_status&utm_medium=notification).


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    Merging this.


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    I had one more minor comment. Otherwise, this looks good 👍 Nice job!


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197502165
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +	extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +	implements FoldFunction<T, TtlValue<ACC>> {
    +	TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +		super(original, config, timeProvider);
    +	}
    +
    +	@Override
    +	public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +		return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --
    
    If this is true, it is a valuable case to test for all appending states, to make sure they work correctly.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197004891
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    The `expirationTimestamp` is an absolute timestamp, so should we shift the `TtlValue` timestamps on checkpoint and recovery? For example, suppose a user uses `ProcessTime` as the time characteristic and sets `TTL = 10min`. For some reason, they trigger a savepoint and recover the job from it 11 minutes later. If we don't shift the timestamps, all of the state will have expired by then.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197079917
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    I think this is a bit out of scope for this PR. I did think about this concern, but the semantics of processing time are similar to real wall-clock time: it does not stop when the job is stopped. There is also the event-time option, which gives more control over time. I would leave it for now; this is more about how users migrate checkpoints. We can add a comment or an open question about it to the design doc.


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    @azagrebin I think the overall idea is well implemented. I just had a couple of comments inline.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197109577
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,98 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    +		return wrapWithTs(value, newExpirationTimestamp());
    +	}
    +
    +	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +		return value == null ? null : new TtlValue<>(value, ts);
    +	}
    +
    +	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +		return wrapWithTs(ttlValue.getUserValue());
    +	}
    +
    +	private long newExpirationTimestamp() {
    +		long currentTs = timeProvider.currentTimestamp();
    +		long ttl = config.getTtl().toMilliseconds();
    --- End diff --
    
    I think the JIT will optimise this since it is the only usage, but I will cache it in a field just in case.
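
    The change being discussed is to read `config.getTtl().toMilliseconds()` once and keep it in a final field (the later revision of `AbstractTtlDecorator` quoted further down in this thread has a `final long ttl` field). A minimal sketch of that pattern, using `java.util.function.LongSupplier` as a hypothetical stand-in for `TtlTimeProvider`. The rest of `newExpirationTimestamp()` is cut off in the diff, so the overflow guard below is an assumption of this sketch, not the PR's code.

```java
import java.util.function.LongSupplier;

/** Sketch: the TTL in milliseconds is read once in the constructor and cached. */
final class ExpirationCalculator {
    private final LongSupplier clock; // stands in for TtlTimeProvider.currentTimestamp()
    private final long ttl;           // cached TTL in milliseconds

    ExpirationCalculator(LongSupplier clock, long ttlMillis) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL must be positive");
        }
        this.clock = clock;
        this.ttl = ttlMillis;
    }

    long newExpirationTimestamp() {
        long currentTs = clock.getAsLong();
        // Assumed overflow guard: saturate instead of wrapping to a negative timestamp.
        return currentTs > Long.MAX_VALUE - ttl ? Long.MAX_VALUE : currentTs + ttl;
    }
}
```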


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001110
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java ---
    @@ -0,0 +1,73 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalReducingState;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class wraps reducing state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlReducingState<K, N, T>
    +	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
    +	implements InternalReducingState<K, N, T> {
    +	TtlReducingState(
    +		InternalReducingState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<T> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public T get() throws Exception {
    +		return getInternal();
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		original.add(wrapWithTs(value, Long.MAX_VALUE));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    --- End diff --
    
    Again, should we also apply the TTL check in `original.mergeNamespaces()`? Merging namespaces requires querying the state.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197477831
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures whether to return expired user value or not.
    --- End diff --
    
    Expired values "can" be returned: in relaxed mode we accept it, but we don't enforce it.
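
    A small sketch of the distinction, assuming two modes: `EXACT` (expired values are never returned) and `RELAXED` (mirroring the PR's `TtlStateVisibility.Relaxed`). The enum and class names are hypothetical. In relaxed mode an expired value is returned only while it has not been cleaned up yet, which is why "can" fits better than "will".

```java
/** Hypothetical visibility modes; RELAXED mirrors the PR's TtlStateVisibility.Relaxed. */
enum Visibility {
    EXACT,   // expired values are never returned
    RELAXED  // expired values may be returned if not yet cleaned up
}

final class VisibilityDemo {
    private VisibilityDemo() {}

    /**
     * @param stored the physically stored value, or null once cleanup removed it
     * @return the value to show the user, or null if it must be hidden
     */
    static <V> V read(V stored, long expirationTs, long now, Visibility visibility) {
        if (stored == null) {
            return null; // already cleaned up: nothing to return in either mode
        }
        boolean expired = expirationTs <= now;
        return expired && visibility == Visibility.EXACT ? null : stored;
    }
}
```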


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197480035
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    --- End diff --
    
    I wonder whether it is better to keep this as an abstract base class or to prefer composition over inheritance for `AbstractTtlState`. It has no abstract methods and no methods are overridden by subclasses, so it could just as well be a non-abstract class that `AbstractTtlState` holds a reference to.
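
    A minimal sketch of the composition alternative, with the shared TTL logic in a plain class that the state wrapper holds by reference. `TtlLogic`, `SimpleValueState` and `ComposedTtlValueState` are hypothetical simplifications, and the wrapped backend state is replaced by an in-memory field.

```java
import java.util.function.LongSupplier;

/** Hypothetical non-abstract holder of the shared TTL logic. */
final class TtlLogic {
    private final LongSupplier clock;
    private final long ttl;

    TtlLogic(LongSupplier clock, long ttl) {
        this.clock = clock;
        this.ttl = ttl;
    }

    boolean expired(long expirationTs) {
        return expirationTs <= clock.getAsLong();
    }

    long newExpirationTimestamp() {
        return clock.getAsLong() + ttl;
    }
}

/** Minimal value-state interface; a stand-in, not Flink's ValueState. */
interface SimpleValueState<V> {
    V value();

    void update(V value);
}

/** TTL wrapper that holds TtlLogic by reference instead of extending a base class. */
final class ComposedTtlValueState<V> implements SimpleValueState<V> {
    private final TtlLogic ttlLogic;
    // In-memory stand-in for the wrapped backend state.
    private V storedValue;
    private long expirationTs;

    ComposedTtlValueState(TtlLogic ttlLogic) {
        this.ttlLogic = ttlLogic;
    }

    @Override
    public V value() {
        return storedValue == null || ttlLogic.expired(expirationTs) ? null : storedValue;
    }

    @Override
    public void update(V value) {
        storedValue = value;
        expirationTs = ttlLogic.newExpirationTimestamp();
    }
}
```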


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197108498
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    --- End diff --
    
    I think this is out of the TTL scope for now. The merge namespace methods are not part of the user-facing API; they are used only internally for windowing, and I am not sure they will ever need TTL. In any case, the read methods of the API should eventually evict expired state, merged or not. For aggregating states, merging already goes through the wrapped read methods of the aggregate functions underneath, which should also speed up cleanup if `mergeNamespaces` is ever used.
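
    A minimal sketch of the "reads eventually evict" argument, assuming an in-memory list where each entry carries its own expiration timestamp (`Stamped` and `EvictOnReadList` are hypothetical). In this sketch, entries merged in from another namespace keep their original timestamps, so the next read drops the expired ones regardless of where they came from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Simplified entry carrying its own absolute expiration timestamp. */
final class Stamped<T> {
    final T value;
    final long expirationTs;

    Stamped(T value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

/** In-memory list "state" that drops expired entries whenever it is read. */
final class EvictOnReadList<T> {
    private final LongSupplier clock;
    private List<Stamped<T>> stored = new ArrayList<>();

    EvictOnReadList(LongSupplier clock) {
        this.clock = clock;
    }

    /** Appends an entry as-is, e.g. one merged in from another namespace. */
    void addRaw(Stamped<T> entry) {
        stored.add(entry);
    }

    /** Returns the unexpired values and, as a side effect, evicts expired ones. */
    List<T> get() {
        long now = clock.getAsLong();
        List<Stamped<T>> unexpired = new ArrayList<>();
        List<T> result = new ArrayList<>();
        for (Stamped<T> entry : stored) {
            if (entry.expirationTs > now) {
                unexpired.add(entry);
                result.add(entry.value);
            }
        }
        stored = unexpired;
        return result;
    }

    int physicalSize() {
        return stored.size();
    }
}
```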


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198439894
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    --- End diff --
    
    Currently this class is used to wrap any object with TTL logic, not only state objects, e.g. aggregating functions. It could be injected as a "TTL time service" instead, but then the common member `original` would have to be duplicated in every function wrapper.
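
    To illustrate why the base class holds `original` for both state and function wrappers, here is a hypothetical sketch in the style of `TtlFoldFunction`, using `java.util.function.BinaryOperator` instead of a Flink function interface. `TsValue`, `TtlDecoratorSketch` and `TtlReduceSketch` are illustrative names. How a real reduce wrapper treats expired inputs is not shown in this thread, so the handling below (drop expired sides, keep whatever is still live) is an assumption.

```java
import java.util.function.BinaryOperator;
import java.util.function.LongSupplier;

/** Simplified value with an absolute expiration timestamp. */
final class TsValue<V> {
    final V value;
    final long expirationTs;

    TsValue(V value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

/** Shared base: keeps `original` plus the TTL helpers for any wrapped object. */
abstract class TtlDecoratorSketch<T> {
    final T original;
    final LongSupplier clock;
    final long ttl;

    TtlDecoratorSketch(T original, LongSupplier clock, long ttl) {
        this.original = original;
        this.clock = clock;
        this.ttl = ttl;
    }

    <V> V unexpiredOrNull(TsValue<V> v) {
        return v == null || v.expirationTs <= clock.getAsLong() ? null : v.value;
    }

    <V> TsValue<V> wrapWithTs(V v) {
        return v == null ? null : new TsValue<>(v, clock.getAsLong() + ttl);
    }
}

/** Wraps a plain reduce function so that expired inputs are ignored. */
final class TtlReduceSketch<V> extends TtlDecoratorSketch<BinaryOperator<V>>
        implements BinaryOperator<TsValue<V>> {

    TtlReduceSketch(BinaryOperator<V> original, LongSupplier clock, long ttl) {
        super(original, clock, ttl);
    }

    @Override
    public TsValue<V> apply(TsValue<V> a, TsValue<V> b) {
        V va = unexpiredOrNull(a);
        V vb = unexpiredOrNull(b);
        if (va != null && vb != null) {
            return wrapWithTs(original.apply(va, vb));
        }
        // At most one side is still live: keep it with a fresh timestamp (or null).
        return wrapWithTs(va != null ? va : vb);
    }
}
```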


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6186


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    Hi, after reading the whole implementation, I found that state is only expired when it is accessed. If dirty data is stored in state and never queried again, how will it ever be expired? Or is there ongoing work on this? @azagrebin
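
    The gap being described is the difference between lazy, on-access expiry and a cleanup that visits every entry. A hypothetical in-memory sketch of both (not part of this PR, and not how any Flink state backend is implemented): `get` only ever removes the key it touches, so an entry that is never read again stays in place unless something like `cleanupAll` scans the whole store.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical keyed store contrasting lazy (on-access) and full-scan expiry. */
final class KeyedTtlStore<K, V> {
    private static final class Slot<T> {
        final T value;
        final long expirationTs;

        Slot(T value, long expirationTs) {
            this.value = value;
            this.expirationTs = expirationTs;
        }
    }

    private final Map<K, Slot<V>> map = new HashMap<>();
    private final LongSupplier clock;
    private final long ttl;

    KeyedTtlStore(LongSupplier clock, long ttl) {
        this.clock = clock;
        this.ttl = ttl;
    }

    void put(K key, V value) {
        map.put(key, new Slot<>(value, clock.getAsLong() + ttl));
    }

    /** Lazy expiry: only the key being read is checked and, if expired, removed. */
    V get(K key) {
        Slot<V> slot = map.get(key);
        if (slot == null) {
            return null;
        }
        if (slot.expirationTs <= clock.getAsLong()) {
            map.remove(key);
            return null;
        }
        return slot.value;
    }

    /** Full scan: removes expired entries even if they are never read again. */
    int cleanupAll() {
        long now = clock.getAsLong();
        int removed = 0;
        for (Iterator<Map.Entry<K, Slot<V>>> it = map.entrySet().iterator(); it.hasNext(); ) {
            if (it.next().getValue().expirationTs <= now) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Number of entries physically held, expired or not. */
    int physicalSize() {
        return map.size();
    }
}
```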


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198947650
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +	extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +	implements FoldFunction<T, TtlValue<ACC>> {
    +	TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +		super(original, config, timeProvider);
    +	}
    +
    +	@Override
    +	public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +		return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --
    
    It should be covered by `updateExpired` in `testExactExpirationOnWrite`.
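
    The test scenario referred to (write, let the value expire, then update it again) can be sketched with a manually advanced clock. In this hypothetical sketch (`ManualTimeProvider` and `TtlFoldSketch` are illustrative, not the PR's test utilities), an update on an expired accumulator is assumed to restart from an initial value rather than folding into the stale one.

```java
import java.util.function.BiFunction;

/** Hypothetical test clock that only moves when told to. */
final class ManualTimeProvider {
    private long time;

    long currentTimestamp() {
        return time;
    }

    void advance(long millis) {
        time += millis;
    }
}

/** Fold-style state with TTL; an expired accumulator restarts from `initial`. */
final class TtlFoldSketch<T, ACC> {
    private final BiFunction<ACC, T, ACC> fold;
    private final ACC initial;
    private final ManualTimeProvider time;
    private final long ttl;
    private ACC acc;
    private long expirationTs;

    TtlFoldSketch(BiFunction<ACC, T, ACC> fold, ACC initial, ManualTimeProvider time, long ttl) {
        this.fold = fold;
        this.initial = initial;
        this.time = time;
        this.ttl = ttl;
    }

    private boolean expired() {
        return acc == null || expirationTs <= time.currentTimestamp();
    }

    void add(T value) {
        ACC base = expired() ? initial : acc;
        acc = fold.apply(base, value);
        expirationTs = time.currentTimestamp() + ttl; // every write refreshes the TTL
    }

    ACC get() {
        return expired() ? null : acc;
    }
}
```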


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201971238
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    Got it, thanks for your explanation.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197459764
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +	final long ttl;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +		this.ttl = config.getTtl().toMilliseconds();
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    --- End diff --
    
    This method does two things: it checks the TTL and it unwraps the value. I would split it into two methods to separate the concerns and make it more readable.
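
    A minimal sketch of the suggested split that keeps the behaviour of the combined method quoted above. `WrappedValue` and `SplitTtlChecks` are hypothetical simplifications, and `expired`/`unwrap` are illustrative names, not necessarily the ones the PR ended up using. The sketch also spells the composed method `getUnexpired`.

```java
import java.util.function.LongSupplier;

/** Simplified stand-in for TtlValue. */
final class WrappedValue<V> {
    final V userValue;
    final long expirationTimestamp;

    WrappedValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Sketch of the suggested split: one method checks, one unwraps, one composes. */
final class SplitTtlChecks {
    private final LongSupplier clock;
    private final boolean returnExpired;

    SplitTtlChecks(LongSupplier clock, boolean returnExpired) {
        this.clock = clock;
        this.returnExpired = returnExpired;
    }

    /** Concern 1: is the value expired? A missing value is not considered expired. */
    boolean expired(WrappedValue<?> ttlValue) {
        return ttlValue != null && ttlValue.expirationTimestamp <= clock.getAsLong();
    }

    /** Concern 2: extract the user value, tolerating a missing wrapper. */
    static <V> V unwrap(WrappedValue<V> ttlValue) {
        return ttlValue == null ? null : ttlValue.userValue;
    }

    /** Same result as the original combined method, built from the two parts. */
    <V> V getUnexpired(WrappedValue<V> ttlValue) {
        return expired(ttlValue) && !returnExpired ? null : unwrap(ttlValue);
    }
}
```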


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197494851
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,132 @@
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +	implements InternalMapState<K, N, UK, UV> {
    +	TtlMapState(
    +		InternalMapState<K, N, UK, TtlValue<UV>> original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<Map<UK, UV>> valueSerializer) {
    +		super(original, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public UV get(UK key) throws Exception {
    +		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +	}
    +
    +	@Override
    +	public void put(UK key, UV value) throws Exception {
    +		original.put(key, wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void putAll(Map<UK, UV> map) throws Exception {
    +		if (map == null) {
    +			return;
    +		}
    +		Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
    +			.collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
    +		original.putAll(ttlMap);
    +	}
    +
    +	@Override
    +	public void remove(UK key) throws Exception {
    +		original.remove(key);
    +	}
    +
    +	@Override
    +	public boolean contains(UK key) throws Exception {
    +		return get(key) != null;
    +	}
    +
    +	@Override
    +	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    +		return entriesStream()::iterator;
    +	}
    +
    +	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    +		withTs = withTs == null ? Collections.emptyList() : withTs;
    +		return StreamSupport
    +			.stream(withTs.spliterator(), false)
    --- End diff --
    
    I saw that some of the classes make heavy use of streams built from the spliterators of collections. While the code is concise, this creates a default iterator-to-spliterator adapter under the hood. IIRC this can have a significant performance impact, especially on hot code paths, and state access in Flink can be considered a hot path in some cases. It is hard to quantify the impact just from looking at the code, but when using this kind of API adapter in "low-level" classes, please be aware of the potential cost. We might want to look at the performance and tune it if needed. It should be OK for now because there is no regression in existing code, but we might want to measure this for heap-based state eventually.
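A hand-written adapter like the sketch below is one way to avoid the `Spliterator`/`Stream` layer while staying lazy. `MappingIterator` is a hypothetical name, not part of the PR, and whether it actually beats the stream version for heap-based state would need to be measured, as the comment says.

```java
import java.util.Iterator;
import java.util.function.Function;

/**
 * Hypothetical adapter: lazily applies {@code mapper} to each element of {@code source}.
 * No Spliterator or Stream pipeline is created, only this one small object.
 */
final class MappingIterator<S, T> implements Iterator<T> {
    private final Iterator<S> source;
    private final Function<? super S, ? extends T> mapper;

    MappingIterator(Iterator<S> source, Function<? super S, ? extends T> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        // Delegates to the source, so NoSuchElementException propagates when it is exhausted.
        return mapper.apply(source.next());
    }
}
```

Returning `() -> new MappingIterator<>(withTs.iterator(), mapper)` from `entries()`, with a mapper that unwraps each `TtlValue`, would keep the result lazy much like `entriesStream()::iterator`; filtering out expired entries would need an extra look-ahead step on top.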


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197001339
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalAggregatingState;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class wraps aggregating state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <IN> Type of the value added to the state
    + * @param <ACC> The type of the accumulator (intermediate aggregate state).
    + * @param <OUT> Type of the value extracted from the state
    + *
    + */
    +class TtlAggregatingState<K, N, IN, ACC, OUT>
    +	extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
    +	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    +
    +	TtlAggregatingState(
    +		InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<ACC> valueSerializer,
    +		TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +		aggregateFunction.stateClear = originalState::clear;
    +		aggregateFunction.updater = originalState::updateInternal;
    +	}
    +
    +	@Override
    +	public OUT get() throws Exception {
    +		return original.get();
    +	}
    +
    +	@Override
    +	public void add(IN value) throws Exception {
    +		original.add(value);
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public ACC getInternal() throws Exception {
    +		return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
    +	}
    +
    +	@Override
    +	public void updateInternal(ACC valueToStore) throws Exception {
    +		original.updateInternal(wrapWithTs(valueToStore));
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    --- End diff --
    
    Should we also do the TTL check for `original.mergeNamespaces()`? We need to query the state when merging namespaces.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197457017
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +	final long ttl;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +		this.ttl = config.getTtl().toMilliseconds();
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    --- End diff --
    
    Typo in method name: `getUnexpried` should be `getUnexpired`.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196826339
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    --- End diff --
    
    `ttlValue` is reassigned before the lambda in the return statement, so it is no longer effectively final and cannot be captured by the lambda. That is why `finalResult` is formally needed, to avoid a compilation error.
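A minimal, self-contained illustration of the Java rule referred to here: a local variable captured by a lambda must be final or effectively final, so once a variable has been reassigned, a fresh copy that is never reassigned has to be captured instead. `EffectivelyFinalDemo` and `lazyView` are hypothetical names; the shape only loosely follows `TtlListState#get()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class EffectivelyFinalDemo {
    private EffectivelyFinalDemo() {}

    static Supplier<Iterator<String>> lazyView(List<String> maybeNull, boolean copy) {
        List<String> values = maybeNull == null ? Collections.emptyList() : maybeNull;
        if (copy) {
            values = new ArrayList<>(values); // reassignment: 'values' is no longer effectively final
        }
        // return () -> values.iterator();  // would not compile: captured variable is not effectively final
        final List<String> finalValues = values; // never reassigned, so a lambda may capture it
        return () -> finalValues.iterator();
    }
}
```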


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    I'm still a bit worried about the time-alignment problem on recovery, because I've met several cases that would become a disaster in production if we don't do time alignment on recovery. One instance: we used a DFS to store the checkpoint data, and the DFS went into safe mode because of some problems. It took us several hours to notice, and more time to address the issue. After the DFS issue was fixed, users' jobs resumed and began to run correctly. In this case, if we don't do time alignment on recovery, users' state may already be totally expired (when TTL <= the `system down time`).
    
    On second thought, I think we could do this without a full scan of the records. The approach is outlined below.
    
    - 1. We need to remember the timestamp at which the checkpoint is performed; call it `checkpoint_ts`.
    - 2. We also need to remember the timestamp at which we recover from the checkpoint; call it `recovery_ts`.
    - 3. For each record, we remember its last update timestamp; call it `update_ts`.
    - 4. The current timestamp is `current_ts`.
    - 5. Then we could use the condition `checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL` to check whether the record is expired. If it is true, the record is expired; otherwise the record is still alive.
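The proposed condition can be sketched as a pure function. `TimeAlignedExpiry` and its parameters are hypothetical and not part of the PR. The sketch assumes the record's last update happened before the checkpoint; a record updated after recovery presumably needs the plain `current_ts - update_ts >= TTL` check instead. The point is that the downtime between `checkpoint_ts` and `recovery_ts` is not counted against the TTL.

```java
final class TimeAlignedExpiry {
    private TimeAlignedExpiry() {}

    /**
     * Proposed rule: (checkpointTs - updateTs) + (currentTs - recoveryTs) >= ttl.
     * The gap between checkpointTs and recoveryTs (job downtime) is excluded.
     * Assumes updateTs <= checkpointTs <= recoveryTs <= currentTs, all in milliseconds.
     */
    static boolean isExpired(long updateTs, long checkpointTs, long recoveryTs, long currentTs, long ttl) {
        long aliveBeforeCheckpoint = checkpointTs - updateTs;
        long aliveSinceRecovery = currentTs - recoveryTs;
        return aliveBeforeCheckpoint + aliveSinceRecovery >= ttl;
    }
}
```

For example, with a 2-hour TTL, a record updated 1 hour before the checkpoint and read 30 minutes after a 3-hour outage counts as alive for 1.5 hours under this rule and is not expired, whereas the plain `current_ts - update_ts` check would see 4.5 hours and expire it.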
    
    What do you think, @azagrebin? And @StefanRRichter, it would be really nice to learn your opinion on this problem.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196786370
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    --- End diff --
    
    Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value?
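One way a negative expiration timestamp could arise is `long` overflow when adding a very large TTL to the last access time. The sketch below shows two hypothetical options, not code from the PR: treat a negative timestamp as "never expires" in the check, or avoid negative values altogether by saturating the sum at `Long.MAX_VALUE`.

```java
final class ExpirationCheck {
    private ExpirationCheck() {}

    /** Like the diff's check (expiration <= now), except that a negative timestamp never expires. */
    static boolean expiredNeverOnNegative(long expirationTimestamp, long now) {
        return expirationTimestamp >= 0 && expirationTimestamp <= now;
    }

    /**
     * Computes lastAccess + ttl, capping at Long.MAX_VALUE instead of wrapping around.
     * Assumes both arguments are non-negative, so a negative sum can only mean overflow.
     */
    static long saturatedExpiration(long lastAccess, long ttl) {
        long sum = lastAccess + ttl;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }
}
```

The second option keeps the comparison in the check unchanged, at the cost of doing the overflow handling wherever timestamps are written.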


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197111980
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    I'm really not sure whether we should leave it for now. If we do, it will be a headache in real production use.
    
    Consider a very common situation: a job reads data from Kafka, and the user sets `TTL = 2 hours` because they believe the data's latency is definitely less than 2 hours. This way they can use TTL to safely bound the whole state size and still get an exact result. But if they find that the job needs to scale up, they have to trigger a savepoint and rescale the job from it. What if some problem prevents them from recovering the job from the savepoint quickly? Say it takes 30 minutes to recover the job; then the result becomes inaccurate.
    
    Even if the user never needs to trigger a savepoint for any reason, what if the job runs into some problem (maybe with some machine) and loops in "failed-restart-failed-..."? After 2 hours we fix the problem and the job resumes automatically, but all of the state has expired. I think this would be a disaster for the user.
    
    Admittedly, this problem does not arise with `EventTime`, but `ProcessingTime` is a very common use case (in our production, most jobs' `TimeCharacteristic` is `ProcessingTime`).
    
    I know Flink's timer service also does not do time alignment on recovery, but state TTL is a bit different from timers. When registering a timer, users pass an absolute time to the API, but when setting a TTL, users pass only a relative time; it is we who convert the relative time into an absolute time to implement TTL.



---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197114442
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    Additionally, if we don't do time alignment on recovery, what is a safe `TTL` value to set for a job? (This is the question users always ask us when they try to use `TTL` to control state size; we implemented it in a hacky way based on `TtlDB`.)


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197192870
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    + */
    +public class TtlConfig {
    +	private final TtlUpdateType ttlUpdateType;
    +	private final TtlStateVisibility stateVisibility;
    +	private final TtlTimeCharacteristic timeCharacteristic;
    +	private final Time ttl;
    +
    +	public TtlConfig(
    +		TtlUpdateType ttlUpdateType,
    +		TtlStateVisibility stateVisibility,
    +		TtlTimeCharacteristic timeCharacteristic,
    +		Time ttl) {
    +		Preconditions.checkNotNull(ttlUpdateType);
    +		Preconditions.checkNotNull(stateVisibility);
    +		Preconditions.checkNotNull(timeCharacteristic);
    +		Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
    --- End diff --
    
    Maybe we should also pre-check that `ttl` is not null. And I wonder whether `ttl.toMilliseconds() == 0` makes any sense?
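A minimal sketch of the suggested validation using plain JDK calls (`Objects.requireNonNull`, and `java.time.Duration` in place of Flink's `Time`) so that it stays self-contained; `TtlSettings` is a hypothetical class, and rejecting zero is just one possible answer to the question. Presumably, with the diff's `expirationTimestamp <= currentTimestamp` check, a zero TTL would make every value expire as soon as it is written.

```java
import java.time.Duration;
import java.util.Objects;

final class TtlSettings {
    final Duration ttl;

    TtlSettings(Duration ttl) {
        // Fail fast with a clear message instead of an NPE when the TTL is converted below.
        Objects.requireNonNull(ttl, "TTL must not be null.");
        // A zero (or negative) TTL would expire values immediately, so require a strictly positive one.
        if (ttl.toMillis() <= 0) {
            throw new IllegalArgumentException("TTL must be positive, got " + ttl.toMillis() + " ms.");
        }
        this.ttl = ttl;
    }
}
```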


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    @sihuazhou I think your concern and suggestion clearly make sense for some cases. However, I don't think it should be the general default, because I can also find cases where this is not what a user wants. For example, TTL could be used to comply with a user data privacy law. In this case, the law does not care about technical details like restores, only about wall-clock time, e.g. since the last user interaction.
    As this is a new feature, there is also no regression, and users can still go for a custom timer-based solution as an alternative. I agree that for some cases it makes sense to have this time shift, and your outlined approach can also make sense. However, I think this does not have to block this PR, because I would consider it a feature/improvement on top of this work. We could still aim to have your suggested follow-up in before the release.


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    Hi @Aitozi, the main issue FLINK-9510 contains a design doc for this effort, including a roadmap.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197475873
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option value configures when to prolong state TTL.
    + */
    +public enum TtlUpdateType {
    +	/** TTL is disabled. State does not expire. */
    +	Disabled,
    --- End diff --
    
    It seems like this is never needed. Why would somebody register TTL state and then declare it disabled?


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201941972
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    Hi @Aitozi, for the current implementation of list state in RocksDB, you are right. However, there has been an effort to make lists scalable in RocksDB, like maps, and in that case `get()` could be lazy.
    
    This TTL implementation makes no assumptions about the underlying state backend. The generic user-facing state API returns an `Iterable`, which in general can be lazy, so the TTL wrapper tries to keep it lazy where possible.
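    A minimal sketch of what "keeping it lazy" can mean, assuming a hypothetical `SketchTtlValue` holder (not the PR's `TtlValue`) and a `LongSupplier` as a stand-in for `TtlTimeProvider`. Expired elements are skipped only while the caller iterates, so nothing beyond what the backend's own `Iterable` already holds is copied. Note that the `updateTsOnRead` branch in the diff above still collects eagerly, because it has to rewrite the timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongSupplier;

/** Hypothetical value holder: a user value plus its expiration timestamp. */
final class SketchTtlValue<T> {
    final T userValue;
    final long expirationTimestamp;

    SketchTtlValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Wraps an Iterable of timestamped values and skips expired ones lazily, during iteration. */
final class LazyUnexpiredIterable<T> implements Iterable<T> {
    private final Iterable<SketchTtlValue<T>> source;
    private final LongSupplier clock;

    LazyUnexpiredIterable(Iterable<SketchTtlValue<T>> source, LongSupplier clock) {
        this.source = source;
        this.clock = clock;
    }

    @Override
    public Iterator<T> iterator() {
        Iterator<SketchTtlValue<T>> it = source.iterator();
        return new Iterator<T>() {
            // Look-ahead element; null once the source is exhausted.
            private SketchTtlValue<T> next = advance();

            // Pull from the underlying iterator until an unexpired element is found.
            private SketchTtlValue<T> advance() {
                while (it.hasNext()) {
                    SketchTtlValue<T> candidate = it.next();
                    // Same rule as expired(): expired once expirationTimestamp <= now.
                    if (candidate.expirationTimestamp > clock.getAsLong()) {
                        return candidate;
                    }
                }
                return null;
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public T next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                T result = next.userValue;
                next = advance();
                return result;
            }
        };
    }
}
```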


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196820474
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,98 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    +		return wrapWithTs(value, newExpirationTimestamp());
    +	}
    +
    +	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +		return value == null ? null : new TtlValue<>(value, ts);
    +	}
    +
    +	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +		return wrapWithTs(ttlValue.getUserValue());
    +	}
    +
    +	private long newExpirationTimestamp() {
    +		long currentTs = timeProvider.currentTimestamp();
    +		long ttl = config.getTtl().toMilliseconds();
    --- End diff --
    
    This will be called very often, so would it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?
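    A minimal sketch of the suggestion, assuming `java.time.Duration` as a stand-in for Flink's `Time` and a `LongSupplier` as a stand-in for `TtlTimeProvider`; the class name `CachedTtlDecorator` is hypothetical. The millisecond conversion happens once in the constructor, which is also what the later revision of `AbstractTtlDecorator` in this thread does with its `final long ttl` field.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical decorator that converts the configured TTL to milliseconds only once. */
class CachedTtlDecorator {
    private final LongSupplier timeProvider;
    // Computed once here instead of on every wrapWithTs(...) call.
    private final long ttlMillis;

    CachedTtlDecorator(Duration ttl, LongSupplier timeProvider) {
        this.timeProvider = timeProvider;
        this.ttlMillis = ttl.toMillis();
    }

    /** Expiration timestamp for a value written now. */
    long newExpirationTimestamp() {
        return timeProvider.getAsLong() + ttlMillis;
    }
}
```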


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197719642
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java ---
    @@ -0,0 +1,91 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/** Test suite for {@link TtlListState}. */
    +public class TtlListStateTest extends TtlStateTestBase<TtlListState<?, ?, Integer>, List<Integer>, Iterable<Integer>> {
    +	@Override
    +	TtlListState<?, ?, Integer> createState() {
    +		return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null);
    +	}
    +
    +	@Override
    +	void initTestValues() {
    +		updater = v -> ttlState.addAll(v);
    +		getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
    +		originalGetter = () -> ttlState.original.get();
    +
    +		emptyValue = Collections.emptyList();
    +
    +		updateValue1 = Arrays.asList(5, 7, 10);
    +		updateValue2 = Arrays.asList(8, 9, 11);
    +
    +		getValue1 = updateValue1;
    +		getValue2 = updateValue2;
    +	}
    +
    +	private static class MockInternalListState<K, N, T>
    +		extends MockInternalKvState<K, N, List<T>>
    +		implements InternalListState<K, N, T> {
    +
    +		MockInternalListState() {
    +			value = new ArrayList<>();
    +		}
    +
    +		@Override
    +		public void update(List<T> elements) {
    +			updateInternal(elements);
    +		}
    +
    +		@Override
    +		public void addAll(List<T> elements) {
    +			value.addAll(elements);
    +		}
    +
    +		@Override
    +		public void mergeNamespaces(N target, Collection<N> sources) {
    --- End diff --
    
    Wouldn't it make sense to check that merging namespaces also works correctly with TTL state, and to fix and validate the contract for what has to happen to the timestamps in this case?
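    One possible contract, sketched under assumptions since the thread does not settle it: merging list-state namespaces appends the source elements to the target, and every element keeps its own expiration timestamp instead of being re-stamped. `StampedElement` and `NamespacedListStateMock` are hypothetical names, and the mock tracks a single key for brevity. A test can then assert that the merged elements and their timestamps survive unchanged and that the source namespace is emptied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical list element with its own expiration timestamp. */
final class StampedElement<T> {
    final T value;
    final long expirationTimestamp;

    StampedElement(T value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Hypothetical in-memory list state scoped by namespace (single key for brevity). */
final class NamespacedListStateMock<N, T> {
    private final Map<N, List<StampedElement<T>>> byNamespace = new HashMap<>();

    void add(N namespace, T value, long expirationTimestamp) {
        byNamespace.computeIfAbsent(namespace, n -> new ArrayList<>())
            .add(new StampedElement<>(value, expirationTimestamp));
    }

    List<StampedElement<T>> get(N namespace) {
        return byNamespace.getOrDefault(namespace, new ArrayList<>());
    }

    /** Assumed contract: move source elements into the target, keeping each element's timestamp. */
    void mergeNamespaces(N target, Collection<N> sources) {
        for (N source : sources) {
            if (source.equals(target)) {
                continue; // merging a namespace into itself is a no-op
            }
            List<StampedElement<T>> moved = byNamespace.remove(source);
            if (moved != null) {
                byNamespace.computeIfAbsent(target, n -> new ArrayList<>()).addAll(moved);
            }
        }
    }
}
```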


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198751723
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +	implements InternalMapState<K, N, UK, UV> {
    +	TtlMapState(
    +		InternalMapState<K, N, UK, TtlValue<UV>> original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<Map<UK, UV>> valueSerializer) {
    +		super(original, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public UV get(UK key) throws Exception {
    +		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +	}
    +
    +	@Override
    +	public void put(UK key, UV value) throws Exception {
    +		original.put(key, wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void putAll(Map<UK, UV> map) throws Exception {
    +		if (map == null) {
    +			return;
    +		}
    +		Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
    +			.collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
    +		original.putAll(ttlMap);
    +	}
    +
    +	@Override
    +	public void remove(UK key) throws Exception {
    +		original.remove(key);
    +	}
    +
    +	@Override
    +	public boolean contains(UK key) throws Exception {
    +		return get(key) != null;
    +	}
    +
    +	@Override
    +	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    +		return entriesStream()::iterator;
    +	}
    +
    +	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    +		withTs = withTs == null ? Collections.emptyList() : withTs;
    +		return StreamSupport
    +			.stream(withTs.spliterator(), false)
    --- End diff --
    
    As I understand it, this depends on the use case. For parallelizable, lazy operations over big collections, such as filter and map over lists, streams can give a boost over loops, but for short collections or non-parallelizable spliterators the overhead can kill performance. It might also be hard to predict which kind of spliterator is used. I agree that real benchmarking should be done to make sure.
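    For comparison, a sketch of the same filter-and-unwrap step written once with a sequential stream (mirroring `StreamSupport.stream(..., false)` in `entriesStream()`) and once with a plain loop. `ExpiringEntryFilters` and its nested `Stamped` holder are hypothetical. Both versions collect into a list to keep the comparison simple, unlike the lazy `entriesStream()::iterator` in the diff, and nothing here measures performance; that still needs a real benchmark.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class ExpiringEntryFilters {
    /** Hypothetical stored value: a user value plus its expiration timestamp. */
    static final class Stamped<V> {
        final V value;
        final long expirationTimestamp;

        Stamped(V value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /** Sequential-stream version: keep unexpired entries and unwrap their values. */
    static <K, V> List<Map.Entry<K, V>> withStream(Iterable<Map.Entry<K, Stamped<V>>> entries, long now) {
        return StreamSupport.stream(entries.spliterator(), false)
            .filter(e -> e.getValue().expirationTimestamp > now)
            .<Map.Entry<K, V>>map(e -> new AbstractMap.SimpleImmutableEntry<K, V>(e.getKey(), e.getValue().value))
            .collect(Collectors.toList());
    }

    /** Plain-loop version: same result without building a stream pipeline. */
    static <K, V> List<Map.Entry<K, V>> withLoop(Iterable<Map.Entry<K, Stamped<V>>> entries, long now) {
        List<Map.Entry<K, V>> result = new ArrayList<>();
        for (Map.Entry<K, Stamped<V>> e : entries) {
            if (e.getValue().expirationTimestamp > now) {
                result.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue().value));
            }
        }
        return result;
    }
}
```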


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197475080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option value configures when to prolong state TTL.
    + */
    +public enum TtlUpdateType {
    --- End diff --
    
    Wouldn't it be better to declare the enums in the `TtlConfig` class, where they belong?
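    A sketch of what nesting could look like, with `SketchTtlConfig` as a hypothetical stand-in for `TtlConfig` and `java.time.Duration` standing in for Flink's `Time`. Only the enum constants visible in this thread (`Disabled`, `OnReadAndWrite`, `Relaxed`, `Exact`) are included; callers would then write `SketchTtlConfig.UpdateType.OnReadAndWrite` and so on.

```java
import java.time.Duration;

/** Hypothetical config class with its option enums nested inside it. */
final class SketchTtlConfig {
    /** When to prolong the TTL; only the constants that appear in this thread are listed. */
    enum UpdateType { Disabled, OnReadAndWrite }

    /** Whether expired values that are not yet cleaned up may still be returned. */
    enum StateVisibility { Relaxed, Exact }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final Duration ttl;

    SketchTtlConfig(UpdateType updateType, StateVisibility stateVisibility, Duration ttl) {
        this.updateType = updateType;
        this.stateVisibility = stateVisibility;
        this.ttl = ttl;
    }

    UpdateType getUpdateType() {
        return updateType;
    }

    StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    Duration getTtl() {
        return ttl;
    }
}
```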


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198757244
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +	extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +	implements FoldFunction<T, TtlValue<ACC>> {
    +	TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +		super(original, config, timeProvider);
    +	}
    +
    +	@Override
    +	public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +		return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --
    
    As I understand it, the wrapped states should already provide the default values. My idea was to wrap the original default value [in the TTL factory](https://github.com/apache/flink/pull/6196/commits/99994dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with the expiration timestamp `Long.MAX_VALUE`, so that it effectively never expires. Good point about test cases for it; I will add them for the appending states.
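    A minimal sketch of the described idea, using a hypothetical `DefaultTtlValue` holder rather than the PR's `TtlValue`. The default value is wrapped with the expiration timestamp `Long.MAX_VALUE`, so the same `<= now` expiry rule used in `AbstractTtlDecorator#expired` does not treat it as expired for any realistic clock value.

```java
/** Hypothetical minimal TTL value holder. */
final class DefaultTtlValue<T> {
    final T userValue;
    final long expirationTimestamp;

    DefaultTtlValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }

    /** Wraps a state default value so that it effectively never expires; null stays null. */
    static <T> DefaultTtlValue<T> neverExpiring(T defaultValue) {
        return defaultValue == null ? null : new DefaultTtlValue<>(defaultValue, Long.MAX_VALUE);
    }

    /** Same rule as AbstractTtlDecorator#expired: expired once the timestamp is <= now. */
    boolean isExpired(long now) {
        return expirationTimestamp <= now;
    }
}
```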


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197474567
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    --- End diff --
    
    Was this forgotten or planned for a later commit?
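    If the builder is still planned, it might look roughly like the following sketch. `BuiltTtlConfig`, its method names and its defaults (TTL prolonged on write only, `Exact` visibility) are assumptions for illustration, not the PR's API; `java.time.Duration` stands in for Flink's `Time`.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical immutable TTL configuration created through a builder. */
final class BuiltTtlConfig {
    /** Visibility options as named in this thread. */
    enum StateVisibility { Relaxed, Exact }

    private final Duration ttl;
    private final boolean updateTtlOnRead;
    private final StateVisibility visibility;

    private BuiltTtlConfig(Builder b) {
        this.ttl = b.ttl;
        this.updateTtlOnRead = b.updateTtlOnRead;
        this.visibility = b.visibility;
    }

    static Builder newBuilder(Duration ttl) {
        return new Builder(ttl);
    }

    Duration getTtl() {
        return ttl;
    }

    boolean isUpdateTtlOnRead() {
        return updateTtlOnRead;
    }

    StateVisibility getVisibility() {
        return visibility;
    }

    static final class Builder {
        private final Duration ttl;
        private boolean updateTtlOnRead = false; // assumed default: prolong on write only
        private StateVisibility visibility = StateVisibility.Exact; // assumed default

        private Builder(Duration ttl) {
            this.ttl = Objects.requireNonNull(ttl, "ttl");
        }

        Builder updateTtlOnRead(boolean value) {
            this.updateTtlOnRead = value;
            return this;
        }

        Builder setVisibility(StateVisibility value) {
            this.visibility = Objects.requireNonNull(value, "visibility");
            return this;
        }

        BuiltTtlConfig build() {
            if (ttl.isZero() || ttl.isNegative()) {
                throw new IllegalArgumentException("TTL must be positive");
            }
            return new BuiltTtlConfig(this);
        }
    }
}
```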


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197188305
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    Okay, I see this is tricky. I agree that this should be addressed in another PR; we need to figure out a proper way to do that.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197460522
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +	final long ttl;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +		this.ttl = config.getTtl().toMilliseconds();
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    --- End diff --
    
    `wrapWithTtl` would be a better name.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198439148
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +/**
    + * This option configures whether to return expired user value or not.
    + */
    +public enum TtlStateVisibility {
    +	/** Return still available expired user value (not yet cleaned up). */
    +	Relaxed,
    +	/** Hide expired user value and behave as if it does not exist any more. */
    +	Exact
    --- End diff --
    
    I think I will also rename the enum entries.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201895445
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --
    
    Hi @azagrebin, I have a small doubt about what you said:
    
    > return Iterable and avoid querying backend if not needed
    
    When dealing with ListState, hasn't `original.get()` already queried the original `Iterable` from RocksDB? Does this approach just iterate lazily over elements that are already in memory?


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197719310
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +abstract class TtlStateTestBase<S, UV, GV> {
    --- End diff --
    
    I think it would make sense to extend this general test a bit to consider multiple keys and namespaces. Ideally, a test should cover the full contract of the subject under test. What I mean is: you could currently pass this test even if TTL accidentally cleared all states when one state times out, or cleared all the states in the same namespace. The mock states can easily be extended to truly scope values by key and namespace. Then the test can, for example, create two keys in the same namespace and two keys in a different namespace, and check that their timeouts are isolated from each other and that the interaction works as expected.
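    A sketch of the suggested test, assuming a hypothetical `ScopedTtlValueState` mock that stores values per key and per namespace and, for brevity, applies the TTL check itself instead of going through the PR's TTL wrapper. Writing two keys in each of two namespaces at different times and advancing the clock lets the test check that only the entries whose TTL elapsed disappear.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical value-state mock that scopes entries by (key, namespace) and applies a TTL. */
final class ScopedTtlValueState<K, N, V> {
    /** A stored value together with its expiration timestamp. */
    private static final class Stamped<X> {
        final X value;
        final long expirationTimestamp;

        Stamped(X value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final Map<K, Map<N, Stamped<V>>> store = new HashMap<>();
    private final long ttl;
    private long now; // manually advanced clock, similar to a test time provider
    private K currentKey;
    private N currentNamespace;

    ScopedTtlValueState(long ttl) {
        this.ttl = ttl;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    void advanceTimeTo(long timestamp) {
        this.now = timestamp;
    }

    /** Writes the value for the current (key, namespace) and (re)sets its expiration. */
    void update(V value) {
        store.computeIfAbsent(currentKey, k -> new HashMap<>())
            .put(currentNamespace, new Stamped<>(value, now + ttl));
    }

    /** Returns the value for the current (key, namespace), or null if absent or expired. */
    V value() {
        Map<N, Stamped<V>> byNamespace = store.get(currentKey);
        Stamped<V> entry = byNamespace == null ? null : byNamespace.get(currentNamespace);
        return entry == null || entry.expirationTimestamp <= now ? null : entry.value;
    }
}
```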


---

[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186
  
    @StefanRRichter Thanks for your reply! I think that makes sense; there is still a workaround available to users. `+1` to implementing the current approach first.


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196816259
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    +		return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
    +	}
    +
    +	@Override
    +	public void updateInternal(List<T> valueToStore) throws Exception {
    +		Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
    +		original.addAll(withTs(valueToStore));
    --- End diff --
    
    This seems to be missing a `clear()`: `update` should replace the stored list, but `addAll` appends to it.
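    Here is a minimal illustration of why the `clear()` matters. It assumes that `ListState#update` is meant to replace the stored list, as its name suggests, while the wrapped state's `addAll` appends to it. `SimpleListStore` and `UpdateSemanticsDemo` are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory list store standing in for the wrapped list state. */
class SimpleListStore<T> {
    private final List<T> items = new ArrayList<>();

    void addAll(List<T> values) {
        items.addAll(values);
    }

    void clear() {
        items.clear();
    }

    List<T> get() {
        return new ArrayList<>(items);
    }
}

class UpdateSemanticsDemo {
    /** Variant from the diff: only appends, so old entries survive the "update". */
    static <T> void updateWithoutClear(SimpleListStore<T> store, List<T> values) {
        store.addAll(values);
    }

    /** Suggested variant: drop the old entries first, then store the new ones. */
    static <T> void updateWithClear(SimpleListStore<T> store, List<T> values) {
        store.clear();
        store.addAll(values);
    }

    public static void main(String[] args) {
        SimpleListStore<Integer> appended = new SimpleListStore<>();
        appended.addAll(Arrays.asList(1, 2));
        updateWithoutClear(appended, Arrays.asList(3));
        System.out.println(appended.get()); // [1, 2, 3]

        SimpleListStore<Integer> replaced = new SimpleListStore<>();
        replaced.addAll(Arrays.asList(1, 2));
        updateWithClear(replaced, Arrays.asList(3));
        System.out.println(replaced.get()); // [3]
    }
}
```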


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197458219
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    --- End diff --
    
    More field comments would be good, especially on the exact meaning of these boolean flags. For example, it is not obvious what `returnExpired` means for the code.
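    This is one possible way to document the flags. It is derived only from how the constructor and `getUnexpried` use them in the diff:
    - `updateTsOnRead` is set for `TtlUpdateType.OnReadAndWrite`.
    - `returnExpired` is set for `TtlStateVisibility.Relaxed`, and it makes expired values that are still stored visible.

    The enums below are minimal stand-ins. The constants not visible in the diff (`OnCreateAndWrite`, `Exact`) are assumptions, and `TtlFlags` is a hypothetical name.

```java
/** Minimal stand-ins for the config enums; constants not shown in the diff are assumed. */
enum TtlUpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

enum TtlStateVisibility { Relaxed, Exact }

/** Hypothetical holder showing how the two boolean flags could be commented. */
class TtlFlags {
    /**
     * Whether reading an unexpired value also refreshes its expiration timestamp.
     * True only for OnReadAndWrite; otherwise only writes refresh the timestamp.
     */
    final boolean updateTsOnRead;

    /**
     * Whether a value whose expiration timestamp has already passed may still be
     * returned to the user while it is physically stored (relaxed visibility).
     * If false, expired values are always hidden and reads return null.
     */
    final boolean returnExpired;

    TtlFlags(TtlUpdateType updateType, TtlStateVisibility visibility) {
        if (updateType == TtlUpdateType.Disabled) {
            throw new IllegalArgumentException("State does not need TTL wrapping if TTL is disabled.");
        }
        this.updateTsOnRead = updateType == TtlUpdateType.OnReadAndWrite;
        this.returnExpired = visibility == TtlStateVisibility.Relaxed;
    }

    /** Mirrors getUnexpried from the diff: hide a value only if it is expired and returnExpired is false. */
    <V> V visibleValue(V storedValue, long expirationTimestamp, long now) {
        boolean expired = expirationTimestamp <= now;
        return storedValue == null || (expired && !returnExpired) ? null : storedValue;
    }
}

class TtlFlagsDemo {
    public static void main(String[] args) {
        TtlFlags relaxed = new TtlFlags(TtlUpdateType.OnReadAndWrite, TtlStateVisibility.Relaxed);
        TtlFlags exact = new TtlFlags(TtlUpdateType.OnCreateAndWrite, TtlStateVisibility.Exact);
        // The value expired at t=10 and is read at t=20.
        System.out.println(relaxed.visibleValue("v", 10L, 20L)); // v
        System.out.println(exact.visibleValue("v", 10L, 20L));   // null
    }
}
```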


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197498126
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,132 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +	implements InternalMapState<K, N, UK, UV> {
    +	TtlMapState(
    +		InternalMapState<K, N, UK, TtlValue<UV>> original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<Map<UK, UV>> valueSerializer) {
    +		super(original, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public UV get(UK key) throws Exception {
    +		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +	}
    +
    +	@Override
    +	public void put(UK key, UV value) throws Exception {
    +		original.put(key, wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void putAll(Map<UK, UV> map) throws Exception {
    +		if (map == null) {
    +			return;
    +		}
    +		Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
    +			.collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
    +		original.putAll(ttlMap);
    +	}
    +
    +	@Override
    +	public void remove(UK key) throws Exception {
    +		original.remove(key);
    +	}
    +
    +	@Override
    +	public boolean contains(UK key) throws Exception {
    +		return get(key) != null;
    +	}
    +
    +	@Override
    +	public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    +		return entriesStream()::iterator;
    +	}
    +
    +	private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +		Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    +		withTs = withTs == null ? Collections.emptyList() : withTs;
    +		return StreamSupport
    +			.stream(withTs.spliterator(), false)
    +			.filter(this::unexpiredAndUpdateOrCleanup)
    +			.map(TtlMapState::dropTs);
    +	}
    +
    +	private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
    +		UV unexpiredValue;
    +		try {
    +			unexpiredValue = getWithTtlCheckAndUpdate(
    +				e::getValue,
    +				v -> original.put(e.getKey(), v),
    +				() -> original.remove(e.getKey()));
    +		} catch (Exception ex) {
    +			throw new FlinkRuntimeException(ex);
    +		}
    +		return unexpiredValue != null;
    +	}
    +
    +	private static <UK, UV> Map.Entry<UK, UV> dropTs(Map.Entry<UK, TtlValue<UV>> e) {
    --- End diff --
    
    Again, for clarity I would spell out `dropTs` as `unwrapTtlState` or something similar.
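    Below is a sketch of what the renamed helper could look like. `unwrapTtlValue` is just one name in the spirit of the suggestion. `TtlValue` here is a simplified stand-in that exposes the two accessors used in the diff (`getUserValue`, `getExpirationTimestamp`). Neither is Flink's actual code.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Simplified stand-in: a user value plus its expiration timestamp. */
final class TtlValue<V> {
    private final V userValue;
    private final long expirationTimestamp;

    TtlValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }

    V getUserValue() {
        return userValue;
    }

    long getExpirationTimestamp() {
        return expirationTimestamp;
    }
}

final class TtlEntries {
    /** Replaces the TTL-wrapped value of a map entry with the plain user value; the key is kept as-is. */
    static <UK, UV> Map.Entry<UK, UV> unwrapTtlValue(Map.Entry<UK, TtlValue<UV>> entry) {
        return new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue().getUserValue());
    }

    public static void main(String[] args) {
        Map.Entry<String, TtlValue<Integer>> wrapped =
            new AbstractMap.SimpleEntry<String, TtlValue<Integer>>("a", new TtlValue<Integer>(42, 1_000L));
        Map.Entry<String, Integer> plain = unwrapTtlValue(wrapped);
        System.out.println(plain.getKey() + "=" + plain.getValue()); // a=42
    }
}
```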


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196996839
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    --- End diff --
    
    In this block we need to iterate over `ttlValue` twice, once for `collect()` and once for `updateTs()`. If `updateTs()` accepted an `Iterable` argument, we could avoid the `collect()` here and iterate over `ttlValue` only once.
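    Here is a sketch of the single-pass variant. It assumes the expiration rule from the diff (`expirationTimestamp <= now` means expired) and uses a simplified stand-in `TtlValue`. `refreshUnexpired` is a hypothetical helper. It walks the stored `Iterable` once, drops expired entries and re-stamps the rest. `get()` could then write the result back and return it without a separate `collect()` pass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in: a user value plus its expiration timestamp. */
final class TtlValue<T> {
    private final T userValue;
    private final long expirationTimestamp;

    TtlValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }

    T getUserValue() {
        return userValue;
    }

    long getExpirationTimestamp() {
        return expirationTimestamp;
    }
}

final class SinglePassRefresh {
    /** One pass: keep only unexpired entries, giving each a fresh expiration timestamp. */
    static <T> List<TtlValue<T>> refreshUnexpired(Iterable<TtlValue<T>> stored, long now, long ttl) {
        List<TtlValue<T>> refreshed = new ArrayList<>();
        for (TtlValue<T> value : stored) {
            if (value.getExpirationTimestamp() > now) {
                refreshed.add(new TtlValue<>(value.getUserValue(), now + ttl));
            }
        }
        return refreshed;
    }

    public static void main(String[] args) {
        List<TtlValue<String>> stored = Arrays.asList(
            new TtlValue<String>("expired", 5L),
            new TtlValue<String>("alive", 20L));
        for (TtlValue<String> v : refreshUnexpired(stored, 10L, 100L)) {
            System.out.println(v.getUserValue() + " expires at " + v.getExpirationTimestamp());
        }
        // prints: alive expires at 110
    }
}
```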


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196809755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,95 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    +		return wrapWithTs(value, newExpirationTimestamp());
    +	}
    +
    +	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +		return value == null ? null : new TtlValue<>(value, ts);
    +	}
    +
    +	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +		return wrapWithTs(ttlValue.getUserValue());
    +	}
    +
    +	private long newExpirationTimestamp() {
    +		return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
    --- End diff --
    
    This will be called very often, so would it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196819320
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,95 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    --- End diff --
    
    In general, Flink allows event time to be negative, but overflow with a very large TTL should be checked. TTL only makes sense as a non-negative value. I have added a fix for it.
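    Below is a minimal sketch of an overflow-safe expiration timestamp. It assumes the TTL has already been validated as non-negative, and that saturating at `Long.MAX_VALUE` (effectively "never expires") is acceptable. The thread does not show how the actual fix handles this. The current timestamp may be negative, as event time allows. `ExpirationTimestamps` is a hypothetical name.

```java
final class ExpirationTimestamps {
    /**
     * Returns currentTimestamp + ttl, clamped to Long.MAX_VALUE instead of wrapping
     * around to a negative value on overflow. Requires ttl >= 0.
     */
    static long expirationTimestamp(long currentTimestamp, long ttl) {
        if (ttl < 0) {
            throw new IllegalArgumentException("TTL must be non-negative, got " + ttl);
        }
        // Long.MAX_VALUE - ttl cannot overflow because ttl >= 0.
        return currentTimestamp > Long.MAX_VALUE - ttl ? Long.MAX_VALUE : currentTimestamp + ttl;
    }

    public static void main(String[] args) {
        System.out.println(expirationTimestamp(-100L, 50L));               // -50
        System.out.println(expirationTimestamp(Long.MAX_VALUE - 5L, 10L)); // 9223372036854775807
        System.out.println(Long.MAX_VALUE - 5L + 10L);                      // naive sum wraps to a negative value
    }
}
```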


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198431042
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +	final T original;
    +	final TtlConfig config;
    +	final TtlTimeProvider timeProvider;
    +	final boolean updateTsOnRead;
    +	final boolean returnExpired;
    +	final long ttl;
    +
    +	AbstractTtlDecorator(
    +		T original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider) {
    +		Preconditions.checkNotNull(original);
    +		Preconditions.checkNotNull(config);
    +		Preconditions.checkNotNull(timeProvider);
    +		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +			"State does not need to be wrapped with TTL if it is configured as disabled.");
    +		this.original = original;
    +		this.config = config;
    +		this.timeProvider = timeProvider;
    +		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +		this.ttl = config.getTtl().toMilliseconds();
    +	}
    +
    +	<V> V getUnexpried(TtlValue<V> ttlValue) {
    +		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +	}
    +
    +	<V> boolean expired(TtlValue<V> ttlValue) {
    +		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +	}
    +
    +	<V> TtlValue<V> wrapWithTs(V value) {
    --- End diff --
    
    It actually wraps the value with an expiration timestamp; the TTL itself is not stored with the value.
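    To make the distinction concrete: the TTL is only used at write time, to compute an absolute expiration timestamp. That timestamp is what gets stored next to the user value. Reads then compare it with the current time, as `expired` does in the diff. The sketch below uses hypothetical names (`StoredValue`, `wrapWithExpirationTimestamp`) and ignores overflow handling.

```java
/** Hypothetical stored form: the user value plus an absolute expiration timestamp (no TTL field). */
final class StoredValue<V> {
    final V userValue;
    final long expirationTimestamp;

    StoredValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

final class ExpirationWrapping {
    /** The TTL is consumed here, at write time, and is not kept with the value. */
    static <V> StoredValue<V> wrapWithExpirationTimestamp(V value, long now, long ttl) {
        return new StoredValue<>(value, now + ttl);
    }

    /** Same rule as expired() in the diff: a value is expired once its timestamp is reached. */
    static boolean expired(StoredValue<?> stored, long now) {
        return stored.expirationTimestamp <= now;
    }

    public static void main(String[] args) {
        StoredValue<String> stored = wrapWithExpirationTimestamp("v", 1_000L, 500L);
        System.out.println(stored.expirationTimestamp); // 1500
        System.out.println(expired(stored, 1_499L));    // false
        System.out.println(expired(stored, 1_500L));    // true
    }
}
```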


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197714098
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,172 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    --- End diff --
    
    Currently this could return `List<T>` instead of `Iterable<T>`, which might let you avoid the `instanceof` check.
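    Java allows an overriding method to narrow its return type (a covariant return). The wrapper could therefore keep implementing an interface that declares `Iterable<T> get()` while declaring `List<T> get()` itself. Below is a minimal sketch with hypothetical types (`IterableGetter`, `ListBackedState`), not Flink's interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface mirroring an Iterable-returning getter. */
interface IterableGetter<T> {
    Iterable<T> get() throws Exception;
}

class ListBackedState<T> implements IterableGetter<T> {
    private final List<T> values = new ArrayList<>();

    void add(T value) {
        values.add(value);
    }

    /** Covariant override: List<T> is a subtype of Iterable<T>, and the narrower throws clause is allowed. */
    @Override
    public List<T> get() {
        return new ArrayList<>(values);
    }

    /** Internal callers get a List directly, with no instanceof check or re-collection. */
    List<T> getInternal() {
        return get();
    }

    public static void main(String[] args) throws Exception {
        ListBackedState<String> state = new ListBackedState<>();
        state.add("a");
        state.add("b");
        IterableGetter<String> asInterface = state; // still usable through the Iterable-returning interface
        for (String s : asInterface.get()) {
            System.out.println(s);
        }
        System.out.println(state.getInternal()); // [a, b]
    }
}
```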


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196995095
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,63 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    + */
    +public class TtlConfig {
    +	private final TtlUpdateType ttlUpdateType;
    +	private final TtlStateVisibility stateVisibility;
    +	private final TtlTimeCharacteristic timeCharacteristic;
    +	private final Time ttl;
    +
    +	public TtlConfig(
    +		TtlUpdateType ttlUpdateType,
    +		TtlStateVisibility stateVisibility,
    +		TtlTimeCharacteristic timeCharacteristic,
    +		Time ttl) {
    +		Preconditions.checkNotNull(ttlUpdateType);
    +		Preconditions.checkNotNull(stateVisibility);
    +		Preconditions.checkNotNull(timeCharacteristic);
    --- End diff --
    
    Maybe we should also check that the `ttl` is greater than 0?
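    Here is a sketch of the extra check. It uses `java.time.Duration` in place of Flink's `Time` so the example is self-contained, and `TtlConfigSketch` is a hypothetical name. Inside Flink, this would presumably become a `Preconditions.checkArgument(...)` call next to the existing `checkNotNull` checks.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical config holder that rejects non-positive TTLs at construction time. */
final class TtlConfigSketch {
    private final long ttlMillis;

    TtlConfigSketch(Duration ttl) {
        Objects.requireNonNull(ttl, "ttl");
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must be greater than 0, got " + ttl);
        }
        this.ttlMillis = ttl.toMillis();
    }

    long getTtlMillis() {
        return ttlMillis;
    }

    public static void main(String[] args) {
        System.out.println(new TtlConfigSketch(Duration.ofSeconds(2)).getTtlMillis()); // 2000
        try {
            new TtlConfigSketch(Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // TTL must be greater than 0, got PT0S
        }
    }
}
```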


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197110068
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    +		return () -> new IteratorWithCleanup(finalResult.iterator());
    +	}
    +
    +	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +			.filter(v -> !expired(v))
    +			.map(this::rewrapWithNewTs)
    +			.collect(Collectors.toList());
    +		if (!unexpiredWithUpdatedTs.isEmpty()) {
    +			original.update(unexpiredWithUpdatedTs);
    +		}
    +	}
    +
    +	@Override
    +	public void add(T value) throws Exception {
    +		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +		original.add(wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void clear() {
    +		original.clear();
    +	}
    +
    +	@Override
    +	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +		original.mergeNamespaces(target, sources);
    +	}
    +
    +	@Override
    +	public List<T> getInternal() throws Exception {
    +		return collect(get());
    +	}
    +
    +	private <E> List<E> collect(Iterable<E> iterable) {
    +		return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
    +	}
    +
    +	@Override
    +	public void updateInternal(List<T> valueToStore) throws Exception {
    +		Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
    +		original.addAll(withTs(valueToStore));
    --- End diff --
    
    This should be `original.updateInternal(withTs(valueToStore));`. I will change it.
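The delegation matters because `addAll` appends to what is already stored, while `update`/`updateInternal` replace it. Routing `update(...)` through `original.addAll(...)` would therefore leave the previous entries in place. Here is a minimal sketch using a hypothetical in-memory list (`InMemoryListState` is illustrative, not Flink's `InternalListState`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for a list state (not Flink's InternalListState).
class InMemoryListState<T> {
    private final List<T> values = new ArrayList<>();

    // Appends to the stored entries, like ListState#addAll.
    void addAll(List<T> toAdd) {
        values.addAll(toAdd);
    }

    // Replaces the stored entries, like ListState#update.
    void update(List<T> replacement) {
        values.clear();
        values.addAll(replacement);
    }

    // Returns a copy of the stored entries.
    List<T> get() {
        return new ArrayList<>(values);
    }
}

class UpdateVsAddAllDemo {
    public static void main(String[] args) {
        // "update" delegated to addAll: the old entry survives.
        InMemoryListState<String> viaAddAll = new InMemoryListState<>();
        viaAddAll.addAll(Arrays.asList("old"));
        viaAddAll.addAll(Arrays.asList("new"));

        // "update" delegated to update: the old entry is replaced.
        InMemoryListState<String> viaUpdate = new InMemoryListState<>();
        viaUpdate.addAll(Arrays.asList("old"));
        viaUpdate.update(Arrays.asList("new"));

        System.out.println(viaAddAll.get()); // [old, new]
        System.out.println(viaUpdate.get()); // [new]
    }
}
```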


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r199109435
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +	extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +	implements InternalMapState<K, N, UK, UV> {
    +	TtlMapState(
    +		InternalMapState<K, N, UK, TtlValue<UV>> original,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<Map<UK, UV>> valueSerializer) {
    +		super(original, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public UV get(UK key) throws Exception {
    +		return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +	}
    +
    +	@Override
    +	public void put(UK key, UV value) throws Exception {
    +		original.put(key, wrapWithTs(value));
    +	}
    +
    +	@Override
    +	public void putAll(Map<UK, UV> map) throws Exception {
    +		if (map == null) {
    +			return;
    +		}
    +		Map<UK, TtlValue<UV>> ttlMap = new HashMap<>();
    --- End diff --
    
    We can already initialize the new map with a capacity of `map.size()`.
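Below is a sketch of the pre-sized wrapping loop. It uses a hypothetical `TimestampedValue` in place of `TtlValue` and passes the current time in explicitly. Note that `new HashMap<>(map.size())` avoids repeated growth from the default capacity but can still resize once because of the default load factor of 0.75. Dividing by the load factor, as below, should avoid that:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TtlValue: the user value plus a timestamp.
final class TimestampedValue<V> {
    final V userValue;
    final long timestamp;

    TimestampedValue(V userValue, long timestamp) {
        this.userValue = userValue;
        this.timestamp = timestamp;
    }
}

final class PreSizedWrapping {
    // Wraps every entry of `map` with the same timestamp. The capacity is
    // derived from the default load factor (0.75) so that inserting
    // map.size() entries does not trigger a resize.
    static <K, V> Map<K, TimestampedValue<V>> wrapAll(Map<K, V> map, long now) {
        int capacity = (int) (map.size() / 0.75f) + 1;
        Map<K, TimestampedValue<V>> wrapped = new HashMap<>(capacity);
        for (Map.Entry<K, V> e : map.entrySet()) {
            wrapped.put(e.getKey(), new TimestampedValue<>(e.getValue(), now));
        }
        return wrapped;
    }

    public static void main(String[] args) {
        Map<String, Integer> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        Map<String, TimestampedValue<Integer>> out = wrapAll(input, 42L);
        System.out.println(out.size() + " " + out.get("b").userValue + " " + out.get("a").timestamp); // 2 2 42
    }
}
```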


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196817846
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +	implements InternalListState<K, N, T> {
    +	TtlListState(
    +		InternalListState<K, N, TtlValue<T>> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<List<T>> valueSerializer) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +	}
    +
    +	@Override
    +	public void update(List<T> values) throws Exception {
    +		updateInternal(values);
    +	}
    +
    +	@Override
    +	public void addAll(List<T> values) throws Exception {
    +		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +		original.addAll(withTs(values));
    +	}
    +
    +	@Override
    +	public Iterable<T> get() throws Exception {
    +		Iterable<TtlValue<T>> ttlValue = original.get();
    +		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +		if (updateTsOnRead) {
    +			List<TtlValue<T>> collected = collect(ttlValue);
    +			ttlValue = collected;
    +			updateTs(collected);
    +		}
    +		final Iterable<TtlValue<T>> finalResult = ttlValue;
    --- End diff --
    
    The variable `finalResult` looks redundant, or am I misunderstanding something?
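The copy appears to be required. `ttlValue` is assigned more than once in `get()`, so it is not effectively final, and Java only allows lambdas to capture effectively final locals. Copying it into a `final` local just before the lambda is the usual workaround. Here is a minimal standalone sketch with the same shape (`LambdaCaptureDemo` and `view` are illustrative names):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class LambdaCaptureDemo {
    // Same shape as TtlListState#get(): `result` is assigned more than once,
    // so it is not effectively final and cannot be captured directly.
    static Iterable<String> view(List<String> stored, boolean copyFirst) {
        List<String> result = stored == null ? Collections.<String>emptyList() : stored;
        if (copyFirst) {
            result = new ArrayList<>(result); // reassignment
        }
        // return () -> result.iterator(); // does not compile: `result` is reassigned
        final List<String> finalResult = result; // effectively final copy
        return () -> finalResult.iterator();
    }

    public static void main(String[] args) {
        for (String s : view(Arrays.asList("a", "b"), true)) {
            System.out.println(s);
        }
    }
}
```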


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197503773
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalAggregatingState;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class wraps aggregating state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <IN> Type of the value added to the state
    + * @param <ACC> The type of the accumulator (intermediate aggregate state).
    + * @param <OUT> Type of the value extracted from the state
    + *
    + */
    +class TtlAggregatingState<K, N, IN, ACC, OUT>
    +	extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
    +	implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    +
    +	TtlAggregatingState(
    +		InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
    +		TtlConfig config,
    +		TtlTimeProvider timeProvider,
    +		TypeSerializer<ACC> valueSerializer,
    +		TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
    +		super(originalState, config, timeProvider, valueSerializer);
    +		aggregateFunction.stateClear = originalState::clear;
    --- End diff --
    
    Maybe it is better to pass a `TtlAggregateFunctionBuilder` and supply `stateClear` and `updater` before the object is created. I think they could then also become immutable. Similar changes would apply to the other states.
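Here is a minimal sketch of the builder idea. It uses hypothetical names (`TtlAwareFunction`, `Builder`, and plain `Runnable`/`Consumer` callbacks) rather than the PR's actual types. The TTL state would presumably call the builder in its own constructor, passing something like `originalState::clear`, so that both callbacks end up in `final` fields:

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical types; not the classes in the PR.
final class TtlAwareFunction<ACC> {
    // Both callbacks are fixed at construction time instead of being
    // assigned to mutable fields after the wrapping state is created.
    private final Runnable stateClear;
    private final Consumer<ACC> updater;

    private TtlAwareFunction(Runnable stateClear, Consumer<ACC> updater) {
        this.stateClear = stateClear;
        this.updater = updater;
    }

    void clearState() {
        stateClear.run();
    }

    void update(ACC acc) {
        updater.accept(acc);
    }

    // Collects the callbacks first, then creates the immutable function.
    static final class Builder<ACC> {
        private Runnable stateClear;
        private Consumer<ACC> updater;

        Builder<ACC> stateClear(Runnable stateClear) {
            this.stateClear = stateClear;
            return this;
        }

        Builder<ACC> updater(Consumer<ACC> updater) {
            this.updater = updater;
            return this;
        }

        TtlAwareFunction<ACC> build() {
            return new TtlAwareFunction<>(
                Objects.requireNonNull(stateClear, "stateClear"),
                Objects.requireNonNull(updater, "updater"));
        }
    }
}

final class BuilderDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        TtlAwareFunction<Integer> fn = new TtlAwareFunction.Builder<Integer>()
            .stateClear(() -> log.append("cleared;"))
            .updater(acc -> log.append("updated:").append(acc).append(';'))
            .build();
        fn.update(3);
        fn.clearState();
        System.out.println(log); // updated:3;cleared;
    }
}
```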


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197179238
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +	private final T userValue;
    +	private final long expirationTimestamp;
    --- End diff --
    
    This operation fits better with a full-scan restoration from a checkpoint, applying a custom transformation to each state entry in which the expiration timestamp is optionally prolonged to account for downtime. The same goes for cleaning up expired state during a full scan. I think it should be a separate issue and PR, because how could the wrappers distinguish between old and new state before and after a restart?
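For reference, here is a sketch of the kind of restore-time transformation described above. It rests on a few assumptions: a hypothetical `ExpiringValue` that holds an absolute expiration timestamp, a known checkpoint time, and a map standing in for the full scan. Expired entries are dropped, and the remaining ones can optionally be shifted by the downtime. None of this is part of the PR:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TtlValue with an absolute expiration timestamp.
final class ExpiringValue<T> {
    final T userValue;
    final long expirationTimestamp;

    ExpiringValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

final class RestoreTransform {
    // One pass over all restored entries: optionally shift each expiration
    // timestamp by the downtime, then drop entries that are already expired.
    static <K, T> Map<K, ExpiringValue<T>> transformOnRestore(
            Map<K, ExpiringValue<T>> restored,
            long checkpointTime,
            long restoreTime,
            boolean prolongForDowntime) {
        long downtime = restoreTime - checkpointTime;
        Map<K, ExpiringValue<T>> result = new HashMap<>();
        for (Map.Entry<K, ExpiringValue<T>> e : restored.entrySet()) {
            ExpiringValue<T> v = e.getValue();
            long expiration = prolongForDowntime
                    ? v.expirationTimestamp + downtime
                    : v.expirationTimestamp;
            if (expiration <= restoreTime) {
                continue; // expired: cleaned up as part of the same scan
            }
            result.put(e.getKey(), new ExpiringValue<>(v.userValue, expiration));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ExpiringValue<String>> restored = new HashMap<>();
        restored.put("live", new ExpiringValue<>("a", 150L));
        restored.put("dead", new ExpiringValue<>("b", 90L));
        // Checkpoint at t=100, restore at t=160: 60 time units of downtime.
        Map<String, ExpiringValue<String>> kept = transformOnRestore(restored, 100L, 160L, true);
        System.out.println(kept.keySet()); // [live]
        System.out.println(kept.get("live").expirationTimestamp); // 210
    }
}
```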


---

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196791555
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    + */
    +public class TtlConfig {
    +	private final TtlUpdateType ttlUpdateType;
    +	private final TtlStateVisibility stateVisibility;
    +	private final TtlTimeCharacteristic timeCharacteristic;
    +	private final Time ttl;
    +
    +	public TtlConfig(
    +		TtlUpdateType ttlUpdateType,
    +		TtlStateVisibility stateVisibility,
    +		TtlTimeCharacteristic timeCharacteristic,
    +		Time ttl) {
    +		Preconditions.checkNotNull(ttlUpdateType);
    +		Preconditions.checkNotNull(stateVisibility);
    +		Preconditions.checkNotNull(timeCharacteristic);
    --- End diff --
    
    Why not also check `ttl`?
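Here is a sketch of what the extra validation could look like. It also covers the earlier suggestion to require a positive `ttl`. To stay self-contained, it uses a hypothetical `SimpleTtlConfig` with `java.time.Duration`. In the PR's own style, this would presumably be `Preconditions.checkNotNull(ttl)` followed by a `Preconditions.checkArgument(...)` on `ttl.toMilliseconds()`:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical, simplified config; the PR's TtlConfig uses Flink's Time
// plus three enum-typed fields, which are omitted here.
final class SimpleTtlConfig {
    private final Duration ttl;

    SimpleTtlConfig(Duration ttl) {
        // Reject a missing TTL, like the other constructor arguments.
        Objects.requireNonNull(ttl, "ttl");
        // Reject zero or negative TTLs.
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("ttl must be positive, but was " + ttl);
        }
        this.ttl = ttl;
    }

    Duration getTtl() {
        return ttl;
    }

    public static void main(String[] args) {
        System.out.println(new SimpleTtlConfig(Duration.ofMinutes(5)).getTtl()); // PT5M
        try {
            new SimpleTtlConfig(Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```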


---