Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/27 16:32:39 UTC

[GitHub] [flink-statefun] igalshilman opened a new pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

igalshilman opened a new pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168


   This PR allows creating savepoints that contain elements in the feedback log, and restoring from them, for savepoints taken as of this PR and onward.
   Restoring from savepoints taken prior to this PR (with elements in the feedback log) requires follow-up work.
   
   This PR changes the on-the-wire checkpoint format for the feedback log.
   For each key group written to the raw keyed stream (empty or not), we now write the following header:  `<statefun version: int><statefun magic header: int>`
   
   We start with version 0 and a magic header that is a constant random number. The version and the magic header are written for safety and format evolution.
   
   The version number `0` is deliberately chosen as a workaround for FLINK-19692: the InternalTimerManager reads that first 0 (which we wrote as the version) and does not try to read anything else from the stream.
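
   A minimal sketch of the header layout described above, not the code from this PR. It assumes plain `java.io` data streams rather than Flink's `DataOutputView`; the class name `HeaderSketch`, the method names, and the magic value are hypothetical and chosen only for illustration.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration of the <version: int><magic: int> header.
final class HeaderSketch {
  // Version 0 is written first, so a reader that only looks at the
  // first int of the key group sees 0.
  static final int VERSION = 0;
  // Assumed placeholder; the PR only says the magic is a constant random number.
  static final int MAGIC = 0x5AFEC0DE;

  // Writes the 8-byte header: version followed by the magic number (big-endian ints).
  static void writeHeader(DataOutputStream out) throws IOException {
    out.writeInt(VERSION);
    out.writeInt(MAGIC);
  }

  // Reads 8 bytes and reports whether they form the expected header.
  static boolean readAndCheckHeader(DataInputStream in) throws IOException {
    int version = in.readInt();
    int magic = in.readInt();
    return version == VERSION && magic == MAGIC;
  }
}
```

   Writing the version before the magic number is what puts the `0` at the very start of every key group in this sketch.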


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-statefun] tzulitai commented on pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#issuecomment-717892546


   Thanks for addressing my comments @igalshilman!
   I'll proceed to merge this now to `master` and `release-2.2`.





[GitHub] [flink-statefun] tzulitai commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513184588



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
     final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) {
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups assigned
+    // to this operator must be written to the underlying stream.

Review comment:
       nit: I'm wondering if it makes sense to add a TODO here to help remind us in the future that after FLINK-19748 (allow skipping key groups) is merged, we may choose to revert writing empty key groups?

##########
File path: statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
##########
@@ -79,6 +81,21 @@ public void roundTripWithSpill() throws Exception {
     roundTrip(1_000_000, 0);
   }
 
+  @Test
+  public void testHeader() throws IOException {

Review comment:
       As I understand it, this test verifies the header serde round trip, in the case that the header was written.
   
   As a counterpart, could you add a test that verifies `Header.skipHeaderSilently` is effectively a no-op if the header was missing in the input stream?
   
   i.e.,
   another variant of this test where the line `UnboundedFeedbackLogger.Header.writeHeader(out);` is removed should be passing as well.
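
   A minimal sketch of the behaviour requested above: skipping the header when it is present and leaving the stream untouched when it is missing. This is an assumption-laden illustration, not the actual `Header.skipHeaderSilently` implementation; the class name `SkipHeaderSketch`, the magic value, and the use of `PushbackInputStream` are choices made here for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; names and the magic value are assumptions.
final class SkipHeaderSketch {
  // <version: int = 0><magic: int>, big-endian, matching DataOutput.writeInt.
  static final byte[] HEADER_BYTES =
      ByteBuffer.allocate(8).putInt(0).putInt(0x5AFEC0DE).array();

  // Returns a stream positioned after the header if one is present;
  // otherwise returns a stream that yields exactly the original bytes.
  static InputStream skipHeaderSilently(InputStream in) throws IOException {
    PushbackInputStream pushback = new PushbackInputStream(in, HEADER_BYTES.length);
    byte[] candidate = new byte[HEADER_BYTES.length];
    int read = 0;
    // Read up to 8 bytes; the stream may be shorter than a header.
    while (read < candidate.length) {
      int n = pushback.read(candidate, read, candidate.length - read);
      if (n < 0) {
        break;
      }
      read += n;
    }
    boolean headerPresent =
        read == candidate.length && Arrays.equals(candidate, HEADER_BYTES);
    if (!headerPresent && read > 0) {
      // No header: push the bytes back so the caller sees the stream unchanged.
      pushback.unread(candidate, 0, read);
    }
    return pushback;
  }
}
```

   With this shape, a test that omits the header write can assert that the bytes read back equal the bytes written.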







[GitHub] [flink-statefun] tzulitai commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513379873



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
     final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) {
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups assigned
+    // to this operator must be written to the underlying stream.

Review comment:
       I don’t have a strong opinion on whether or not the empty key groups should stay there in the long term, so fine by me to keep this as is without the TODO comment to revisit 👍
   







[GitHub] [flink-statefun] tzulitai closed pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai closed pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168


   





[GitHub] [flink-statefun] igalshilman commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513269330



##########
File path: statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
##########
@@ -79,6 +81,21 @@ public void roundTripWithSpill() throws Exception {
     roundTrip(1_000_000, 0);
   }
 
+  @Test
+  public void testHeader() throws IOException {

Review comment:
       Yes, definitely. Will add that.







[GitHub] [flink-statefun] igalshilman commented on a change in pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
igalshilman commented on a change in pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#discussion_r513369798



##########
File path: statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java
##########
@@ -99,11 +98,19 @@ private void flushToKeyedStateOutputStream() throws IOException {
     checkState(keyedStateOutputStream != null, "Trying to flush envelopes not in a logging state");
 
     final DataOutputView target = new DataOutputViewStreamWrapper(keyedStateOutputStream);
-    for (Entry<Integer, KeyGroupStream<T>> entry : keyGroupStreams.entrySet()) {
-      checkpointedStreamOperations.startNewKeyGroup(keyedStateOutputStream, entry.getKey());
+    final Iterable<Integer> assignedKeyGroupIds =
+        checkpointedStreamOperations.keyGroupList(keyedStateOutputStream);
+    // the underlying checkpointed raw stream, requires that all key groups assigned
+    // to this operator must be written to the underlying stream.

Review comment:
       I think it would be better to write the empty key groups anyway, because they are presented to us on restore.
   Having a header there would also help us differentiate between versions.
   But I can definitely add a TODO to revisit this after FLINK-19748 is merged.
   Would that work for you?







[GitHub] [flink-statefun] tzulitai commented on pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#issuecomment-717889642


   Thanks for addressing my comments @igalshilman!
   I'll merge the PR now 👍





[GitHub] [flink-statefun] tzulitai removed a comment on pull request #168: [FLINK-19692] Write all assigned key groups into the keyed raw stream with a Header.

Posted by GitBox <gi...@apache.org>.
tzulitai removed a comment on pull request #168:
URL: https://github.com/apache/flink-statefun/pull/168#issuecomment-717889642


   Thanks for addressing my comments @igalshilman!
   I'll merge the PR now 👍

