Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/13 13:31:49 UTC

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6159

    [FLINK-9487] Prepare InternalTimerHeap for asynchronous snapshots

    ## What is the purpose of the change
    
    This PR is the first step in the context of FLINK-9485. The purpose of this PR is to enhance ``InternalTimerHeap`` with the capability to produce asynchronous snapshots. This is very similar to the approach taken for asynchronous snapshots of the ``CopyOnWriteStateTable``, and we want to reuse as much of the existing code as possible to power both kinds of snapshots.
    
    
    ## Brief change log
    
    The first commit generalizes the key-group-partitioning algorithm for async snapshots from the ``CopyOnWriteStateTable``. The newly introduced ``StateSnapshot`` interface outlines the asynchronous snapshot life-cycle, which typically looks as follows. In the synchronous part of a checkpoint, an instance of ``StateSnapshot`` is produced for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user calls ``#partitionByKeyGroup()`` to ensure that the snapshot is partitioned into key-groups. For state that is already partitioned, this can be a NOP. The returned ``KeyGroupPartitionedSnapshot`` can be used by the caller to write the state by key-group. As a last step, when the state is completely written, the user calls ``#release()``.
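    To make this life-cycle concrete, below is a minimal sketch of a caller driving it. Only ``StateSnapshot``, ``KeyGroupPartitionedSnapshot``, ``#partitionByKeyGroup()``, ``#writeMappingsInKeyGroup(...)``, and ``#release()`` come from this PR; the surrounding class, method, and parameter names are invented for illustration:

        import org.apache.flink.core.memory.DataOutputView;
        import org.apache.flink.runtime.state.KeyGroupRange;
        import org.apache.flink.runtime.state.StateSnapshot;

        import java.io.IOException;

        // Illustrative driver only: the snapshot is assumed to have been created
        // in the synchronous part of the checkpoint; everything below runs in
        // the asynchronous part.
        final class SnapshotLifecycleSketch {

            static void writeSnapshot(
                    StateSnapshot stateSnapshot,
                    KeyGroupRange keyGroupRange,
                    DataOutputView out) throws IOException {

                try {
                    // Ensure the captured state is partitioned into key-groups
                    // (a NOP for state that is already partitioned).
                    StateSnapshot.KeyGroupPartitionedSnapshot partitioned =
                        stateSnapshot.partitionByKeyGroup();

                    // Write the state, one key-group at a time.
                    for (int kg = keyGroupRange.getStartKeyGroup(); kg <= keyGroupRange.getEndKeyGroup(); ++kg) {
                        partitioned.writeMappingsInKeyGroup(out, kg);
                    }
                } finally {
                    // Last step: release the snapshot's resources.
                    stateSnapshot.release();
                }
            }
        }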
    
    The partitioning algorithm is also slightly modified to cache the computed key-group id per element. This improves runtime at the cost of some additional memory.
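    As a rough illustration of the two-pass, histogram-based partitioning with cached key-group ids, here is a self-contained toy version; all names in it are invented, and it glosses over Flink's real key-group assignment:

        // Toy counting-sort-style partitioning; the cached key-group per element
        // corresponds to the elementKeyGroups array in the real partitioner.
        final class KeyGroupPartitionToy {

            static String[] partition(String[] input, int numKeyGroups) {
                int[] elementKeyGroups = new int[input.length]; // cached key-group per element
                int[] histogram = new int[numKeyGroups];

                // Pass 1: compute each element's key-group once, cache and count it.
                for (int i = 0; i < input.length; i++) {
                    elementKeyGroups[i] = Math.floorMod(input[i].hashCode(), numKeyGroups);
                    ++histogram[elementKeyGroups[i]];
                }

                // Turn the counts into start offsets (exclusive prefix sum).
                int sum = 0;
                for (int g = 0; g < numKeyGroups; g++) {
                    int count = histogram[g];
                    histogram[g] = sum;
                    sum += count;
                }

                // Pass 2: scatter each element to its key-group slot, reusing the
                // cached key-group instead of recomputing it.
                String[] output = new String[input.length];
                for (int i = 0; i < input.length; i++) {
                    output[histogram[elementKeyGroups[i]]++] = input[i];
                }
                return output;
            }
        }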
    
    The second commit introduces an implementation of ``StateSnapshot`` for the ``InternalTimerHeap`` data structure.
    
    
    ## Verifying this change
    
    This change added tests: ``TimerPartitionerTest``, ``StateTableKeyGroupPartitionerTest``, ``AbstractKeyGroupPartitionedSnapshotTest``
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink FLINK-9487

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6159.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6159
    
----
commit 9e95e1a5f36b6c4f0b021f38b2a2a4c2f8dacf3a
Author: Stefan Richter <s....@...>
Date:   2018-06-11T12:48:06Z

    Refactor/generalize key-group partitioning.

commit 800abd9743c5483f50835d0cf938ad50f550ae49
Author: Stefan Richter <s....@...>
Date:   2018-06-11T15:44:10Z

    Introduce TimerHeap snapshots and key-group-partitioning

----


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6159


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195156251
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
    + * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
    + * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
    + * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
    + */
    +public abstract class AbstractKeyGroupPartitioner<T> {
    +
    +	/** Total number of input elements. */
    +	@Nonnegative
    +	protected final int numberOfElements;
    +
    +	/** The total number of key-groups in the job. */
    +	@Nonnegative
    +	protected final int totalKeyGroups;
    +
    +	/** The key-group range for the input data, covered in this partitioning. */
    +	@Nonnull
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
    +	 * a histogram by accumulation.
    +	 */
    +	@Nonnull
    +	protected final int[] counterHistogram;
    +
    +	/**
    +	 * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
    +	 */
    +	@Nonnull
    +	protected final int[] elementKeyGroups;
    +
    +	/** Cached value of keyGroupRange#firstKeyGroup. */
    +	@Nonnegative
    +	protected final int firstKeyGroup;
    +
    +	/** Cached result. */
    +	protected PartitioningResult<T> computedResult;
    +
    +	/**
    +	 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
    +	 * @param totalKeyGroups the total number of key groups in the job.
    +	 */
    +	public AbstractKeyGroupPartitioner(
    +		@Nonnegative int numberOfElements,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalKeyGroups) {
    +
    +		this.numberOfElements = numberOfElements;
    +		this.keyGroupRange = keyGroupRange;
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +		this.elementKeyGroups = new int[numberOfElements];
    +		this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
    +		this.computedResult = null;
    +	}
    +
    +	/**
    +	 * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
    +	 */
    +	public PartitioningResult<T> partitionByKeyGroup() {
    +		if (computedResult == null) {
    +			reportAllElementKeyGroups();
    +			buildHistogramFromCounts();
    +			executePartitioning();
    +		}
    +		return computedResult;
    +	}
    +
    +	/**
    +	 * This method iterates over the input data and reports the key-group for each element.
    +	 */
    +	protected void reportAllElementKeyGroups() {
    +		final T[] input = getPartitioningInput();
    +
    +		Preconditions.checkState(input.length >= numberOfElements);
    +
    +		for (int i = 0; i < numberOfElements; ++i) {
    +			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
    +			reportKeyGroupOfElementAtIndex(i, keyGroup);
    +		}
    +	}
    +
    +	/**
    +	 * Returns the key for the given element by which the key-group can be computed.
    +	 */
    +	@Nonnull
    +	protected abstract Object extractKeyFromElement(T element);
    +
    +	/**
    +	 * Returns the input data for the partitioning. All elements to consider must be densely in the index interval
    +	 * [0, {@link #numberOfElements}[, without null values.
    +	 */
    +	@Nonnull
    +	protected abstract T[] getPartitioningInput();
    +
    +	/**
    +	 * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
    +	 */
    +	@Nonnull
    +	protected abstract T[] getPartitioningOutput();
    +
    +	/**
    +	 * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
    +	 */
    +	protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
    +		final int keyGroupIndex = keyGroup - firstKeyGroup;
    +		elementKeyGroups[index] = keyGroupIndex;
    +		++counterHistogram[keyGroupIndex];
    +	}
    +
    +	/**
    +	 * This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
    +	 */
    +	private void buildHistogramFromCounts() {
    +		int sum = 0;
    +		for (int i = 0; i < counterHistogram.length; ++i) {
    +			int currentSlotValue = counterHistogram[i];
    +			counterHistogram[i] = sum;
    +			sum += currentSlotValue;
    +		}
    +	}
    +
    +	private void executePartitioning() {
    +
    +		final T[] in = getPartitioningInput();
    +		final T[] out = getPartitioningOutput();
    +
    +		Preconditions.checkState(in != out);
    +		Preconditions.checkState(in.length >= numberOfElements);
    +		Preconditions.checkState(out.length >= numberOfElements);
    --- End diff --
    
    This looks like a sanity check, but for `CopyOnWriteStateTable` there seems to be a loophole: if the number of elements in the backend is more than `1 << 30`, this check can fail. That means the current version does not support storing more than `1 << 30` records in the heap-based backend (the previous version supported it and only logged a warning message when the state size grew too large). Is this intended?


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195443441
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +
    +
    +/**
    + * Abstract base class for implementations of
    + * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a
    + * {@link AbstractKeyGroupPartitioner}.
    + *
    + * @param <T> type of the written elements.
    + */
    +public abstract class AbstractKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
    +
    +	/** The partitioning result to be written by key-group. */
    +	@Nonnull
    +	private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
    +
    +	public AbstractKeyGroupPartitionedSnapshot(
    +		@Nonnull AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult) {
    +		this.partitioningResult = partitioningResult;
    +	}
    +
    +	@Override
    +	public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
    --- End diff --
    
    @Nonnegative int keyGroupId, for consistency


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195376144
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
    + * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
    + * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
    + * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
    + */
    --- End diff --
    
    👍 


---

[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6159
  
    Thanks for the review @sihuazhou! 


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195168815
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---
    @@ -224,6 +227,15 @@ void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> restoredTim
     		return result;
     	}
     
    +	@Nonnull
    +	StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> serializer) {
    +		return new InternalTimerHeapSnapshot<>(
    +			Arrays.copyOfRange(queue, 1, size + 1),
    +			serializer,
    --- End diff --
    
    The `serializer` isn't duplicated here or in the constructor of `InternalTimerHeapSnapshot` in the current code. Do you plan to duplicate it when calling `snapshot()` in upcoming code?


---

[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6159
  
    @azagrebin @sihuazhou I have addressed the comments and also slightly refactored the PR. Please take another look if you want.


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195299958
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
    + * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
    + * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
    + * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
    + */
    --- End diff --
    
    Maybe we could add a param annotation for `T`.


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195375851
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
    + * with two arrays (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
    + * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
    + * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
    + */
    +public abstract class AbstractKeyGroupPartitioner<T> {
    +
    +	/** Total number of input elements. */
    +	@Nonnegative
    +	protected final int numberOfElements;
    +
    +	/** The total number of key-groups in the job. */
    +	@Nonnegative
    +	protected final int totalKeyGroups;
    +
    +	/** The key-group range for the input data, covered in this partitioning. */
    +	@Nonnull
    +	protected final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
    +	 * a histogram by accumulation.
    +	 */
    +	@Nonnull
    +	protected final int[] counterHistogram;
    +
    +	/**
    +	 * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
    +	 */
    +	@Nonnull
    +	protected final int[] elementKeyGroups;
    +
    +	/** Cached value of keyGroupRange#firstKeyGroup. */
    +	@Nonnegative
    +	protected final int firstKeyGroup;
    +
    +	/** Cached result. */
    +	protected PartitioningResult<T> computedResult;
    +
    +	/**
    +	 * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
    +	 * @param totalKeyGroups the total number of key groups in the job.
    +	 */
    +	public AbstractKeyGroupPartitioner(
    +		@Nonnegative int numberOfElements,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalKeyGroups) {
    +
    +		this.numberOfElements = numberOfElements;
    +		this.keyGroupRange = keyGroupRange;
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +		this.elementKeyGroups = new int[numberOfElements];
    +		this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
    +		this.computedResult = null;
    +	}
    +
    +	/**
    +	 * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
    +	 */
    +	public PartitioningResult<T> partitionByKeyGroup() {
    +		if (computedResult == null) {
    +			reportAllElementKeyGroups();
    +			buildHistogramFromCounts();
    +			executePartitioning();
    +		}
    +		return computedResult;
    +	}
    +
    +	/**
    +	 * This method iterates over the input data and reports the key-group for each element.
    +	 */
    +	protected void reportAllElementKeyGroups() {
    +		final T[] input = getPartitioningInput();
    +
    +		Preconditions.checkState(input.length >= numberOfElements);
    +
    +		for (int i = 0; i < numberOfElements; ++i) {
    +			int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
    +			reportKeyGroupOfElementAtIndex(i, keyGroup);
    +		}
    +	}
    +
    +	/**
    +	 * Returns the key for the given element by which the key-group can be computed.
    +	 */
    +	@Nonnull
    +	protected abstract Object extractKeyFromElement(T element);
    +
    +	/**
    +	 * Returns the input data for the partitioning. All elements to consider must be densely in the index interval
    +	 * [0, {@link #numberOfElements}[, without null values.
    +	 */
    +	@Nonnull
    +	protected abstract T[] getPartitioningInput();
    +
    +	/**
    +	 * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
    +	 */
    +	@Nonnull
    +	protected abstract T[] getPartitioningOutput();
    +
    +	/**
    +	 * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
    +	 */
    +	protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
    +		final int keyGroupIndex = keyGroup - firstKeyGroup;
    +		elementKeyGroups[index] = keyGroupIndex;
    +		++counterHistogram[keyGroupIndex];
    +	}
    +
    +	/**
    +	 * This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
    +	 */
    +	private void buildHistogramFromCounts() {
    +		int sum = 0;
    +		for (int i = 0; i < counterHistogram.length; ++i) {
    +			int currentSlotValue = counterHistogram[i];
    +			counterHistogram[i] = sum;
    +			sum += currentSlotValue;
    +		}
    +	}
    +
    +	private void executePartitioning() {
    +
    +		final T[] in = getPartitioningInput();
    +		final T[] out = getPartitioningOutput();
    +
    +		Preconditions.checkState(in != out);
    +		Preconditions.checkState(in.length >= numberOfElements);
    +		Preconditions.checkState(out.length >= numberOfElements);
    --- End diff --
    
    I am aware of this corner case, and this theoretical problem already existed before the PR, so it is a bit out of scope. I have a fix that i) checks in the resize whether the maximum size was reached and then throws an exception, and ii) computes the resize threshold for array size 1 << 30 as the maximum array size (~ Integer.MAX_VALUE - 8).
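    A rough sketch of what that fix could look like (the constants and method names here are assumptions, not the actual `CopyOnWriteStateTable` code):

        final class HashTableResizeSketch {

            private static final int MAXIMUM_CAPACITY = 1 << 30;
            // Largest array size that is safe to allocate on common JVMs.
            private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

            // ii) for the maximum table size, stretch the resize threshold to the
            // maximum array size instead of using capacity * loadFactor.
            static int newThreshold(int capacity, float loadFactor) {
                return capacity == MAXIMUM_CAPACITY
                    ? MAX_ARRAY_SIZE
                    : (int) (capacity * loadFactor);
            }

            // i) fail fast in the resize once the maximum size is reached.
            static void checkResizeAllowed(int capacity) {
                if (capacity == MAXIMUM_CAPACITY) {
                    throw new IllegalStateException("Maximum capacity of hash table reached.");
                }
            }
        }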


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195376108
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---
    @@ -224,6 +227,15 @@ void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> restoredTim
     		return result;
     	}
     
    +	@Nonnull
    +	StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> serializer) {
    +		return new InternalTimerHeapSnapshot<>(
    +			Arrays.copyOfRange(queue, 1, size + 1),
    +			serializer,
    --- End diff --
    
    This method is not called yet and the signature will probably change. I just have it here to roughly outline how this could be used in the future.


---

[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6159
  
    CC @azagrebin 


---

[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6159#discussion_r195441098
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +
    +
    +/**
    + * Abstract base class for implementations of
    + * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a
    + * {@link AbstractKeyGroupPartitioner}.
    + *
    + * @param <T> type of the written elements.
    + */
    +public abstract class AbstractKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
    +
    +	/** The partitioning result to be written by key-group. */
    +	@Nonnull
    +	private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
    +
    +	public AbstractKeyGroupPartitionedSnapshot(
    +		@Nonnull AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult) {
    +		this.partitioningResult = partitioningResult;
    +	}
    +
    +	@Override
    +	public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
    +
    +		final T[] groupedOut = partitioningResult.getPartitionedElements();
    +
    +		int startOffset = partitioningResult.getKeyGroupStartOffsetInclusive(keyGroupId);
    +		int endOffset = partitioningResult.getKeyGroupEndOffsetExclusive(keyGroupId);
    +
    +		// write number of mappings in key-group
    +		dov.writeInt(endOffset - startOffset);
    +
    +		// write mappings
    +		for (int i = startOffset; i < endOffset; ++i) {
    +			if(groupedOut[i] == null) {
    +				throw new IllegalStateException();
    +			}
    +			writeElement(groupedOut[i], dov);
    +			groupedOut[i] = null; // free asap for GC
    +		}
    +	}
    +
    +	/**
    +	 * This method defines how to write a single element to the output.
    +	 *
    +	 * @param element the element to be written.
    +	 * @param dov the output view to write the element.
    +	 * @throws IOException on write-related problems.
    +	 */
    +	protected abstract void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
    --- End diff --
    
    `AbstractKeyGroupPartitionedSnapshot.writeElement` looks like a strategy for `writeMappingsInKeyGroup`. It could be injected into the constructor or into `writeMappingsInKeyGroup` as a lambda implementing an `ElementWriter` interface, which would eliminate the inheritance abstraction.
    
    The partitioner could then just output a PartitionedSnapshot, which would be the partitioningResult plus `writeMappingsInKeyGroup`. The result seems to have just one purpose: to be written as a keyed snapshot.
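    A sketch of the suggested shape (only `ElementWriter` is the name proposed above; the class and constructor names are invented, while the `PartitioningResult` accessors are the ones from this PR):

        import org.apache.flink.core.memory.DataOutputView;
        import org.apache.flink.runtime.state.AbstractKeyGroupPartitioner;
        import org.apache.flink.runtime.state.StateSnapshot;

        import javax.annotation.Nonnull;

        import java.io.IOException;

        // The write strategy becomes a lambda instead of an abstract method,
        // so no subclassing of the snapshot class is needed anymore.
        @FunctionalInterface
        interface ElementWriter<T> {
            void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
        }

        final class LambdaKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {

            private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
            private final ElementWriter<T> elementWriter;

            LambdaKeyGroupPartitionedSnapshot(
                    AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult,
                    ElementWriter<T> elementWriter) {
                this.partitioningResult = partitioningResult;
                this.elementWriter = elementWriter;
            }

            @Override
            public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
                // Same write loop as AbstractKeyGroupPartitionedSnapshot, but
                // delegating the per-element writing to the injected strategy.
                final T[] elements = partitioningResult.getPartitionedElements();
                final int start = partitioningResult.getKeyGroupStartOffsetInclusive(keyGroupId);
                final int end = partitioningResult.getKeyGroupEndOffsetExclusive(keyGroupId);
                dov.writeInt(end - start);
                for (int i = start; i < end; ++i) {
                    elementWriter.writeElement(elements[i], dov);
                }
            }
        }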


---

[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6159
  
    @StefanRRichter Sorry that I didn't take another look yesterday, I was a bit too busy at the time. I had a look at the fix for the `CopyOnWriteStateTable`, and I am afraid it is still a partial fix. I just created a JIRA for it and will open a PR for it very soon.
    
    In short, the other part of the problem is that we reuse `snapshotData` as the output array when partitioning the data, but the maximum length of `snapshotData` is `1 << 30`. So when the number of records in the `CopyOnWriteStateTable` is more than `1 << 30` (e.g. `(1 << 30) + 1`), the check `Preconditions.checkState(partitioningDestination.length >= numberOfElements);` can fail.
    



---