Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/21 14:34:35 UTC

[GitHub] [beam] mxm opened a new pull request #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

mxm opened a new pull request #11478:
URL: https://github.com/apache/beam/pull/11478


   The BufferingDoFnRunner created a new state cell for buffering data on each
   checkpoint. This caused the number of state cells to reach Short.MAX_VALUE,
   the maximum number of states supported by Flink.
   
   This change keeps a fixed number of state cells, determined by the configured
   maximum number of concurrent checkpoints. A state cell is reused once the
   checkpoint that snapshotted it has been acknowledged.
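A minimal sketch of the bounded-buffer idea, for illustration only. It assumes buffers are identified by an integer index that is rotated on every checkpoint, similar in shape to `rotateAndGetStateIndex()` in the diff further down. The class `RotatingBufferIndex`, its methods, and the sizing of 3 buffers are hypothetical; the guard that refuses to reuse a not-yet-acknowledged buffer is an added assumption, not something the PR's diff does.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch (not the PR's code): a fixed pool of buffer indices that is
 * rotated on every checkpoint, so the number of state cells stays bounded.
 */
class RotatingBufferIndex {
  private final int numCheckpointBuffers;
  private int currentStateIndex = 0;
  // Indices whose buffered elements still wait for their checkpoint to be acknowledged.
  private final Set<Integer> pendingIndices = new HashSet<>();

  RotatingBufferIndex(int numCheckpointBuffers) {
    if (numCheckpointBuffers < 1) {
      throw new IllegalArgumentException("need at least one buffer");
    }
    this.numCheckpointBuffers = numCheckpointBuffers;
  }

  int currentIndex() {
    return currentStateIndex;
  }

  /** Marks the current buffer as pending and switches to the next one in the ring. */
  int checkpoint() {
    int next = (currentStateIndex + 1) % numCheckpointBuffers;
    // Illustrative guard (an assumption, not in the diff): never reuse a buffer
    // whose checkpoint has not been acknowledged yet.
    if (next == currentStateIndex || pendingIndices.contains(next)) {
      throw new IllegalStateException("buffer " + next + " is still awaiting acknowledgement");
    }
    pendingIndices.add(currentStateIndex);
    currentStateIndex = next;
    return next;
  }

  /** Frees a buffer once the checkpoint that snapshotted it has been acknowledged. */
  void acknowledge(int index) {
    pendingIndices.remove(index);
  }

  public static void main(String[] args) {
    // Assumed sizing: 3 buffers, e.g. 2 concurrent checkpoints plus the live buffer.
    RotatingBufferIndex ring = new RotatingBufferIndex(3);
    Set<Integer> used = new HashSet<>();
    used.add(ring.currentIndex());
    for (int i = 0; i < 100_000; i++) {
      int snapshotted = ring.currentIndex();
      used.add(ring.checkpoint());
      ring.acknowledge(snapshotted); // acknowledged before the next checkpoint starts
    }
    // With a fresh UUID per checkpoint, every iteration would have registered a new
    // state name; the ring only ever touches 3. Prints "distinct buffers used: 3".
    System.out.println("distinct buffers used: " + used.size());
  }
}
```

The design trade-off is that the pool must be large enough for all checkpoints that can be in flight at once; otherwise a buffer would be overwritten (or, in this sketch, rejected) before its elements are emitted.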
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dmvk commented on a change in pull request #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

dmvk commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412714540



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -170,44 +201,48 @@ public void checkpointCompleted(long checkpointId) throws Exception {
     }
   }
 
-  private void addToBeAcknowledgedCheckpoint(long checkpointId, String internalId)
-      throws Exception {
+  private void addToBeAcknowledgedCheckpoint(long checkpointId, int internalId) throws Exception {
     notYetAcknowledgedSnapshots.addAll(
-        Collections.singletonList(new CheckpointElement(internalId, checkpointId)));
+        Collections.singletonList(new CheckpointIdentifier(internalId, checkpointId)));
   }
 
-  private List<CheckpointElement> removeToBeAcknowledgedCheckpoints(long checkpointId)
+  private List<CheckpointIdentifier> gatherToBeAcknowledgedCheckpoints(long checkpointId)
       throws Exception {
-    List<CheckpointElement> toBeAcknowledged = new ArrayList<>();
-    List<CheckpointElement> checkpoints = new ArrayList<>();
-    for (CheckpointElement element : notYetAcknowledgedSnapshots.get()) {
+    List<CheckpointIdentifier> toBeAcknowledged = new ArrayList<>();
+    List<CheckpointIdentifier> remaining = new ArrayList<>();
+    for (CheckpointIdentifier element : notYetAcknowledgedSnapshots.get()) {
       if (element.checkpointId <= checkpointId) {
         toBeAcknowledged.add(element);
       } else {
-        checkpoints.add(element);
+        remaining.add(element);
       }
     }
-    notYetAcknowledgedSnapshots.update(checkpoints);
+    notYetAcknowledgedSnapshots.update(remaining);
     // Sort by checkpoint id to preserve order
     toBeAcknowledged.sort(Comparator.comparingLong(o -> o.checkpointId));
     return toBeAcknowledged;
   }
 
-  private static String generateNewId() {
-    return UUID.randomUUID().toString();
+  private int rotateAndGetStateIndex() {
+    currentStateIndex = (currentStateIndex + 1) % numCheckpointBuffers;
+    return currentStateIndex;
+  }
+
+  private int getStateIndex() {
+    return currentStateIndex;
   }
 
   /** Constructs a new instance of BufferingElementsHandler with a provided state namespace. */
   private interface BufferingElementsHandlerFactory {
-    BufferingElementsHandler get(String stateId) throws Exception;
+    BufferingElementsHandler get(int stateIndex) throws Exception;
   }
 
-  private static class CheckpointElement {
+  static class CheckpointIdentifier {

Review comment:
       nit: `@VisibleForTesting`

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       Just to check whether I understand this correctly:
    
   - We are guaranteed that checkpoints complete in sequential order.
   - We might get gaps in case of failed / expired checkpoints, so any completed checkpoint must complete all prior checkpoints.

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunnerTest.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link BufferingDoFnRunner}.
+ *
+ * <p>For more tests see:
+ *
+ * <p>- {@link org.apache.beam.runners.flink.FlinkRequiresStableInputTest}
+ *
+ * <p>-{@link org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperatorTest}
+ *
+ * <p>- {@link BufferedElementsTest}
+ */
+public class BufferingDoFnRunnerTest {

Review comment:
       👍







[GitHub] [beam] mxm commented on a change in pull request #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

mxm commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412821901



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       I wonder whether we should make the flushing backwards-compatible for users who want to migrate.







[GitHub] [beam] mxm commented on a change in pull request #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

mxm commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r412796641



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       It's good that you bring this up, because I wanted to add a comment to explain this. Since every checkpoint runs independently, checkpoints are not guaranteed to complete in sequential order. If a later checkpoint completes before an earlier one, we can safely process the elements of the earlier one as well. Calls to this method are guaranteed not to overlap because Flink uses a single thread to execute asynchronous task calls like (a) triggering a checkpoint and (b) completing a checkpoint. When an earlier checkpoint completes after a later one has already completed, this method does nothing.
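To make the completion semantics discussed above concrete, here is a small in-memory sketch. It mirrors the filtering and sorting done in `gatherToBeAcknowledgedCheckpoints()` from the diff, but uses a plain `ArrayList` instead of Flink's `ListState`; the class `PendingCheckpoints` and its methods are hypothetical. It is not thread-safe and relies on the single-threaded invocation described in the comment above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical in-memory sketch: completing checkpoint N flushes every pending
 * checkpoint with an id <= N, so gaps and out-of-order completions are handled.
 */
class PendingCheckpoints {
  /** Pairs a buffer index with the checkpoint that snapshotted it. */
  static final class Pending {
    final int stateIndex;
    final long checkpointId;

    Pending(int stateIndex, long checkpointId) {
      this.stateIndex = stateIndex;
      this.checkpointId = checkpointId;
    }
  }

  // Not thread-safe: assumes checkpoint() and checkpointCompleted() never overlap.
  private final List<Pending> notYetAcknowledged = new ArrayList<>();

  void checkpoint(long checkpointId, int stateIndex) {
    notYetAcknowledged.add(new Pending(stateIndex, checkpointId));
  }

  /** Returns the buffers to flush in checkpoint order; empty if already flushed. */
  List<Pending> checkpointCompleted(long checkpointId) {
    List<Pending> toFlush = new ArrayList<>();
    List<Pending> remaining = new ArrayList<>();
    for (Pending p : notYetAcknowledged) {
      if (p.checkpointId <= checkpointId) {
        toFlush.add(p);
      } else {
        remaining.add(p);
      }
    }
    notYetAcknowledged.clear();
    notYetAcknowledged.addAll(remaining);
    // Sort by checkpoint id so buffered elements are emitted in order.
    toFlush.sort(Comparator.comparingLong(p -> p.checkpointId));
    return toFlush;
  }

  public static void main(String[] args) {
    PendingCheckpoints pending = new PendingCheckpoints();
    pending.checkpoint(1, 0);
    pending.checkpoint(2, 1); // suppose checkpoint 2 is slow, or never completes
    pending.checkpoint(3, 2);
    // Checkpoint 3 completes first: buffers of checkpoints 1, 2 and 3 are flushed in order.
    for (Pending p : pending.checkpointCompleted(3)) {
      System.out.println("flush checkpoint " + p.checkpointId + " (buffer " + p.stateIndex + ")");
    }
    // The late notification for checkpoint 2 finds nothing left; prints "... flushes 0".
    System.out.println("late completion of 2 flushes " + pending.checkpointCompleted(2).size());
  }
}
```

This covers both cases raised in the review: a gap left by a failed or expired checkpoint is closed by the next successful completion, and a late completion of an earlier checkpoint is a no-op.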







[GitHub] [beam] dmvk commented on a change in pull request #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

dmvk commented on a change in pull request #11478:
URL: https://github.com/apache/beam/pull/11478#discussion_r413848662



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
##########
@@ -143,15 +174,15 @@ public void checkpoint(long checkpointId) throws Exception {
     // We are about to get checkpointed. The elements buffered thus far
     // have to be added to the global CheckpointElement state which will
     // be used to emit elements later when this checkpoint is acknowledged.
-    addToBeAcknowledgedCheckpoint(checkpointId, currentStateId);
-    currentStateId = generateNewId();
-    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(currentStateId);
+    addToBeAcknowledgedCheckpoint(checkpointId, getStateIndex());
+    int newStateIndex = rotateAndGetStateIndex();
+    currentBufferingElementsHandler = bufferingElementsHandlerFactory.get(newStateIndex);
   }
 
   /** Should be called when a checkpoint is completed. */
   public void checkpointCompleted(long checkpointId) throws Exception {

Review comment:
       I don't have strong opinions about this. In this case I'd lean toward a backwards-incompatible change.







[GitHub] [beam] dmvk commented on issue #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

dmvk commented on issue #11478:
URL: https://github.com/apache/beam/pull/11478#issuecomment-617607070


   Run Java PreCommit





[GitHub] [beam] mxm commented on issue #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

mxm commented on issue #11478:
URL: https://github.com/apache/beam/pull/11478#issuecomment-617259192


   Run Java PreCommit





[GitHub] [beam] mxm commented on issue #11478: [BEAM-9794] Reduce state cells needed for BufferingDoFnRunner

mxm commented on issue #11478:
URL: https://github.com/apache/beam/pull/11478#issuecomment-617650683


   Run Java PreCommit

