Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/22 07:08:33 UTC

[GitHub] [flink] Zakelly opened a new pull request, #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Zakelly opened a new pull request, #21362:
URL: https://github.com/apache/flink/pull/21362

   ## What is the purpose of the change
   
   Currently the HeapStateBackend checks whether the current key group index is valid, while the RocksDBStateBackend does not.
   This PR performs the check in InternalKeyContextImpl#setCurrentKeyGroupIndex, so that data correctness problems are exposed immediately when using RocksDB as well as heap.
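
A minimal, self-contained sketch of the idea, not the actual Flink code: `SimpleKeyContext` and its range fields are hypothetical stand-ins for `InternalKeyContextImpl` and `KeyGroupRange`. It shows the setter rejecting an index outside the range the instance owns, so the failure surfaces when the key group is set, whichever backend reads it later.

```java
/** Hypothetical stand-in for a key context that validates the key group when it is set. */
class KeyGroupCheckSketch {

    static final class SimpleKeyContext {
        // Inclusive key group range owned by this (hypothetical) backend instance.
        private final int startKeyGroup;
        private final int endKeyGroup;
        private int currentKeyGroupIndex;

        SimpleKeyContext(int startKeyGroup, int endKeyGroup) {
            this.startKeyGroup = startKeyGroup;
            this.endKeyGroup = endKeyGroup;
            this.currentKeyGroupIndex = startKeyGroup;
        }

        void setCurrentKeyGroupIndex(int keyGroupIndex) {
            // Fail fast: reject indices this instance does not own.
            if (keyGroupIndex < startKeyGroup || keyGroupIndex > endKeyGroup) {
                throw new IllegalArgumentException(
                        "Key group " + keyGroupIndex + " is not in ["
                                + startKeyGroup + ", " + endKeyGroup + "].");
            }
            this.currentKeyGroupIndex = keyGroupIndex;
        }

        int getCurrentKeyGroupIndex() {
            return currentKeyGroupIndex;
        }
    }

    public static void main(String[] args) {
        SimpleKeyContext context = new SimpleKeyContext(0, 128);
        context.setCurrentKeyGroupIndex(64); // inside the range, accepted
        try {
            context.setCurrentKeyGroupIndex(2048); // outside the range, rejected
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The rejected call leaves the previously stored index untouched, which is the behaviour one would want from a sanity check of this kind.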
   
   ## Brief change log
   
   * Add the checking logic in InternalKeyContextImpl#setCurrentKeyGroupIndex.
   * Add a test for the new logic.
   * Change some test code in CoBroadcastWithKeyedOperatorTest, since it fails after adding the check.
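
Why the old test code trips the new check can be seen from how key groups are split across subtasks. The sketch below reproduces the ranges used in the updated test (max parallelism 10, parallelism 3). The arithmetic is written from memory of Flink's `KeyGroupRangeAssignment` and should be read as an assumption rather than the library's code; the class and method names are hypothetical.

```java
/** Hypothetical sketch of how key groups might be split across parallel subtasks. */
class KeyGroupSplitSketch {

    // First key group owned by the given subtask (inclusive).
    static int rangeStart(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by the given subtask (inclusive).
    static int rangeEnd(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    // Inverse mapping: the subtask that owns a given key group.
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        int parallelism = 3;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println(
                    "subtask " + subtask + " owns ["
                            + rangeStart(maxParallelism, parallelism, subtask) + ", "
                            + rangeEnd(maxParallelism, parallelism, subtask) + "]");
        }
    }
}
```

Under these assumptions the three subtasks own [0, 3], [4, 6] and [7, 9]. A single key such as "trigger" falls into exactly one key group, so it belongs to only one subtask; sending it to all three harnesses presumably violates the check in the other two.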
   
   ## Verifying this change
   
   This change adds a new test class, InternalKeyContextImplTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): **yes**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21362:
URL: https://github.com/apache/flink/pull/21362#issuecomment-1323211848

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3465fa39bbfa6e701adbe7c153680bf2768c8954",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3465fa39bbfa6e701adbe7c153680bf2768c8954",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3465fa39bbfa6e701adbe7c153680bf2768c8954 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Zakelly commented on pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on PR #21362:
URL: https://github.com/apache/flink/pull/21362#issuecomment-1327135147

   Thanks for your detailed review @rkhachatryan . I have updated the PR, would you please take another look? Thanks.




[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1034413200


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   
   > How about the opposite: removing the check from HashMapStateBackend (StateTable) and relying on `InternalKeyContext`? So that it works for any backend. As a side benefit, that would be a bit faster because the check would be done once per input record, not per state access.
   
   I agree. That's what I was trying to say.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033165454


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java:
##########
@@ -556,9 +559,27 @@ public void testScaleUp() throws Exception {
                                 3,
                                 2,
                                 operatorSubtaskState3)) {
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
-            testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+            // Since there is a keyed operator, we should follow the key partition rules.
+            Map<TwoInputStreamOperatorTestHarness<String, Integer, String>, KeyGroupRange>
+                    keyGroupPartition = new HashMap<>();
+            keyGroupPartition.put(testHarness1, KeyGroupRange.of(0, 3));
+            keyGroupPartition.put(testHarness2, KeyGroupRange.of(4, 6));
+            keyGroupPartition.put(testHarness3, KeyGroupRange.of(7, 9));
+            while (!keyGroupPartition.isEmpty()) {
+                String triggerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+                for (Map.Entry<
+                                TwoInputStreamOperatorTestHarness<String, Integer, String>,
+                                KeyGroupRange>
+                        entry : keyGroupPartition.entrySet()) {
+                    if (entry.getValue()
+                            .contains(KeyGroupRangeAssignment.assignToKeyGroup(triggerKey, 10))) {
+                        entry.getKey().processElement1(new StreamRecord<>(triggerKey));
+                        keyGroupPartition.remove(entry.getKey());
+                        break;
+                    }
+                }
+            }

Review Comment:
   Thanks!





[GitHub] [flink] Myasuka closed pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Myasuka closed pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex
URL: https://github.com/apache/flink/pull/21362




[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1032098044


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   I have run several tests in flink-benchmarks. The results are as follows:
   
   - With this change:
   
   ```
   Benchmark                    Mode  Cnt     Score     Error   Units
   KeyByBenchmarks.arrayKeyBy  thrpt   30  4907.599 ± 412.976  ops/ms
   KeyByBenchmarks.tupleKeyBy  thrpt   30  8157.488 ± 911.389  ops/ms
   
   Benchmark                                  (stateBackend)   Mode  Cnt     Score    Error   Units
   MemoryStateBackendBenchmark.stateBackends          MEMORY  thrpt   30  4749.925 ± 35.146  ops/ms
   MemoryStateBackendBenchmark.stateBackends              FS  thrpt   30  4815.565 ± 27.069  ops/ms
   MemoryStateBackendBenchmark.stateBackends        FS_ASYNC  thrpt   30  4741.209 ± 46.514  ops/ms
   
   Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
   RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  289.627 ± 1.684  ops/ms
   RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  287.924 ± 4.893  ops/ms
   
   ```
   
   - Without this change:
   
   ```
   Benchmark                    Mode  Cnt     Score     Error   Units
   KeyByBenchmarks.arrayKeyBy  thrpt   30  5004.647 ± 379.077  ops/ms
   KeyByBenchmarks.tupleKeyBy  thrpt   30  8241.331 ± 870.276  ops/ms
   
   Benchmark                                  (stateBackend)   Mode  Cnt     Score    Error   Units
   MemoryStateBackendBenchmark.stateBackends          MEMORY  thrpt   30  4822.507 ± 65.687  ops/ms
   MemoryStateBackendBenchmark.stateBackends              FS  thrpt   30  4748.527 ± 43.915  ops/ms
   MemoryStateBackendBenchmark.stateBackends        FS_ASYNC  thrpt   30  4768.075 ± 41.401  ops/ms
   
   Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
   RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  287.613 ± 2.971  ops/ms
   RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  288.627 ± 1.598  ops/ms
   ```
   
   As we can see, there is a slight impact (1%~2%) on pure keyby performance, but it is not noticeable once state backends are involved. I think it's acceptable, WDYT?
   
   And I'd rather not check whether the new KeyGroupIndex is the same as the current one, since the overhead of doing so is not significantly lower than that of the current solution. WDYT?





[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1032801916


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   BTW, apart from changing the base class, we could also change only `RocksDBKeyedStateBackend#setCurrentKey` to avoid the impact on HashMapStateBackend, since HashMapStateBackend always checks the key group when accessing state tables.
   





[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1034731440


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   I think Roman's suggestion is fine for developers. For general users, as discussed in FLINK-23908, the current check would be fine during snapshotting. Maybe we can keep the implementation as Zakelly currently has it in this PR, considering the performance impact is so small.





[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033205960


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   > I'm afraid that we cannot avoid the check on accessing state as users might provide a non-deterministic hashCode for the current key.
   
   Hi @Myasuka , I checked the code in ```StateTable```, and it seems that in most state access cases (except for queryable state) we check the key group from ```keyContext.getCurrentKeyGroupIndex()``` instead of calculating it from the ```hashCode``` of the partitioned key. So we are actually checking the same value in ```setCurrentKeyGroupIndex``` and on state access, whether or not the hashCode implementation is deterministic.
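
The point about the cached index can be illustrated with a small sketch. Everything here is hypothetical (`UnstableKey`, `Context`, `accessState`), and the modulo hash is a stand-in for whatever Flink really uses; the only thing it is meant to show is that a check on state access which reads the stored key group index never re-hashes the key.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: the state-access check reads the cached index, not the key's hashCode. */
class StoredIndexSketch {

    /** Key with a deliberately non-deterministic hashCode. */
    static final class UnstableKey {
        private static final AtomicInteger COUNTER = new AtomicInteger();

        @Override
        public int hashCode() {
            return COUNTER.incrementAndGet(); // different value on every call
        }
    }

    static final class Context {
        private final int maxParallelism;
        private int currentKeyGroupIndex;

        Context(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }

        void setCurrentKey(Object key) {
            // hashCode() is consulted once here; the resulting key group is cached.
            currentKeyGroupIndex = Math.floorMod(key.hashCode(), maxParallelism);
        }

        int getCurrentKeyGroupIndex() {
            return currentKeyGroupIndex;
        }
    }

    /** Simulated state access that validates the cached index against an inclusive range. */
    static int accessState(Context context, int rangeStart, int rangeEnd) {
        int keyGroup = context.getCurrentKeyGroupIndex();
        if (keyGroup < rangeStart || keyGroup > rangeEnd) {
            throw new IllegalArgumentException("Key group " + keyGroup + " out of range.");
        }
        return keyGroup;
    }

    public static void main(String[] args) {
        Context context = new Context(10);
        context.setCurrentKey(new UnstableKey());
        int first = accessState(context, 0, 9);
        int second = accessState(context, 0, 9);
        System.out.println("Same key group on both accesses: " + (first == second));
    }
}
```

Because both accesses validate the same cached value, a check in the setter and a check on access would accept or reject identically in this model.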





[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033158877


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   I'm afraid that we cannot avoid the check on accessing state as users might provide a non-deterministic hashCode for the current key.





[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033104544


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   Actually I don't like the idea of checking whether the current key group is valid when accessing the state, since the problem comes from setting the key group, not from accessing the state. Besides, a user may set the key group once and access the state several times, so for performance reasons I'd rather remove the check from each state access and keep only the check in this PR. WDYT? @Myasuka





[GitHub] [flink] Myasuka commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1033444859


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   Maybe we can take a look at FLINK-23908, in which the Flink community introduced the key group check.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1030940479


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   This is executed in the hot path, i.e. per record, right?
   But since `contains()` isn't heavy, I think it's fine to optimistically rely on micro-benchmarks to detect any regression.
   Nevertheless, should we avoid this call by checking if the new KeyGroupIndex is the same as the current one?
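
One way the "skip when unchanged" variant could look, as a hedged sketch only: `Context` and `checksPerformed` are hypothetical, and the counter exists purely to make the effect visible. Whether the extra comparison is cheaper than the range check itself is something only benchmarks can answer.

```java
/** Hypothetical sketch: validate the key group only when it differs from the current one. */
class SkipUnchangedSketch {

    static final class Context {
        private final int startKeyGroup;
        private final int endKeyGroup;
        // -1 is never a valid key group, so the first call is always validated.
        private int currentKeyGroupIndex = -1;
        int checksPerformed;

        Context(int startKeyGroup, int endKeyGroup) {
            this.startKeyGroup = startKeyGroup;
            this.endKeyGroup = endKeyGroup;
        }

        void setCurrentKeyGroupIndex(int keyGroupIndex) {
            if (keyGroupIndex == currentKeyGroupIndex) {
                return; // already validated when it was first set
            }
            checksPerformed++;
            if (keyGroupIndex < startKeyGroup || keyGroupIndex > endKeyGroup) {
                throw new IllegalArgumentException("Key group " + keyGroupIndex + " out of range.");
            }
            currentKeyGroupIndex = keyGroupIndex;
        }
    }

    public static void main(String[] args) {
        Context context = new Context(0, 128);
        for (int keyGroup : new int[] {5, 5, 5, 7}) {
            context.setCurrentKeyGroupIndex(keyGroup);
        }
        System.out.println("Range checks performed: " + context.checksPerformed);
    }
}
```

The saving only materialises when consecutive records share a key group; for well-spread keys almost every call still pays for both the comparison and the range check.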



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/InternalKeyContextImplTest.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/** Tests for {@link InternalKeyContextImpl}. */
+public class InternalKeyContextImplTest {
+
+    @Test
+    public void testSetKeyGroupOutOfRange() {
+        InternalKeyContextImpl<Integer> integerInternalKeyContext =
+                new InternalKeyContextImpl<>(KeyGroupRange.of(0, 128), 4096);
+        integerInternalKeyContext.setCurrentKeyGroupIndex(64);
+        try {
+            integerInternalKeyContext.setCurrentKeyGroupIndex(2048);
+            fail("Expected IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+        }

Review Comment:
   nit: I'd split this test into two and use `@Test(expected = IllegalArgumentException.class)` instead of the empty `catch+fail` for readability.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java:
##########
@@ -556,9 +559,27 @@ public void testScaleUp() throws Exception {
                                 3,
                                 2,
                                 operatorSubtaskState3)) {
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
-            testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+            // Since there is a keyed operator, we should follow the key partition rules.
+            Map<TwoInputStreamOperatorTestHarness<String, Integer, String>, KeyGroupRange>
+                    keyGroupPartition = new HashMap<>();
+            keyGroupPartition.put(testHarness1, KeyGroupRange.of(0, 3));
+            keyGroupPartition.put(testHarness2, KeyGroupRange.of(4, 6));
+            keyGroupPartition.put(testHarness3, KeyGroupRange.of(7, 9));
+            while (!keyGroupPartition.isEmpty()) {
+                String triggerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+                for (Map.Entry<
+                                TwoInputStreamOperatorTestHarness<String, Integer, String>,
+                                KeyGroupRange>
+                        entry : keyGroupPartition.entrySet()) {
+                    if (entry.getValue()
+                            .contains(KeyGroupRangeAssignment.assignToKeyGroup(triggerKey, 10))) {
+                        entry.getKey().processElement1(new StreamRecord<>(triggerKey));
+                        keyGroupPartition.remove(entry.getKey());
+                        break;
+                    }
+                }
+            }

Review Comment:
   This part isn't obvious to me, and the mapping from harnesses to key ranges can break.
   How about this:
   ```
               for (TwoInputStreamOperatorTestHarness<String, Integer, String> harness :
                       Arrays.asList(testHarness1, testHarness2, testHarness3)) {
                   int subtask = harness.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
                   // find the right input element for this subtask
                   int element = 0;
                   while (assignKeyToParallelOperator(Integer.toString(element), 10, 3) != subtask) {
                       element++;
                   }
                   harness.processElement1(new StreamRecord<>(Integer.toString(element)));
               }
   ```
   ?
   
   ditto: `testScaleDown` - extract a function?
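
An extracted helper along the lines suggested above might look like the following sketch. It is dependency-free, so `assignToKeyGroup` uses a plain modulo of `hashCode()` as a stand-in; Flink's own assignment is assumed to hash differently, so the concrete keys found here would not match those in a real test. All names are hypothetical.

```java
/** Hypothetical sketch of a helper that finds an input key routed to a given subtask. */
class ValidKeySketch {

    // Stand-in hash: NOT Flink's key group assignment, only a dependency-free substitute.
    static int assignToKeyGroup(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key to the subtask that owns its key group.
    static int assignKeyToSubtask(String key, int maxParallelism, int parallelism) {
        return assignToKeyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Tries "0", "1", "2", ... until one is routed to the requested subtask.
    static String findValidKey(int subtask, int maxParallelism, int parallelism) {
        int element = 0;
        while (assignKeyToSubtask(Integer.toString(element), maxParallelism, parallelism)
                != subtask) {
            element++;
        }
        return Integer.toString(element);
    }

    public static void main(String[] args) {
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println(
                    "subtask " + subtask + " <- key " + findValidKey(subtask, 10, 3));
        }
    }
}
```

Deriving the key from the subtask index, rather than keeping a hand-written map from harness to key range, means the helper stays correct if the parallelism or max parallelism of the test changes.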





[GitHub] [flink] Zakelly commented on pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on PR #21362:
URL: https://github.com/apache/flink/pull/21362#issuecomment-1324777782

   @rkhachatryan @Myasuka Would you please take a look?






[GitHub] [flink] rkhachatryan commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1034045588


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   > BTW, apart from changing the base class, we can also only change the RocksDBKeyedStateBackend#setCurrentKey to avoid the impact on HashMapStateBackend, as the HashMapStateBackend will always check the key group when accessing state tables.
   
   How about the opposite: removing the check from HashMapStateBackend (StateTable) and relying on `InternalKeyContext`? So that it works for any backend.
   As a side benefit, that would be a bit faster because the check would be done once per input record, not per state access.
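
The "once per input record, not per state access" argument can be made concrete with a toy count. This is a simulation under assumed numbers (100 records, 3 state accesses each), not a measurement of Flink; the class and method names are hypothetical.

```java
/** Hypothetical sketch: count range checks when validating on set versus on every state access. */
class CheckPlacementSketch {

    static final int RANGE_START = 0;
    static final int RANGE_END = 9;

    static void check(int keyGroup) {
        if (keyGroup < RANGE_START || keyGroup > RANGE_END) {
            throw new IllegalArgumentException("Key group " + keyGroup + " out of range.");
        }
    }

    // Validation happens once, when the key group is set for a record.
    static int checksWhenValidatingOnSet(int records, int accessesPerRecord) {
        int checks = 0;
        for (int r = 0; r < records; r++) {
            int keyGroup = r % 10;
            check(keyGroup);
            checks++;
            for (int a = 0; a < accessesPerRecord; a++) {
                // state access, no further validation
            }
        }
        return checks;
    }

    // Validation happens on each state access instead.
    static int checksWhenValidatingOnAccess(int records, int accessesPerRecord) {
        int checks = 0;
        for (int r = 0; r < records; r++) {
            int keyGroup = r % 10;
            for (int a = 0; a < accessesPerRecord; a++) {
                check(keyGroup);
                checks++;
            }
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println("on set:    " + checksWhenValidatingOnSet(100, 3));
        System.out.println("on access: " + checksWhenValidatingOnAccess(100, 3));
    }
}
```

The ratio between the two counts is simply the number of state accesses per record, so the benefit depends entirely on how often an operator touches state for each input.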





[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1032087008


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java:
##########
@@ -556,9 +559,27 @@ public void testScaleUp() throws Exception {
                                 3,
                                 2,
                                 operatorSubtaskState3)) {
-            testHarness1.processElement1(new StreamRecord<>("trigger"));
-            testHarness2.processElement1(new StreamRecord<>("trigger"));
-            testHarness3.processElement1(new StreamRecord<>("trigger"));
+
+            // Since there is a keyed operator, we should follow the key partition rules.
+            Map<TwoInputStreamOperatorTestHarness<String, Integer, String>, KeyGroupRange>
+                    keyGroupPartition = new HashMap<>();
+            keyGroupPartition.put(testHarness1, KeyGroupRange.of(0, 3));
+            keyGroupPartition.put(testHarness2, KeyGroupRange.of(4, 6));
+            keyGroupPartition.put(testHarness3, KeyGroupRange.of(7, 9));
+            while (!keyGroupPartition.isEmpty()) {
+                String triggerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
+                for (Map.Entry<
+                                TwoInputStreamOperatorTestHarness<String, Integer, String>,
+                                KeyGroupRange>
+                        entry : keyGroupPartition.entrySet()) {
+                    if (entry.getValue()
+                            .contains(KeyGroupRangeAssignment.assignToKeyGroup(triggerKey, 10))) {
+                        entry.getKey().processElement1(new StreamRecord<>(triggerKey));
+                        keyGroupPartition.remove(entry.getKey());
+                        break;
+                    }
+                }
+            }

Review Comment:
   Sure, I extracted a function for finding a valid key for a subtask.





[GitHub] [flink] Zakelly commented on a diff in pull request #21362: [FLINK-29430] Add sanity check when setCurrentKeyGroupIndex

Posted by GitBox <gi...@apache.org>.
Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1036946214


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   Since we agree to keep this change as it is, would you please approve and merge this PR? Or do you have any other comments? Thanks @rkhachatryan @Myasuka . 


