You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by azagrebin <gi...@git.apache.org> on 2018/06/21 13:34:59 UTC

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

GitHub user azagrebin opened a pull request:

    https://github.com/apache/flink/pull/6196

    [FLINK-9513] Implement TTL state wrappers factory and serializer for value with TTL

    ## What is the purpose of the change
    
    This PR introduces a state factory for wrapping state objects with TTL logic and serialiser of user value with expiration timestamp.
    NOTE: This PR is based on #6186 and only last commit makes difference with it and needs review.
    
    ## Brief change log
    
      - abstract state creation in backends with `KeyedStateFactory` interface
      - add `TtlStateFactory`
      - add `CompositeSerializer`
    
    ## Verifying this change
    
    This change is a trivial addition without any test coverage in this PR and should be covered together with TTL feature activation by final integration and e2e tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (not yet)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (not yet)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not applicable at the moment)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9513

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6196
    
----
commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-04T15:28:40Z

    [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

commit 74c689e1660d40176b3c131fb0f3f9dcafa33889
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-20T15:05:28Z

    Check overflow in expiration timestamp, allow only non-negative TTL

commit 1164aa2a9c4298461eaa44322ef9cefa00b4f0fe
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-21T12:24:04Z

    small fixes

commit 1d19d4ac2b73ac83290b4b117b82895c99b51865
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-21T13:13:42Z

    Make AbstractTtlState.getSerializedValue() unsupported for now in case of queryable state

commit 99994dedb9a20244a2addd337617778b17fe8349
Author: Andrey Zagrebin <az...@...>
Date:   2018-06-11T17:34:47Z

    [FLINK-9513] Implement TTL state wrappers factory and serializer for value with TTL

----


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198113175
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    --- End diff --
    
    Why is this class using a lot of raw types instead of wildcards? E.g. why `List<TypeSerializer>`instead of `List<TypeSerializer<?>>`


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331145
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    --- End diff --
    
    Would be better to check the args are not null, or simply use the `@Nonnull` annotation.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199226833
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    --- End diff --
    
    I think creating the map is not a big deal because it will happen on init without big pressure on GC. Also map is more declarative and flexible if new state needs to be added then just the map needs to be modified but not the method. Or if needed in future, it can be easier refactored to build the mapping somewhere else and inject it into factory.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198015105
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    --- End diff --
    
    nit: if we don't use the high level API, we could do this in a single loop with a better performance.


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    Had a few more comments, but they all are basically optimizations. I leave it up to you if you still want to address all or some of them. Please let me know. Otherwise, we can merge this. 👍 


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199474152
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    +	private final int length;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.isImmutableTargetType = isImmutableTargetType;
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
    +		boolean stateful = false;
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			duplicatedSerializers[index] = fieldSerializers[index].duplicate();
    +			if (fieldSerializers[index] != duplicatedSerializers[index]) {
    --- End diff --
    
    I wonder if we need to do these checks every time `duplicate()` is called? We could check it once, remember if all field serializer are stateless and from that point return `this` immediately.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198015848
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    +		}
    +	}
    +
    +	@Override
    +	public T deserialize(DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			originalValues.add(typeSerializer.deserialize(source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public T deserialize(T reuse, DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalValues.add(originalSerializers.get(i).deserialize(originalReuseValues.get(i), source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			typeSerializer.copy(source, target);
    +		}
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return originalSerializers.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj instanceof CompositeSerializer) {
    +			CompositeSerializer<?> other = (CompositeSerializer<?>) obj;
    +			return other.canEqual(this) && originalSerializers.equals(other.originalSerializers);
    +		} else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean canEqual(Object obj) {
    +		return obj instanceof CompositeSerializer;
    +	}
    +
    +	@Override
    +	public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +		return new CompositeTypeSerializerConfigSnapshot(originalSerializers.toArray(new TypeSerializer[]{ })) {
    +			@Override
    +			public int getVersion() {
    +				return 0;
    +			}
    +		};
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +		if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) {
    +			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
    +				((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
    +
    +			if (previousSerializersAndConfigs.size() == originalSerializers.size()) {
    +
    +				List<TypeSerializer> convertSerializers = new ArrayList<>();
    +				boolean requiresMigration = false;
    +				CompatibilityResult<Object> compatResult;
    +				int i = 0;
    +				for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousSerializersAndConfigs) {
    +					compatResult = CompatibilityUtil.resolveCompatibilityResult(
    +						f.f0,
    +						UnloadableDummyTypeSerializer.class,
    +						f.f1,
    +						originalSerializers.get(i));
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198014886
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    --- End diff --
    
    If `originalSerializers` or the `originalValues` is a type of something like `LinkedList`, then the `originalSerializers.get(i)` and `originalValues.get(i)` will get a very poor performance. I think we might should use the `iterator` here.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197329533
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    --- End diff --
    
    I think this check looks like a bug.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198062463
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    --- End diff --
    
    Could suppress deprecation warning on the because we need to support it.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199894217
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -272,4 +254,60 @@ public int getVersion() {
     			previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class,
     			previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
     	}
    +
    +	/** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
    +	protected static class PrecomputedParameters implements Serializable {
    --- End diff --
    
    If this is serializable, we should add a version uid.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331764
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    +			super(Arrays.asList(userValueSerializer, new LongSerializer()));
    +		}
    +
    +		@Override
    +		@SuppressWarnings("unchecked")
    +		protected TtlValue<T> composeValue(List values) {
    +			return new TtlValue<>((T) values.get(0), (Long) values.get(1));
    +		}
    +
    +		@Override
    +		protected List decomposeValue(TtlValue<T> v) {
    +			return Arrays.asList(v.getUserValue(), v.getExpirationTimestamp());
    +		}
    +
    +		@Override
    +		@SuppressWarnings("unchecked")
    +		protected CompositeSerializer<TtlValue<T>> createSerializerInstance(List<TypeSerializer> typeSerializers) {
    --- End diff --
    
    Should we check that `typeSerializers != null && typeSerializers.size() == 1`?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198113448
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    --- End diff --
    
    Also here, list is just used as raw type. I think also the abstract methods could use some documentation for people that would like to create new subclasses in the future.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198115667
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    --- End diff --
    
    I would also suggest to consider lower-level constructs for potentially hot methods like this one because this api can easily introduce (non obvious) performance regression.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197330095
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    +		}
    +	}
    +
    +	@Override
    +	public T deserialize(DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			originalValues.add(typeSerializer.deserialize(source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public T deserialize(T reuse, DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    --- End diff --
    
    Again, a init size for the List could be given.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198015825
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    +		}
    +	}
    +
    +	@Override
    +	public T deserialize(DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			originalValues.add(typeSerializer.deserialize(source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public T deserialize(T reuse, DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalValues.add(originalSerializers.get(i).deserialize(originalReuseValues.get(i), source));
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199231639
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    --- End diff --
    
    I added checks in `createStateAndWrapWithTtlIfEnabled`, the only user of this private constructor.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199782860
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	/** Serializers for fields which constitute T. */
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +
    +	/** Whether T is an immutable type. */
    +	final boolean immutableTargetType;
    +
    +	/** Byte length of target object in serialized form. */
    +	private final int length;
    +
    +	/** Whether any field serializer is stateful. */
    +	private final boolean stateful;
    +
    +	private final int hashCode;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.immutableTargetType = immutableTargetType &&
    +			Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +		this.stateful = isStateful();
    +		this.hashCode = Arrays.hashCode(fieldSerializers);
    +	}
    +
    +	private boolean isStateful() {
    +		TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
    --- End diff --
    
    The flag for `isStateful()` is the only one that I suggested as a candidate for lazy init when `duplicate()` is called for the first time. Reason is that duplicating some types of inner serializers can sometimes be a bit expensive. But again, I feel that this can also be changed in followup work, if needed.


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    @StefanRRichter @sihuazhou 
    Thanks guys for the helpful review. 
    I refactored the `CompositeSerializer` to rely rather on loops than streams and added tests for it.
    Please, have a look.
    
    cc @twalthr 
    Maybe, the implementation of `CompositeSerializer` in this PR could be potentially interesting for other complex types, e.g. tuples or pojos allowing code reuse where it is possible without performance loss.


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    Thanks for the nice contribution. I had some comments inline. In particular three points about the serializer. I would suggest to avoid the use of raw types. I would also suggest to avoid using streaming API at least methods that can appear in hot loops (mainly copy, de/serialize) for performance reasons. I think the imperative style code will not even be (much) longer in those cases. Last, I suggest to always test new serializers via the `SerializerTestBase` because this catches many problems with little effort.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198060715
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    --- End diff --
    
    Do we really need this interface? It looks identical to `KeyedStateFactory` and seems that interface would fit here as well?


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    I had a few more comments, in particular some improvements for the new serializer. I think when those are addressed this is good to merge.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199228508
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    +			super(Arrays.asList(userValueSerializer, new LongSerializer()));
    +		}
    +
    +		@Override
    +		@SuppressWarnings("unchecked")
    +		protected TtlValue<T> composeValue(List values) {
    --- End diff --
    
    I think we can avoid checks in hot paths for protected methods and rely on code sanity of abstract class which is the only user of them.


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    LGTM 👍 merging.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199783587
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	/** Serializers for fields which constitute T. */
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +
    +	/** Whether T is an immutable type. */
    +	final boolean immutableTargetType;
    +
    +	/** Byte length of target object in serialized form. */
    +	private final int length;
    +
    +	/** Whether any field serializer is stateful. */
    +	private final boolean stateful;
    +
    +	private final int hashCode;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.immutableTargetType = immutableTargetType &&
    +			Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +		this.stateful = isStateful();
    +		this.hashCode = Arrays.hashCode(fieldSerializers);
    +	}
    +
    +	private boolean isStateful() {
    +		TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
    +		return IntStream.range(0, fieldSerializers.length)
    +			.anyMatch(i -> fieldSerializers[i] != duplicatedSerializers[i]);
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return stateful ? createSerializerInstance(duplicateFieldSerializers()) : this;
    --- End diff --
    
    Another small point here for `createSerializerInstance(...)`: we have no (non-public) constructor that can also take all boolean flags, length, and (maybe) hash directly. So if we copy the serializer, I guess it always goes through the whole process again to figure this out, but we could just copy it from the previous instance.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198015385
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    --- End diff --
    
    Same here might result in a bad performance because of the random access of the list.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197330813
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---
    @@ -48,7 +48,8 @@
     	KeyedStateBackend<K>,
     	Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>,
     	Closeable,
    -	CheckpointListener {
    +	CheckpointListener,
    +	KeyedStateFactory{
    --- End diff --
    
    I think this seems to miss a space ` `.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331898
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    --- End diff --
    
    The `userValueSerializer` seems can't be null?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198115714
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    --- End diff --
    
    +1


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199485075
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    --- End diff --
    
    I think in Java code style, a boolean field name should not be prefixed with `is...`, only the getter should be prefixed with `is...`


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331211
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    --- End diff --
    
    Why this couldn't be static?


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    I found there is still a small issue with the equals/hashCode but will just fix it before merging.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199477483
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +@SuppressWarnings("unchecked")
    +public class TtlStateFactory {
    --- End diff --
    
    It might make sense to also have a test when we introduce a new class that provides some kind of "service", e.g. to check that all the types are correctly mapped and also prevent that somebody breaks the mapping by accident.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198114853
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    --- End diff --
    
    You could simply have tests for all the new serializer by extending `SerializerTestBase`, which I would recommend to do to catch mistakes.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199474421
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    +	private final int length;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.isImmutableTargetType = isImmutableTargetType;
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
    +		boolean stateful = false;
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			duplicatedSerializers[index] = fieldSerializers[index].duplicate();
    +			if (fieldSerializers[index] != duplicatedSerializers[index]) {
    +				stateful = true;
    +			}
    +		}
    +		return stateful ? createSerializerInstance(duplicatedSerializers) : this;
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
    +			if (!fieldSerializer.isImmutableType()) {
    +				return false;
    +			}
    +		}
    +		return isImmutableTargetType;
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		Object[] fields = new Object[fieldSerializers.length];
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			fields[index] = fieldSerializers[index].createInstance();
    +		}
    +		return createInstance(fields);
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		Preconditions.checkNotNull(from);
    +		Object[] fields = new Object[fieldSerializers.length];
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			fields[index] = fieldSerializers[index].copy(getField(from, index));
    +		}
    +		return createInstance(fields);
    --- End diff --
    
    This method can return `from` if `isImmutableType()`


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199226982
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    --- End diff --
    
    Leftover from previous iteration 👍


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199894398
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -272,4 +254,60 @@ public int getVersion() {
     			previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class,
     			previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
     	}
    +
    +	/** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
    +	protected static class PrecomputedParameters implements Serializable {
    +		/** Whether target type is immutable. */
    +		final boolean immutableTargetType;
    +
    +		/** Whether target type and its fields are immutable. */
    +		final boolean immutable;
    +
    +		/** Byte length of target object in serialized form. */
    +		private final int length;
    +
    +		/** Whether any field serializer is stateful. */
    +		final boolean stateful;
    +
    +		final int hashCode;
    --- End diff --
    
    I wonder if this should be `transient` in a serializable class, the hash code could be based on object identity.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197498439
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    --- End diff --
    
    It could be static. The idea was to create `TtlStateFactory` object for each call of `createStateAndWrapWithTtlIfEnabled`. I think creating of state object will not happen often, only at init. The object contains non static references to originalStateFactory, ttlConfig, timeProvider to make `createXxxState` methods less verbose. `stateFactories` contains reference to this object in `createStateFactories`, so it cannot be static unless everything is static and `createXxxState` are more verbose and accept additional parameters: originalStateFactory, ttlConfig, timeProvider.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198058861
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +/** This factory produces concrete state objects in backends. */
    +public interface KeyedStateFactory {
    --- End diff --
    
    I think this interface can go into the file of `KeyedStateBackend` because it belongs into that context.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6196


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197330596
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    +		}
    +	}
    +
    +	@Override
    +	public T deserialize(DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			originalValues.add(typeSerializer.deserialize(source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public T deserialize(T reuse, DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalValues.add(originalSerializers.get(i).deserialize(originalReuseValues.get(i), source));
    +		}
    +		return composeValueInternal(originalValues);
    +	}
    +
    +	@Override
    +	public void copy(DataInputView source, DataOutputView target) throws IOException {
    +		for (TypeSerializer typeSerializer : originalSerializers) {
    +			typeSerializer.copy(source, target);
    +		}
    +	}
    +
    +	@Override
    +	public int hashCode() {
    +		return originalSerializers.hashCode();
    +	}
    +
    +	@Override
    +	public boolean equals(Object obj) {
    +		if (obj instanceof CompositeSerializer) {
    +			CompositeSerializer<?> other = (CompositeSerializer<?>) obj;
    +			return other.canEqual(this) && originalSerializers.equals(other.originalSerializers);
    +		} else {
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean canEqual(Object obj) {
    +		return obj instanceof CompositeSerializer;
    +	}
    +
    +	@Override
    +	public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +		return new CompositeTypeSerializerConfigSnapshot(originalSerializers.toArray(new TypeSerializer[]{ })) {
    +			@Override
    +			public int getVersion() {
    +				return 0;
    +			}
    +		};
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +		if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) {
    +			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
    +				((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
    +
    +			if (previousSerializersAndConfigs.size() == originalSerializers.size()) {
    +
    +				List<TypeSerializer> convertSerializers = new ArrayList<>();
    --- End diff --
    
    Again, could give a init size for the list.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198061894
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    --- End diff --
    
    Why are we using the deprecated constructor instead of following the suggested replacement of handling `null` values properly ourselves?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331741
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    +			super(Arrays.asList(userValueSerializer, new LongSerializer()));
    +		}
    +
    +		@Override
    +		@SuppressWarnings("unchecked")
    +		protected TtlValue<T> composeValue(List values) {
    --- End diff --
    
    Should we check that `values != null && values.size() == 2 `?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199782303
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	/** Serializers for fields which constitute T. */
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +
    +	/** Whether T is an immutable type. */
    +	final boolean immutableTargetType;
    +
    +	/** Byte length of target object in serialized form. */
    +	private final int length;
    +
    +	/** Whether any field serializer is stateful. */
    +	private final boolean stateful;
    +
    +	private final int hashCode;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.immutableTargetType = immutableTargetType &&
    +			Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +		this.stateful = isStateful();
    +		this.hashCode = Arrays.hashCode(fieldSerializers);
    --- End diff --
    
    I think up to this point, the code is iterating `fieldSerializers` 5 times (null checks, immutable check, length calc, stateful check, and hash code computation. It could be done in one iteration, but since this method should typically not be called in hot loops, this is an optional improvement.


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    Regarding CI, seems to be unrelated test `YARNSessionCapacitySchedulerITCase`, [passed in my CI](https://travis-ci.org/azagrebin/flink/builds/398352328)


---

[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6196
  
    @StefanRRichter 
    I added more precomputed fields to `CompositeSerializer` constructor and included `TtlStateFactory` in TTL tests.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199474258
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    +	private final int length;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.isImmutableTargetType = isImmutableTargetType;
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
    +		boolean stateful = false;
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			duplicatedSerializers[index] = fieldSerializers[index].duplicate();
    +			if (fieldSerializers[index] != duplicatedSerializers[index]) {
    +				stateful = true;
    +			}
    +		}
    +		return stateful ? createSerializerInstance(duplicatedSerializers) : this;
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
    --- End diff --
    
    Why not compute this once in the constructor and remember in a boolean flag??


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331820
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    +			super(Arrays.asList(userValueSerializer, new LongSerializer()));
    +		}
    +
    +		@Override
    +		@SuppressWarnings("unchecked")
    +		protected TtlValue<T> composeValue(List values) {
    +			return new TtlValue<>((T) values.get(0), (Long) values.get(1));
    +		}
    +
    +		@Override
    +		protected List decomposeValue(TtlValue<T> v) {
    --- End diff --
    
    Should we check `v != null`?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197329976
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +			originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +	}
    +
    +	@Override
    +	public void serialize(T record, DataOutputView target) throws IOException {
    +		List originalValues = decomposeValueInternal(record);
    +		for (int i = 0; i < originalSerializers.size(); i++) {
    +			originalSerializers.get(i).serialize(originalValues.get(i), target);
    +		}
    +	}
    +
    +	@Override
    +	public T deserialize(DataInputView source) throws IOException {
    +		List originalValues = new ArrayList();
    --- End diff --
    
    I would suggest to give a init size for `originalValues`, e.g. `List originalValues = new ArrayList(originalSerializers.size());`.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199474494
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    +	private final int length;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.isImmutableTargetType = isImmutableTargetType;
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
    +		boolean stateful = false;
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			duplicatedSerializers[index] = fieldSerializers[index].duplicate();
    +			if (fieldSerializers[index] != duplicatedSerializers[index]) {
    +				stateful = true;
    +			}
    +		}
    +		return stateful ? createSerializerInstance(duplicatedSerializers) : this;
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
    +			if (!fieldSerializer.isImmutableType()) {
    +				return false;
    +			}
    +		}
    +		return isImmutableTargetType;
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		Object[] fields = new Object[fieldSerializers.length];
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			fields[index] = fieldSerializers[index].createInstance();
    +		}
    +		return createInstance(fields);
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		Preconditions.checkNotNull(from);
    +		Object[] fields = new Object[fieldSerializers.length];
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			fields[index] = fieldSerializers[index].copy(getField(from, index));
    +		}
    +		return createInstance(fields);
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		Preconditions.checkNotNull(from);
    +		Preconditions.checkNotNull(reuse);
    +		Object[] fields = new Object[fieldSerializers.length];
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			fields[index] = fieldSerializers[index].copy(getField(from, index), getField(reuse, index));
    +		}
    +		return fromFields(fields, reuse);
    --- End diff --
    
    Same here, could return `this` for immutable types.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199474873
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private static final long serialVersionUID = 1L;
    +
    +	protected final TypeSerializer<Object>[] fieldSerializers;
    +	final boolean isImmutableTargetType;
    +	private final int length;
    +
    +	@SuppressWarnings("unchecked")
    +	protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer<?> ... fieldSerializers) {
    +		Preconditions.checkNotNull(fieldSerializers);
    +		Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +		this.isImmutableTargetType = isImmutableTargetType;
    +		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
    +		this.length = calcLength();
    +	}
    +
    +	/** Create new instance from its fields.  */
    +	public abstract T createInstance(@Nonnull Object ... values);
    +
    +	/** Modify field of existing instance. Supported only by mutable types. */
    +	protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +	/** Get field of existing instance. */
    +	protected abstract Object getField(@Nonnull T value, int index);
    +
    +	/** Factory for concrete serializer. */
    +	protected abstract CompositeSerializer<T> createSerializerInstance(TypeSerializer<?> ... originalSerializers);
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
    +		boolean stateful = false;
    +		for (int index = 0; index < fieldSerializers.length; index++) {
    +			duplicatedSerializers[index] = fieldSerializers[index].duplicate();
    +			if (fieldSerializers[index] != duplicatedSerializers[index]) {
    +				stateful = true;
    +			}
    +		}
    +		return stateful ? createSerializerInstance(duplicatedSerializers) : this;
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		for (TypeSerializer<Object> fieldSerializer : fieldSerializers) {
    --- End diff --
    
    We can compute many things like length, immutability, etc already in the constructor. Statelessness is the one thing that we might want to figure out and remember on the first attempt.


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198014021
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    --- End diff --
    
    I see, but this looks a bit weird, it looks like that the `stateFactories` is only used for looking up the corresponding state factory for the `stateDesc`, and we need to firstly create it every time when calling the `createStateAndWrapWithTtlIfEnabled()`, the flow of the `createStateAndWrapWithTtlIfEnabled()` looks like:
    
    - create a map of `state factory` (stateFactories).
    - use the stateFactories to look up the corresponding state factory for the `stateDesc`.
    - ...
    
    In that case, maybe use the `switch case` to find the corresponding state factory is better, at lest we don't need to firstly create a map of `state factory` for every call  this way. What do you think?


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198115772
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public int getLength() {
    +		return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    --- End diff --
    
    +1


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199227770
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    --- End diff --
    
    agree, I think we may actually not expose default value for value state with TTL or new descriptor builders in general


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199227970
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +	public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc,
    +		KeyedStateFactory originalStateFactory,
    +		TtlConfig ttlConfig,
    +		TtlTimeProvider timeProvider) throws Exception {
    +		return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +			originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +			new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +				.createState(namespaceSerializer, stateDesc);
    +	}
    +
    +	private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
    +
    +	private final KeyedStateFactory originalStateFactory;
    +	private final TtlConfig ttlConfig;
    +	private final TtlTimeProvider timeProvider;
    +
    +	private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +		this.originalStateFactory = originalStateFactory;
    +		this.ttlConfig = ttlConfig;
    +		this.timeProvider = timeProvider;
    +		this.stateFactories = createStateFactories();
    +	}
    +
    +	private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() {
    +		return Stream.of(
    +			Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +			Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +			Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +	}
    +
    +	private interface StateFactory {
    +		<N, SV, S extends State, IS extends S> IS create(
    +			TypeSerializer<N> namespaceSerializer,
    +			StateDescriptor<S, SV> stateDesc) throws Exception;
    +	}
    +
    +	private <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		StateFactory stateFactory = stateFactories.get(stateDesc.getClass());
    +		if (stateFactory == null) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), TtlStateFactory.class);
    +			throw new FlinkRuntimeException(message);
    +		}
    +		return stateFactory.create(namespaceSerializer, stateDesc);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createValueState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		SV defVal = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE);
    +		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()), ttlDefVal);
    +		return (IS) new TtlValueState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createListState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    +		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
    +			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
    +		return (IS) new TtlListState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, listStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <UK, UV, N, SV, S extends State, IS extends S> IS createMapState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
    +		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
    +			stateDesc.getName(),
    +			mapStateDesc.getKeySerializer(),
    +			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
    +		return (IS) new TtlMapState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, mapStateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <N, SV, S extends State, IS extends S> IS createReducingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
    +		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
    +			stateDesc.getName(),
    +			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlReducingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <IN, OUT, N, SV, S extends State, IS extends S> IS createAggregatingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
    +			(AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
    +		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
    +			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
    +		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
    +			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlAggregatingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	private <T, N, SV, S extends State, IS extends S> IS createFoldingState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
    +		SV initAcc = stateDesc.getDefaultValue();
    +		TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
    +		FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
    +			stateDesc.getName(),
    +			ttlInitAcc,
    +			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider),
    +			new TtlSerializer<>(stateDesc.getSerializer()));
    +		return (IS) new TtlFoldingState<>(
    +			originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
    +			ttlConfig, timeProvider, stateDesc.getSerializer());
    +	}
    +
    +	private static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
    +		TtlSerializer(TypeSerializer<T> userValueSerializer) {
    --- End diff --
    
    I will add check in `CompositeSerializer`


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r198015282
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * <p>This class serializes a list of objects
    + *
    + * @param <T> type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +	private final List<TypeSerializer> originalSerializers;
    +
    +	protected CompositeSerializer(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkNotNull(originalSerializers);
    +		this.originalSerializers = originalSerializers;
    +	}
    +
    +	protected abstract T composeValue(List values);
    +
    +	protected abstract List decomposeValue(T v);
    +
    +	protected abstract CompositeSerializer<T> createSerializerInstance(List<TypeSerializer> originalSerializers);
    +
    +	private T composeValueInternal(List values) {
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return composeValue(values);
    +	}
    +
    +	private List decomposeValueInternal(T v) {
    +		List values = decomposeValue(v);
    +		Preconditions.checkArgument(values.size() == originalSerializers.size());
    +		return values;
    +	}
    +
    +	private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> originalSerializers) {
    +		Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +		return createSerializerInstance(originalSerializers);
    +	}
    +
    +	@Override
    +	public CompositeSerializer<T> duplicate() {
    +		return createSerializerInstanceInternal(originalSerializers.stream()
    +			.map(TypeSerializer::duplicate)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() {
    +		return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +	}
    +
    +	@Override
    +	public T createInstance() {
    +		return composeValueInternal(originalSerializers.stream()
    +			.map(TypeSerializer::createInstance)
    +			.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from) {
    +		List originalValues = decomposeValueInternal(from);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +				.collect(Collectors.toList()));
    +	}
    +
    +	@Override
    +	public T copy(T from, T reuse) {
    +		List originalFromValues = decomposeValueInternal(from);
    +		List originalReuseValues = decomposeValueInternal(reuse);
    +		return composeValueInternal(
    +			IntStream.range(0, originalSerializers.size())
    +				.mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    --- End diff --
    
    I would suggest to use the `Iterator` instead of `xxx.get(i)` here, because if the `originalSerializers` (or the other instances) is type of LinkedList the performance could be bad. 


---

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199476870
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,210 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +@SuppressWarnings("unchecked")
    --- End diff --
    
    I would not suppress warnings in the scope of a full class, better more fine grained on methods.


---