Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/13 13:31:49 UTC
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6159
[FLINK-9487] Prepare InternalTimerHeap for asynchronous snapshots
## What is the purpose of the change
This PR is the first step in the context of FLINK-9485. The purpose of this PR is to enhance ``InternalTimerHeap`` with the capability to produce asynchronous snapshots. This is very similar to the approach of asynchronous snapshots for the ``CopyOnWriteStateTable``, and we want to reuse as much of the existing code as possible to power both kinds of snapshots.
## Brief change log
The first commit generalizes the key-group-partitioning algorithm for async snapshots from the ``CopyOnWriteStateTable``. The newly introduced ``StateSnapshot`` interface outlines the asynchronous snapshot life-cycle, which typically looks as follows. In the synchronous part of a checkpoint, an instance of ``StateSnapshot`` is produced for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user calls ``#partitionByKeyGroup()`` to ensure that the snapshot is partitioned into key-groups. For state that is already partitioned, this can be a NOP. The returned ``KeyGroupPartitionedSnapshot`` can be used by the caller to write the state by key-group. As a last step, when the state is completely written, the user calls ``#release()``.
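The life-cycle described above can be sketched roughly as follows. This is only an illustration: the two interfaces are simplified stand-ins for the ``StateSnapshot`` and ``KeyGroupPartitionedSnapshot`` types named in this PR, ``java.io.DataOutput`` replaces Flink's ``DataOutputView``, and the helper ``writeSnapshot`` and its parameters are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

final class SnapshotLifecycleSketch {

    /** Simplified stand-in for the PR's StateSnapshot (assumed shape, not the real interface). */
    interface StateSnapshot {
        KeyGroupPartitionedSnapshot partitionByKeyGroup();

        void release();
    }

    /** Simplified stand-in for the partitioned view that is written key-group by key-group. */
    interface KeyGroupPartitionedSnapshot {
        void writeMappingsInKeyGroup(DataOutput out, int keyGroupId) throws IOException;
    }

    /**
     * Asynchronous part of a checkpoint: partition the snapshot, write every key-group in the
     * given (inclusive) range, and release the snapshot once writing is done or has failed.
     */
    static byte[] writeSnapshot(StateSnapshot snapshot, int firstKeyGroup, int lastKeyGroup)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            // May be a no-op for state that is already partitioned.
            KeyGroupPartitionedSnapshot partitioned = snapshot.partitionByKeyGroup();
            for (int keyGroup = firstKeyGroup; keyGroup <= lastKeyGroup; keyGroup++) {
                partitioned.writeMappingsInKeyGroup(out, keyGroup);
            }
            out.flush();
        } finally {
            // Last step of the life-cycle.
            snapshot.release();
        }
        return bytes.toByteArray();
    }
}
```

The snapshot instance itself would be created in the synchronous part of the checkpoint; only the call to ``writeSnapshot`` would run asynchronously.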
The partitioning algorithm is also slightly modified to cache computed key-group-ids per element. This improves runtime at the cost of some additional memory.
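To make the trade-off concrete, here is a minimal sketch of a counting-sort style partitioning with cached key-group indexes. It is not the code from this PR: the class name, the method signature, and the key-group assignment (a plain ``floorMod`` of ``hashCode()``) are assumptions made for illustration, whereas Flink uses ``KeyGroupRangeAssignment.assignToKeyGroup``.

```java
final class KeyGroupPartitioningSketch {

    /** Hypothetical key-group assignment, used only to keep the sketch self-contained. */
    static int assignToKeyGroup(Object key, int totalKeyGroups) {
        return Math.floorMod(key.hashCode(), totalKeyGroups);
    }

    /**
     * Partitions {@code in} into {@code out} by key-group and returns, for each key-group in
     * the range, the exclusive end offset of its elements in {@code out}.
     */
    static <T> int[] partitionByKeyGroup(
            T[] in, T[] out, int firstKeyGroup, int keyGroupsInRange, int totalKeyGroups) {

        final int numberOfElements = in.length;
        final int[] elementKeyGroups = new int[numberOfElements]; // the per-element cache
        final int[] histogram = new int[keyGroupsInRange];

        // Pass 1: compute each element's key-group once, cache it, and count per key-group.
        for (int i = 0; i < numberOfElements; i++) {
            int keyGroupIndex = assignToKeyGroup(in[i], totalKeyGroups) - firstKeyGroup;
            elementKeyGroups[i] = keyGroupIndex;
            histogram[keyGroupIndex]++;
        }

        // Pass 2: turn the counts into start offsets (exclusive prefix sum).
        int sum = 0;
        for (int i = 0; i < histogram.length; i++) {
            int count = histogram[i];
            histogram[i] = sum;
            sum += count;
        }

        // Pass 3: scatter into the output, reusing the cached indexes instead of re-hashing.
        for (int i = 0; i < numberOfElements; i++) {
            out[histogram[elementKeyGroups[i]]++] = in[i];
        }

        // Each slot has advanced from its start offset to its exclusive end offset.
        return histogram;
    }
}
```

The extra memory is one ``int`` per element for the cache. In exchange, the key-group of each element is computed once instead of twice.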
The second commit introduces an implementation of ``StateSnapshot`` for the ``InternalTimerHeap`` data structure.
## Verifying this change
This change added tests: ``TimerPartitionerTest``, ``StateTableKeyGroupPartitionerTest``, ``AbstractKeyGroupPartitionedSnapshotTest``
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink FLINK-9487
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6159.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6159
----
commit 9e95e1a5f36b6c4f0b021f38b2a2a4c2f8dacf3a
Author: Stefan Richter <s....@...>
Date: 2018-06-11T12:48:06Z
Refactor/generalize key-group partitioning.
commit 800abd9743c5483f50835d0cf938ad50f550ae49
Author: Stefan Richter <s....@...>
Date: 2018-06-11T15:44:10Z
Introduce TimerHeap snapshots and key-group-partitioning
----
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6159
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195156251
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ */
+public abstract class AbstractKeyGroupPartitioner<T> {
+
+ /** Total number of input elements. */
+ @Nonnegative
+ protected final int numberOfElements;
+
+ /** The total number of key-groups in the job. */
+ @Nonnegative
+ protected final int totalKeyGroups;
+
+ /** The key-group range for the input data, covered in this partitioning. */
+ @Nonnull
+ protected final KeyGroupRange keyGroupRange;
+
+ /**
+ * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
+ * a histogram by accumulation.
+ */
+ @Nonnull
+ protected final int[] counterHistogram;
+
+ /**
+ * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
+ */
+ @Nonnull
+ protected final int[] elementKeyGroups;
+
+ /** Cached value of keyGroupRange#firstKeyGroup. */
+ @Nonnegative
+ protected final int firstKeyGroup;
+
+ /** Cached result. */
+ protected PartitioningResult<T> computedResult;
+
+ /**
+ * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
+ * @param totalKeyGroups the total number of key groups in the job.
+ */
+ public AbstractKeyGroupPartitioner(
+ @Nonnegative int numberOfElements,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.numberOfElements = numberOfElements;
+ this.keyGroupRange = keyGroupRange;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.elementKeyGroups = new int[numberOfElements];
+ this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
+ this.computedResult = null;
+ }
+
+ /**
+ * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
+ */
+ public PartitioningResult<T> partitionByKeyGroup() {
+ if (computedResult == null) {
+ reportAllElementKeyGroups();
+ buildHistogramFromCounts();
+ executePartitioning();
+ }
+ return computedResult;
+ }
+
+ /**
+ * This method iterates over the input data and reports the key-group for each element.
+ */
+ protected void reportAllElementKeyGroups() {
+ final T[] input = getPartitioningInput();
+
+ Preconditions.checkState(input.length >= numberOfElements);
+
+ for (int i = 0; i < numberOfElements; ++i) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
+ reportKeyGroupOfElementAtIndex(i, keyGroup);
+ }
+ }
+
+ /**
+ * Returns the key for the given element by which the key-group can be computed.
+ */
+ @Nonnull
+ protected abstract Object extractKeyFromElement(T element);
+
+ /**
+ * Returns the input data for the partitioning. All elements to consider must be densely in the index interval
+ * [0, {@link #numberOfElements}[, without null values.
+ */
+ @Nonnull
+ protected abstract T[] getPartitioningInput();
+
+ /**
+ * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
+ */
+ @Nonnull
+ protected abstract T[] getPartitioningOutput();
+
+ /**
+ * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
+ */
+ protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
+ final int keyGroupIndex = keyGroup - firstKeyGroup;
+ elementKeyGroups[index] = keyGroupIndex;
+ ++counterHistogram[keyGroupIndex];
+ }
+
+ /**
+ * This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
+ */
+ private void buildHistogramFromCounts() {
+ int sum = 0;
+ for (int i = 0; i < counterHistogram.length; ++i) {
+ int currentSlotValue = counterHistogram[i];
+ counterHistogram[i] = sum;
+ sum += currentSlotValue;
+ }
+ }
+
+ private void executePartitioning() {
+
+ final T[] in = getPartitioningInput();
+ final T[] out = getPartitioningOutput();
+
+ Preconditions.checkState(in != out);
+ Preconditions.checkState(in.length >= numberOfElements);
+ Preconditions.checkState(out.length >= numberOfElements);
--- End diff --
This looks like a sanity check, but for `CopyOnWriteStateTable` there seems to be a loophole: if the number of elements in the backend exceeds `1 << 30`, this check can fail. That means the current version does not support storing more than `1 << 30` records in the heap-based backend, whereas the previous version did support it (it just logged a warning when the state's size was too large). Is this intended?
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195443441
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+
+/**
+ * Abstract base class for implementations of
+ * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a
+ * {@link AbstractKeyGroupPartitioner}.
+ *
+ * @param <T> type of the written elements.
+ */
+public abstract class AbstractKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+
+ /** The partitioning result to be written by key-group. */
+ @Nonnull
+ private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
+
+ public AbstractKeyGroupPartitionedSnapshot(
+ @Nonnull AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult) {
+ this.partitioningResult = partitioningResult;
+ }
+
+ @Override
+ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
--- End diff --
@Nonnegative int keyGroupId, for consistency
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195376144
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ */
--- End diff --
👍
---
[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6159
Thanks for the review @sihuazhou!
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195168815
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---
@@ -224,6 +227,15 @@ void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> restoredTim
return result;
}
+ @Nonnull
+ StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> serializer) {
+ return new InternalTimerHeapSnapshot<>(
+ Arrays.copyOfRange(queue, 1, size + 1),
+ serializer,
--- End diff --
The `serializer` is not duplicated here or in the constructor of `InternalTimerHeapSnapshot` in the current code. Do you plan to duplicate it when calling `snapshot()` in later code?
---
[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6159
@azagrebin @sihuazhou I have addressed the comments and also slightly refactored the PR. Please take another look if you want.
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195299958
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ */
--- End diff --
Maybe we could add a `@param` tag for `T`.
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195375851
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitioner.java ---
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
+ * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
+ * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
+ */
+public abstract class AbstractKeyGroupPartitioner<T> {
+
+ /** Total number of input elements. */
+ @Nonnegative
+ protected final int numberOfElements;
+
+ /** The total number of key-groups in the job. */
+ @Nonnegative
+ protected final int totalKeyGroups;
+
+ /** The key-group range for the input data, covered in this partitioning. */
+ @Nonnull
+ protected final KeyGroupRange keyGroupRange;
+
+ /**
+ * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
+ * a histogram by accumulation.
+ */
+ @Nonnull
+ protected final int[] counterHistogram;
+
+ /**
+ * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
+ */
+ @Nonnull
+ protected final int[] elementKeyGroups;
+
+ /** Cached value of keyGroupRange#firstKeyGroup. */
+ @Nonnegative
+ protected final int firstKeyGroup;
+
+ /** Cached result. */
+ protected PartitioningResult<T> computedResult;
+
+ /**
+ * @param keyGroupRange the key-group range of the data that will be partitioned by this instance.
+ * @param totalKeyGroups the total number of key groups in the job.
+ */
+ public AbstractKeyGroupPartitioner(
+ @Nonnegative int numberOfElements,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.numberOfElements = numberOfElements;
+ this.keyGroupRange = keyGroupRange;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.elementKeyGroups = new int[numberOfElements];
+ this.counterHistogram = new int[keyGroupRange.getNumberOfKeyGroups()];
+ this.computedResult = null;
+ }
+
+ /**
+ * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
+ */
+ public PartitioningResult<T> partitionByKeyGroup() {
+ if (computedResult == null) {
+ reportAllElementKeyGroups();
+ buildHistogramFromCounts();
+ executePartitioning();
+ }
+ return computedResult;
+ }
+
+ /**
+ * This method iterates over the input data and reports the key-group for each element.
+ */
+ protected void reportAllElementKeyGroups() {
+ final T[] input = getPartitioningInput();
+
+ Preconditions.checkState(input.length >= numberOfElements);
+
+ for (int i = 0; i < numberOfElements; ++i) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement(input[i]), totalKeyGroups);
+ reportKeyGroupOfElementAtIndex(i, keyGroup);
+ }
+ }
+
+ /**
+ * Returns the key for the given element by which the key-group can be computed.
+ */
+ @Nonnull
+ protected abstract Object extractKeyFromElement(T element);
+
+ /**
+ * Returns the input data for the partitioning. All elements to consider must be densely in the index interval
+ * [0, {@link #numberOfElements}[, without null values.
+ */
+ @Nonnull
+ protected abstract T[] getPartitioningInput();
+
+ /**
+ * Returns the output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
+ */
+ @Nonnull
+ protected abstract T[] getPartitioningOutput();
+
+ /**
+ * This method reports in the bookkeeping data that the element at the given index belongs to the given key-group.
+ */
+ protected void reportKeyGroupOfElementAtIndex(int index, int keyGroup) {
+ final int keyGroupIndex = keyGroup - firstKeyGroup;
+ elementKeyGroups[index] = keyGroupIndex;
+ ++counterHistogram[keyGroupIndex];
+ }
+
+ /**
+ * This method creates a histogram from the counts per key-group in {@link #counterHistogram}.
+ */
+ private void buildHistogramFromCounts() {
+ int sum = 0;
+ for (int i = 0; i < counterHistogram.length; ++i) {
+ int currentSlotValue = counterHistogram[i];
+ counterHistogram[i] = sum;
+ sum += currentSlotValue;
+ }
+ }
+
+ private void executePartitioning() {
+
+ final T[] in = getPartitioningInput();
+ final T[] out = getPartitioningOutput();
+
+ Preconditions.checkState(in != out);
+ Preconditions.checkState(in.length >= numberOfElements);
+ Preconditions.checkState(out.length >= numberOfElements);
--- End diff --
I am aware of this corner case. This theoretical problem already existed before the PR, so it is a bit out of scope. I have a fix that i) checks in the resize whether the maximum size was reached and, if so, throws an exception, and ii) computes the resize threshold for array size `1 << 30` as the maximum array size (~ `Integer.MAX_VALUE - 8`).
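One possible reading of the two-part fix described above is sketched below. This is not the actual patch: the class, the constants, the method names, and the 3/4 load factor are assumptions for illustration only.

```java
final class ResizeGuardSketch {

    /** Largest table capacity, as discussed in this thread. */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /** Approximate largest array size that can be allocated on common JVMs. */
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /** Part i): refuse to grow beyond the maximum capacity instead of continuing silently. */
    static int nextCapacity(int currentCapacity) {
        if (currentCapacity >= MAXIMUM_CAPACITY) {
            throw new IllegalStateException(
                "Maximum capacity of " + MAXIMUM_CAPACITY + " reached, cannot grow further.");
        }
        return currentCapacity << 1;
    }

    /** Part ii): at the maximum capacity, the resize threshold becomes the maximum array size. */
    static int resizeThreshold(int capacity) {
        if (capacity >= MAXIMUM_CAPACITY) {
            return MAX_ARRAY_SIZE;
        }
        // Assumed load factor of 3/4 for all smaller capacities.
        return (capacity >> 1) + (capacity >> 2);
    }
}
```

With these two pieces, a table at capacity `1 << 30` keeps accepting entries until the threshold is hit, and the next resize attempt fails with an exception.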
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195376108
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java ---
@@ -224,6 +227,15 @@ void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> restoredTim
return result;
}
+ @Nonnull
+ StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> serializer) {
+ return new InternalTimerHeapSnapshot<>(
+ Arrays.copyOfRange(queue, 1, size + 1),
+ serializer,
--- End diff --
This method is not called yet, and the signature will probably change. I just have it here to roughly outline how this could be used in the future.
---
[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6159
CC @azagrebin
---
[GitHub] flink pull request #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchr...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6159#discussion_r195441098
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyGroupPartitionedSnapshot.java ---
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.memory.DataOutputView;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+
+/**
+ * Abstract base class for implementations of
+ * {@link org.apache.flink.runtime.state.StateSnapshot.KeyGroupPartitionedSnapshot} based on the result of a
+ * {@link AbstractKeyGroupPartitioner}.
+ *
+ * @param <T> type of the written elements.
+ */
+public abstract class AbstractKeyGroupPartitionedSnapshot<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+
+ /** The partitioning result to be written by key-group. */
+ @Nonnull
+ private final AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult;
+
+ public AbstractKeyGroupPartitionedSnapshot(
+ @Nonnull AbstractKeyGroupPartitioner.PartitioningResult<T> partitioningResult) {
+ this.partitioningResult = partitioningResult;
+ }
+
+ @Override
+ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+
+ final T[] groupedOut = partitioningResult.getPartitionedElements();
+
+ int startOffset = partitioningResult.getKeyGroupStartOffsetInclusive(keyGroupId);
+ int endOffset = partitioningResult.getKeyGroupEndOffsetExclusive(keyGroupId);
+
+ // write number of mappings in key-group
+ dov.writeInt(endOffset - startOffset);
+
+ // write mappings
+ for (int i = startOffset; i < endOffset; ++i) {
+ if(groupedOut[i] == null) {
+ throw new IllegalStateException();
+ }
+ writeElement(groupedOut[i], dov);
+ groupedOut[i] = null; // free asap for GC
+ }
+ }
+
+ /**
+ * This method defines how to write a single element to the output.
+ *
+ * @param element the element to be written.
+ * @param dov the output view to write the element.
+ * @throws IOException on write-related problems.
+ */
+ protected abstract void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
--- End diff --
`AbstractKeyGroupPartitionedSnapshot.writeElement` looks like a strategy for `writeMappingsInKeyGroup`. It could be injected in the constructor or into `writeMappingsInKeyGroup` as a lambda (an `ElementWriter` interface), which would eliminate the inheritance abstraction.
The partitioner could then just output a `PartitionedSnapshot`, which would be the partitioning result plus `writeMappingsInKeyGroup`. The result seems to have just one purpose: to be written as a keyed snapshot.
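A rough sketch of what this suggestion might look like is given below. The `ElementWriter` and `PartitionedSnapshot` names come from the comment above, but their shapes here, the offsets array, and the use of `java.io.DataOutput` in place of `DataOutputView` are assumptions for illustration.

```java
import java.io.DataOutput;
import java.io.IOException;

final class ElementWriterSketch {

    /** Strategy for writing one element; can be supplied as a lambda. */
    @FunctionalInterface
    interface ElementWriter<T> {
        void writeElement(T element, DataOutput out) throws IOException;
    }

    /** Partitioning result plus the write logic, without an abstract base class. */
    static final class PartitionedSnapshot<T> {
        private final T[] partitionedElements;
        /** Start offset per key-group, plus one trailing entry with the total element count. */
        private final int[] keyGroupOffsets;
        private final int firstKeyGroup;
        private final ElementWriter<T> elementWriter;

        PartitionedSnapshot(
                T[] partitionedElements,
                int[] keyGroupOffsets,
                int firstKeyGroup,
                ElementWriter<T> elementWriter) {
            this.partitionedElements = partitionedElements;
            this.keyGroupOffsets = keyGroupOffsets;
            this.firstKeyGroup = firstKeyGroup;
            this.elementWriter = elementWriter;
        }

        void writeMappingsInKeyGroup(DataOutput out, int keyGroupId) throws IOException {
            int start = keyGroupOffsets[keyGroupId - firstKeyGroup];
            int end = keyGroupOffsets[keyGroupId - firstKeyGroup + 1];

            // Write the number of mappings in the key-group, then the mappings themselves.
            out.writeInt(end - start);
            for (int i = start; i < end; i++) {
                elementWriter.writeElement(partitionedElements[i], out);
                partitionedElements[i] = null; // free early for GC, as in the reviewed code
            }
        }
    }
}
```

A caller would then pass the write logic directly, for example `(element, out) -> out.writeUTF(element)` for strings, instead of subclassing.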
---
[GitHub] flink issue #6159: [FLINK-9487] Prepare InternalTimerHeap for asynchronous s...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6159
@StefanRRichter Sorry that I didn't take another look yesterday, I was a bit too busy at that time. I had a look at the fix for the `CopyOnWriteStateTable`, and I'm afraid it's still only a partial fix. I just created a JIRA for it and will open a PR for it very soon.
In short, the remaining part of the problem is that we reuse `snapshotData` as the output array when partitioning the data, but the maximum length of `snapshotData` is `1 << 30`. So when the `CopyOnWriteStateTable` holds more than `1 << 30` records (e.g. `(1 << 30) + 1`), the check `Preconditions.checkState(partitioningDestination.length >= numberOfElements);` can fail.
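The arithmetic behind this concern can be modelled without allocating any large arrays. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes the output array length is simply capped at `1 << 30`.

```java
final class SnapshotCapacitySketch {

    /** Maximum length of the snapshot array, as stated in the comment above. */
    static final int MAX_SNAPSHOT_ARRAY_LENGTH = 1 << 30;

    /** Length of the output array that would be available for the given element count. */
    static int outputArrayLength(int numberOfElements) {
        return Math.min(numberOfElements, MAX_SNAPSHOT_ARRAY_LENGTH);
    }

    /** Mirrors the condition of checkState(partitioningDestination.length >= numberOfElements). */
    static boolean destinationFits(int numberOfElements) {
        return outputArrayLength(numberOfElements) >= numberOfElements;
    }
}
```

Under this assumption the check holds for exactly `1 << 30` elements and fails for `(1 << 30) + 1`, which is the case described above.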
---