Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/14 19:45:02 UTC

[GitHub] [beam] mxm opened a new pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

mxm opened a new pull request #13116:
URL: https://github.com/apache/beam/pull/13116


   We should make it easier to configure a Flink state backend. At the moment,
   users have to either:
   
   (A)  Configure the default state backend in their Flink cluster
   
   (B1) Include the dependency in their Gradle/Maven
        project (e.g. "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"
        for RocksDB).
   (B2) Set the state backend factory in the FlinkPipelineOptions. This only works
        in Java due to the factory specification being in Java!
   
   We can make it easier by simply adding pipeline options for the state backend
   name and the checkpoint directory, which will be enough for configuring the state
   backend. We bundle the RocksDB state backend by default.
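
A minimal sketch of how the two options could interact, assuming the names and error messages used in the diff quoted later in this thread (`stateBackend`, `stateBackendStoragePath`). It uses plain strings as stand-ins and does not touch any Beam or Flink classes; the class name `StateBackendOptionSketch` and the returned description strings are hypothetical.

```java
// Stand-in sketch: models only the option handling, not real Flink state backends.
final class StateBackendOptionSketch {

  /** Returns a description of the backend the two options would select. */
  static String resolve(String stateBackend, String storagePath) {
    if (stateBackend == null) {
      // Nothing set: the backend is left to the Flink cluster configuration.
      return "cluster default";
    }
    if (storagePath == null) {
      // A named backend needs a checkpoint directory.
      throw new IllegalArgumentException(
          String.format(
              "State backend was set to '%s' but no storage path was provided.",
              stateBackend));
    }
    if (stateBackend.equalsIgnoreCase("rocksdb")) {
      return "rocksdb@" + storagePath;
    }
    if (stateBackend.equalsIgnoreCase("filesystem")) {
      return "filesystem@" + storagePath;
    }
    throw new IllegalArgumentException(
        String.format(
            "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.",
            stateBackend));
  }

  public static void main(String[] args) {
    // The name is matched case-insensitively.
    System.out.println(resolve("RocksDB", "file:///tmp/checkpoints"));
    System.out.println(resolve(null, null));
  }
}
```

Because both values are plain strings, they can be passed from any SDK, which is the limitation of the Java-only factory in (B2) that this change is meant to remove.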
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-725031201


   I think this has broken Java PreCommit (https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/3471/); it is probably just a missing import:
   
   ```
   10:18:56 > Task :runners:flink:1.10:compileTestJava
   10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:474: error: cannot find symbol
   10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   10:18:56                                    ^
   10:18:56   symbol:   variable PipelineOptionsFactory
   10:18:56   location: class FlinkExecutionEnvironmentsTest
   10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:489: error: cannot find symbol
   10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   10:18:56                                    ^
   10:18:56   symbol:   variable PipelineOptionsFactory
   10:18:56   location: class FlinkExecutionEnvironmentsTest
   10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:503: error: cannot find symbol
   10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   10:18:56                                    ^
   10:18:56   symbol:   variable PipelineOptionsFactory
   10:18:56   location: class FlinkExecutionEnvironmentsTest
   10:18:56 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Cron/src/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java:517: error: cannot find symbol
   10:18:56     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
   10:18:56                                    ^
   10:18:56   symbol:   variable PipelineOptionsFactory
   10:18:56   location: class FlinkExecutionEnvironmentsTest
   ```
   





[GitHub] [beam] mxm commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-710080530


   Unrelated flaky tests in Java PreCommit.





[GitHub] [beam] iemejia commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r509318085



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(
+      FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+    if (options.getStateBackend() != null) {
+      final String storagePath = options.getStateBackendStoragePath();
+      Preconditions.checkArgument(
+          storagePath != null,
+          "State backend was set to '%s' but no storage path was provided.",
+          options.getStateBackend());
+
+      final StateBackend stateBackend;
+      if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {
+        try {
+          stateBackend = new RocksDBStateBackend(storagePath);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not create RocksDB state backend.", e);
+        }
+      } else if (options.getStateBackend().equalsIgnoreCase("filesystem")) {
+        stateBackend = new FsStateBackend(storagePath);
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.",
+                options.getStateBackend()));
+      }
+      env.setStateBackend(stateBackend);
+    } else if (options.getStateBackendFactory() != null) {
+      // Legacy way of setting the state backend
       final StateBackend stateBackend =
           InstanceBuilder.ofType(FlinkStateBackendFactory.class)
               .fromClass(options.getStateBackendFactory())
               .build()
               .createStateBackend(options);
-      flinkStreamEnv.setStateBackend(stateBackend);
+      env.setStateBackend(stateBackend);

Review comment:
       We can move this `env.setStateBackend` call out of the if/else block.
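
One way to read this suggestion, sketched with stand-in types: each branch only picks a backend, and a single guarded call applies it afterwards. `Backend`, `Env` and `SingleSetterSketch` are hypothetical placeholders for Flink's `StateBackend` and `StreamExecutionEnvironment`, and the branch bodies are simplified; this is not the code that was merged.

```java
import java.util.Locale;
import java.util.function.Supplier;

final class SingleSetterSketch {

  /** Stand-in for a Flink state backend; only the name matters here. */
  static final class Backend {
    final String name;

    Backend(String name) {
      this.name = name;
    }
  }

  /** Stand-in for the execution environment; counts how often the setter runs. */
  static final class Env {
    Backend backend;
    int setCalls;

    void setStateBackend(Backend b) {
      backend = b;
      setCalls++;
    }
  }

  static void configureStateBackend(String name, Supplier<Backend> legacyFactory, Env env) {
    Backend backend = null;
    if (name != null) {
      // New option: backend selected by name.
      backend = new Backend(name.toLowerCase(Locale.ROOT));
    } else if (legacyFactory != null) {
      // Legacy way of setting the state backend.
      backend = legacyFactory.get();
    }
    // Single call site; skipped when neither option is set.
    if (backend != null) {
      env.setStateBackend(backend);
    }
  }

  public static void main(String[] args) {
    Env env = new Env();
    configureStateBackend("RocksDB", null, env);
    System.out.println(env.backend.name + " set " + env.setCalls + " time(s)");
  }
}
```

The null check is what keeps the cluster default untouched when the user sets neither option.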

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
##########
@@ -464,6 +467,63 @@ public void shouldSetSavepointRestoreForRemoteStreaming() {
     assertThat(getSavepointPath(sev), is(path));
   }
 
+  @Test
+  public void shouldFailOnUnknownStateBackend() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStreaming(true);
+    options.setStateBackend("unknown");
+    options.setStateBackendStoragePath("/path");
+
+    assertThrows(
+        "State backend was set to 'unkown' but no storage path was provided.",
+        IllegalArgumentException.class,
+        () ->
+            FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+                options, Collections.emptyList()));
+  }
+
+  @Test
+  public void shouldFailOnNoStoragePathProvided() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStreaming(true);
+    options.setStateBackend("unknown");
+
+    assertThrows(
+        "State backend was set to 'unkown' but no storage path was provided.",

Review comment:
       s/unkown/unknown

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       Is the intention of supporting this to also allow configuring new backends, like the new one by the RISE team? Is this the intended Nexmark use?

##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       I checked and the dependency does not seem to be part of the default Flink distribution (quite surprising to me, btw). However, the way the backend is instantiated makes the dependency necessary. Making it `provided` will solve that, but it is a bit less user-friendly.

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkExecutionEnvironmentsTest.java
##########
@@ -464,6 +467,63 @@ public void shouldSetSavepointRestoreForRemoteStreaming() {
     assertThat(getSavepointPath(sev), is(path));
   }
 
+  @Test
+  public void shouldFailOnUnknownStateBackend() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStreaming(true);
+    options.setStateBackend("unknown");
+    options.setStateBackendStoragePath("/path");
+
+    assertThrows(
+        "State backend was set to 'unkown' but no storage path was provided.",

Review comment:
       s/unkown/unknown

##########
File path: website/www/site/layouts/shortcodes/flink_java_pipeline_options.html
##########
@@ -157,9 +157,19 @@
   <td>Shuts down sources which have been idle for the configured time of milliseconds. Once a source has been shut down, checkpointing is not possible anymore. Shutting down the sources eventually leads to pipeline shutdown (=Flink job finishes) once all input has been processed. Unless explicitly set, this will default to Long.MAX_VALUE when checkpointing is enabled and to 0 when checkpointing is disabled. See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue.</td>
   <td>Default: <code>-1</code></td>
 </tr>
+<tr>
+  <td><code>stateBackend</code></td>
+  <td>State backend to store Beam's state. Use RocksDB or Filesystem. Defaults to heap.</td>

Review comment:
       ignorable nit: I would probably list the explicit values there as examples, 'rocksdb' or 'filesystem' (or the class names if we decide to change this).

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(
+      FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+    if (options.getStateBackend() != null) {
+      final String storagePath = options.getStateBackendStoragePath();
+      Preconditions.checkArgument(
+          storagePath != null,
+          "State backend was set to '%s' but no storage path was provided.",
+          options.getStateBackend());
+
+      final StateBackend stateBackend;
+      if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {

Review comment:
       Shouldn't this be the class name of the backend? I am assuming that what we want to achieve here is to allow new backends (not available in Flink) to be configured too (see comment above).







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r511985665



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       Yes, new state backends can be added as needed.







[GitHub] [beam] mxm commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-709952169


   Run Java PreCommit





[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r511985337



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(
+      FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+    if (options.getStateBackend() != null) {
+      final String storagePath = options.getStateBackendStoragePath();
+      Preconditions.checkArgument(
+          storagePath != null,
+          "State backend was set to '%s' but no storage path was provided.",
+          options.getStateBackend());
+
+      final StateBackend stateBackend;
+      if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {

Review comment:
       There is no factory method available for state backends; they all have different constructors. We can't use the class name.
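
To illustrate the point, here is a minimal sketch of a name-to-constructor registry. All types are hypothetical stand-ins (`RocksLike`, `FsLike`, `BackendCreator`), and the differing constructor signatures are made up for the example; they are not the real Flink constructors. Each short name maps to a lambda that knows how to call its own constructor, which a bare class name could not express without a common factory method.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class BackendRegistrySketch {

  interface Backend {
    String describe();
  }

  /** Stand-in whose constructor declares a checked exception. */
  static final class RocksLike implements Backend {
    private final String path;

    RocksLike(String path) throws IOException {
      if (path.isEmpty()) {
        throw new IOException("empty checkpoint path");
      }
      this.path = path;
    }

    @Override
    public String describe() {
      return "rocks-like:" + path;
    }
  }

  /** Stand-in with a different constructor signature (hypothetical extra flag). */
  static final class FsLike implements Backend {
    private final String path;
    private final boolean async;

    FsLike(String path, boolean async) {
      this.path = path;
      this.async = async;
    }

    @Override
    public String describe() {
      return "fs-like:" + path + (async ? ":async" : ":sync");
    }
  }

  /** Adapts each backend's own constructor to one common shape. */
  @FunctionalInterface
  interface BackendCreator {
    Backend create(String storagePath) throws IOException;
  }

  private static final Map<String, BackendCreator> CREATORS = new LinkedHashMap<>();

  static {
    CREATORS.put("rocksdb", RocksLike::new);
    CREATORS.put("filesystem", path -> new FsLike(path, true));
  }

  static Backend create(String name, String storagePath) {
    BackendCreator creator = CREATORS.get(name.toLowerCase(Locale.ROOT));
    if (creator == null) {
      throw new IllegalArgumentException(
          "Unknown state backend '" + name + "'. Known names: " + CREATORS.keySet());
    }
    try {
      return creator.create(storagePath);
    } catch (IOException e) {
      throw new RuntimeException("Could not create state backend '" + name + "'.", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create("RocksDB", "/checkpoints").describe());
    System.out.println(create("filesystem", "/checkpoints").describe());
  }
}
```

In this shape, supporting another backend means adding one registry entry, which is compatible with the earlier reply that new state backends can be added as needed.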







[GitHub] [beam] tweise commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r506988274



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       Flink is headed in the direction where everything that is set on an environment becomes configurable (including the executor, FLIP-73). This change kind of goes in the opposite direction, increasing the amount of Flink pipeline options further. Should we look into the generic configuration mechanism instead, where it is really easy for the user to supply the Flink configuration (optionally inline, instead of via a separate file)?

##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       Just wanted to confirm that this dependency won't be baked into the job server, as it is already part of the Flink dist.







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r507675006



##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       Is that true? AFAIK RocksDB is an optional dependency. We can change the scope to `provided` and add a separate dependency with `testCompile`.







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r519372310



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(
+      FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+    if (options.getStateBackend() != null) {
+      final String storagePath = options.getStateBackendStoragePath();
+      Preconditions.checkArgument(
+          storagePath != null,
+          "State backend was set to '%s' but no storage path was provided.",
+          options.getStateBackend());
+
+      final StateBackend stateBackend;
+      if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {

Review comment:
       I've reduced this to one call by adding a null check.







[GitHub] [beam] tweise edited a comment on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise edited a comment on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-716984114


   > I suppose this change is required to specify [the backend per job only](https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend) and there seems not to be another way to do it in current Flink runner. It is curious I expected users to be able to do Flink specific runtime configuration independently of Beam's, isn't there a way to do this?
   
   I'm not sure that is the case, have you tried it? Can the state backend not be configured as part of `flinkConfiguration` (https://github.com/apache/beam/blob/5675108933de6eb601ca2e4f21870d2ababe0ec7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L155)?





[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r519345289



##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       RocksDB used not to be included in the dist. Good point, we can set it to `provided` then.







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r507677298



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       I think that's a great idea. We can start thinking about that on the mailing list and handle this via a separate JIRA issue. This pipeline option can then be replaced by the generic configuration option.
   
   For now, this pipeline option will fulfill a common request by Beam users to directly set the state backend without having to change the Flink configuration.







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r519345557



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       We may want to experiment with different state backends.







[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r519346341



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(
+      FlinkPipelineOptions options, StreamExecutionEnvironment env) {
+    if (options.getStateBackend() != null) {
+      final String storagePath = options.getStateBackendStoragePath();
+      Preconditions.checkArgument(
+          storagePath != null,
+          "State backend was set to '%s' but no storage path was provided.",
+          options.getStateBackend());
+
+      final StateBackend stateBackend;
+      if (options.getStateBackend().equalsIgnoreCase("rocksdb")) {
+        try {
+          stateBackend = new RocksDBStateBackend(storagePath);
+        } catch (IOException e) {
+          throw new RuntimeException("Could not create RocksDB state backend.", e);
+        }
+      } else if (options.getStateBackend().equalsIgnoreCase("filesystem")) {
+        stateBackend = new FsStateBackend(storagePath);
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.",
+                options.getStateBackend()));
+      }
+      env.setStateBackend(stateBackend);
+    } else if (options.getStateBackendFactory() != null) {
+      // Legacy way of setting the state backend
       final StateBackend stateBackend =
           InstanceBuilder.ofType(FlinkStateBackendFactory.class)
               .fromClass(options.getStateBackendFactory())
               .build()
               .createStateBackend(options);
-      flinkStreamEnv.setStateBackend(stateBackend);
+      env.setStateBackend(stateBackend);

Review comment:
       Actually we can't, because the if blocks are non-exhaustive. If nothing is configured, we don't want to call `setStateBackend`.
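
The non-exhaustive branching can be illustrated with a minimal, self-contained sketch. It deliberately avoids Flink classes: `BackendChoice`, `StateBackendSelectionSketch`, and `select` are hypothetical stand-ins, whereas the code in the diff constructs `RocksDBStateBackend` or `FsStateBackend` and calls `env.setStateBackend` inside each branch. The legacy branch here only records the factory class name instead of instantiating it.

```java
import java.util.Optional;

/** Hypothetical stand-in for a Flink StateBackend instance. */
final class BackendChoice {
  final String kind;
  final String storagePath;

  BackendChoice(String kind, String storagePath) {
    this.kind = kind;
    this.storagePath = storagePath;
  }
}

final class StateBackendSelectionSketch {

  /**
   * Mirrors the branching in the diff. Returns an empty Optional when neither
   * the new option nor the legacy factory is set, so the caller never
   * overrides the cluster's default state backend by accident.
   */
  static Optional<BackendChoice> select(
      String stateBackend, String storagePath, String legacyFactoryClass) {
    if (stateBackend != null) {
      if (storagePath == null) {
        throw new IllegalArgumentException(
            String.format(
                "State backend was set to '%s' but no storage path was provided.",
                stateBackend));
      }
      if (stateBackend.equalsIgnoreCase("rocksdb")) {
        return Optional.of(new BackendChoice("rocksdb", storagePath));
      } else if (stateBackend.equalsIgnoreCase("filesystem")) {
        return Optional.of(new BackendChoice("filesystem", storagePath));
      }
      throw new IllegalArgumentException(
          String.format(
              "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.",
              stateBackend));
    } else if (legacyFactoryClass != null) {
      // Legacy way: the real code instantiates the factory; here we only record its name.
      return Optional.of(new BackendChoice("factory:" + legacyFactoryClass, null));
    }
    // Nothing configured: no backend is chosen and nothing should be set.
    return Optional.empty();
  }

  public static void main(String[] args) {
    // The setter-like action runs only when a backend was actually selected.
    select("RocksDB", "file:///tmp/checkpoints", null)
        .ifPresent(choice -> System.out.println("would set backend: " + choice.kind));
    System.out.println(
        "nothing configured, backend selected: " + select(null, null, null).isPresent());
  }
}
```

Returning an `Optional` is one way to collapse the branches into a single setter call while keeping the "nothing configured" case a no-op; whether that reads better than calling the setter in each branch is a judgment call.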







[GitHub] [beam] tweise commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r512415961



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       @iemejia why does Nexmark need the `rocksdb` state backend? That backend is required when state does not fit into the heap; otherwise it is almost always better to use `filesystem`.







[GitHub] [beam] tweise edited a comment on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise edited a comment on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-716984114


   > I suppose this change is required to specify [the backend per job only](https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend) and there seems not to be another way to do it in current Flink runner. It is curious I expected users to be able to do Flink specific runtime configuration independently of Beam's, isn't there a way to do this?
   
   I'm not sure that is the case, have you tried it? Can the state backend not be configured as part of `flinkConfiguration` (https://github.com/apache/beam/blob/5675108933de6eb601ca2e4f21870d2ababe0ec7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L155)?





[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r507677961



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
##########
@@ -259,28 +278,44 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
         options.setShutdownSourcesAfterIdleMs(0L);
       }
     }
+  }
 
-    applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
-
-    if (options.getAutoWatermarkInterval() != null) {
-      flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval());
-    }
-
-    // State backend
-    if (options.getStateBackendFactory() != null) {
+  private static void configureStateBackend(

Review comment:
       Plus, we want to be able to easily run Nexmark with RocksDB.







[GitHub] [beam] tweise commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-716984114


   > I suppose this change is required to specify [the backend per job only](https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-the-per-job-state-backend) and there seems not to be another way to do it in current Flink runner. It is curious I expected users to be able to do Flink specific runtime configuration independently of Beam's, isn't there a way to do this?
   
   I'm not sure that is the case, has anyone tried it? Can the state backend not be configured as part of `flinkConfiguration` (https://github.com/apache/beam/blob/5675108933de6eb601ca2e4f21870d2ababe0ec7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L155)?





[GitHub] [beam] mxm commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-725330079


   Thanks Brian. Yes, this caused a merge conflict which was not visible before the merge. I should have rebased, since this PR had been open for a while. Looks like https://github.com/apache/beam/pull/13297 fixed it.





[GitHub] [beam] tweise commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
tweise commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r512415517



##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       The dependency is part of the flink distribution:
   ```
   $ tar -tf flink-dist_2.12-1.11.2.jar | grep RocksDBStateBackend
   org/apache/flink/contrib/streaming/state/RocksDBStateBackend.class
   org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.class
   org/apache/flink/contrib/streaming/state/RocksDBStateBackend$PriorityQueueStateType.class
   ```
   If it wasn't, you could not configure the default state backend:
   https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#setting-default-state-backend







[GitHub] [beam] mxm merged pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm merged pull request #13116:
URL: https://github.com/apache/beam/pull/13116


   





[GitHub] [beam] mxm commented on pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #13116:
URL: https://github.com/apache/beam/pull/13116#issuecomment-723559227


   There is currently no universal way to set the configuration per job, unless a per-job cluster is used, in which case a configuration file can be supplied for that cluster. There is no way to explicitly configure a state backend via a flag to the main program. Even if we allowed a configuration file parameter, we would rely on that file being present at runtime, which we can't assume. Further, if we allowed a direct configuration YAML string, we would still have to set the state backend explicitly on the `StreamExecutionEnvironment`; otherwise the cluster's default state backend would be used at runtime instead of the configured one (the default state backend can only be configured at job creation time). The intention here is to allow setting and overriding the default state backend. Thus, it makes sense (for now) to have an explicit way to configure the state backend.
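
The intended precedence (a per-job flag overrides the cluster default) can be sketched without Beam or Flink on the classpath. Everything here is illustrative: `PerJobBackendOverrideSketch`, `parseFlags`, and `effectiveBackend` are hypothetical names, the `--stateBackend` and `--stateBackendStoragePath` flags are assumed from the `getStateBackend()` and `getStateBackendStoragePath()` getters in the diff, and the `state.backend` key stands for the default configured in the cluster's Flink configuration file.

```java
import java.util.HashMap;
import java.util.Map;

final class PerJobBackendOverrideSketch {

  /** Parses "--key=value" arguments into a map; a stand-in for real option parsing. */
  static Map<String, String> parseFlags(String[] args) {
    Map<String, String> flags = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      // Accept only arguments of the form --key=value with a non-empty key.
      if (arg.startsWith("--") && eq > 2) {
        flags.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    return flags;
  }

  /**
   * The per-job flag wins when present; otherwise the cluster default applies.
   * Returns null when neither is set.
   */
  static String effectiveBackend(Map<String, String> clusterConfig, String[] args) {
    String perJob = parseFlags(args).get("stateBackend");
    return perJob != null ? perJob : clusterConfig.get("state.backend");
  }

  public static void main(String[] args) {
    Map<String, String> cluster = new HashMap<>();
    cluster.put("state.backend", "filesystem");

    String[] jobArgs = {
      "--stateBackend=rocksdb", "--stateBackendStoragePath=file:///tmp/checkpoints"
    };
    System.out.println(effectiveBackend(cluster, jobArgs)); // prints "rocksdb"
    System.out.println(effectiveBackend(cluster, new String[0])); // prints "filesystem"
  }
}
```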





[GitHub] [beam] mxm commented on a change in pull request #13116: [BEAM-9855] Provide an option to configure the Flink state backend

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #13116:
URL: https://github.com/apache/beam/pull/13116#discussion_r520609556



##########
File path: runners/flink/flink_runner.gradle
##########
@@ -148,6 +148,8 @@ dependencies {
   compile "org.apache.flink:flink-java:$flink_version"
   compile "org.apache.flink:flink-runtime_2.11:$flink_version"
   compile "org.apache.flink:flink-streaming-java_2.11:$flink_version"
+  // RocksDB state backend
+  compile "org.apache.flink:flink-statebackend-rocksdb_2.11:$flink_version"

Review comment:
       I've verified that RocksDB is not included in any of the to-be-released jars.



