Posted to github@beam.apache.org by "zjffdu (via GitHub)" <gi...@apache.org> on 2023/04/10 14:13:38 UTC

[GitHub] [beam] zjffdu opened a new pull request, #26193: [Feature Request]: Support detached mode for flink runner (#26158)

zjffdu opened a new pull request, #26193:
URL: https://github.com/apache/beam/pull/26193

   This PR adds support for detached mode to the Flink runner. In detached mode `pipeline.run()` is non-blocking, and the user can call `PipelineResult#waitUntilFinish` to wait for the pipeline to finish. There are 2 main changes in this PR:
   
   * Add 2 options to `FlinkPipelineOptions`: `AttachedMode` (defaults to true, to stay compatible with the previous behavior) and `JobCheckIntervalInSecs` (the interval at which job status is checked in detached mode; defaults to 5 seconds)
   * Update `FlinkDetachedRunnerResult.java` to make it work in detached mode.
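
A minimal sketch of the wait behavior described above, using only the JDK rather than the Beam or Flink APIs. The class `DetachedWaitSketch`, its `waitUntilFinish` helper, and the `isTerminal` callback are hypothetical names for illustration; in the PR the equivalent loop polls the Flink job status every `JobCheckIntervalInSecs` seconds.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the polling done while waiting on a detached job. */
final class DetachedWaitSketch {

  /**
   * Polls isTerminal every checkInterval until it returns true or the timeout elapses.
   * Returns true if a terminal state was observed before the timeout.
   */
  static boolean waitUntilFinish(
      BooleanSupplier isTerminal, Duration timeout, Duration checkInterval)
      throws InterruptedException {
    // Assumes a timeout small enough for toNanos() (roughly under 292 years).
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (isTerminal.getAsBoolean()) {
        return true;
      }
      long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // Never sleep past the deadline; sleep at least 1 ms to avoid a busy loop.
      long remainingMillis = Math.max(1, remainingNanos / 1_000_000);
      Thread.sleep(Math.min(checkInterval.toMillis(), remainingMillis));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int[] polls = {0};
    // Simulated job that reports a terminal state on the third status check.
    boolean finished =
        waitUntilFinish(() -> ++polls[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(10));
    System.out.println("finished=" + finished + " after " + polls[0] + " polls");
  }
}
```

Checking the state before sleeping means a job that has already finished returns immediately, and capping the sleep at the remaining time keeps the call from overshooting the requested duration by a full interval.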
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1640143410

   waiting on author




[GitHub] [beam] Abacn commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1164825595


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")
+  @Default.Boolean(true)
+  boolean getAttachedMode();

Review Comment:
   There is a setBlockOnRun option for DirectRunner: https://github.com/apache/beam/blob/4ffeae4d2b800f2df36d2ea2eab549f2204d5691/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L36
   
   could use the same naming





[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1646141509

   Hi @zjffdu, just a kind reminder about the minor comments, thanks




[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1646569638

   Thank you!




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1166136917


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")
+  @Default.Boolean(true)
+  boolean getAttachedMode();

Review Comment:
   Thanks for the comment @Abacn. I would agree to use the same naming if it were a common option in `PipelineOptions`, but IMHO it would be better to use Flink-specific terminology in `FlinkPipelineOptions`, as `attachedMode` is easier for Flink users to understand than `blockOnRun`.





[GitHub] [beam] kkdoon commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "kkdoon (via GitHub)" <gi...@apache.org>.
kkdoon commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1253904078


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,119 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return State.UNKNOWN;
+    long start = System.currentTimeMillis();
+    long durationInMillis = duration.getMillis();
+    State state = State.UNKNOWN;
+    while ((System.currentTimeMillis() - start) < durationInMillis) {
+      state = getState();
+      if (state.isTerminal()) {
+        return state;
+      }
+      try {
+        Thread.sleep(jobCheckIntervalInSecs * 1000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (state != null && !state.isTerminal()) {
+      LOG.warn("Job is not finished in {} seconds", duration.getStandardSeconds());
+    }
+    return state;
+  }
+
+  private State toBeamJobState(JobStatus flinkJobStatus) {
+    switch (flinkJobStatus) {
+      case CANCELLING:
+      case CREATED:
+      case INITIALIZING:
+      case FAILING:
+      case RECONCILING:
+      case RESTARTING:
+      case RUNNING:
+        return State.RUNNING;
+      case FINISHED:
+        return State.DONE;
+      case CANCELED:
+        return State.CANCELLED;
+      case FAILED:
+        return State.FAILED;
+      case SUSPENDED:

Review Comment:
   Should this return the Beam [STOPPED](https://beam.apache.org/releases/javadoc/2.1.0/org/apache/beam/sdk/PipelineResult.State.html#STOPPED) state instead, since the job is paused and not in a terminal state? Flink's [suspended](https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/JobStatus.html#SUSPENDED) state also corresponds to a paused state, from which the job may transition to the [restarting state](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/internals/job_scheduling/).
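
A sketch of the mapping suggested here, written against local stand-in enums rather than the real Flink `JobStatus` and Beam `PipelineResult.State` types so that it compiles on its own. `StateMappingSketch`, `FlinkStatus`, and `BeamState` are hypothetical names. The status values are the ones listed in the diff above, the terminal flags are assumed to mirror Beam's, and what the PR itself returns for `SUSPENDED` is not visible in the quoted hunk.

```java
/** Hypothetical stand-ins illustrating the suggested SUSPENDED -> STOPPED mapping. */
final class StateMappingSketch {

  /** Local stand-in for Flink's JobStatus (values as listed in the diff above). */
  enum FlinkStatus {
    CREATED, INITIALIZING, RUNNING, FAILING, FAILED, CANCELLING,
    CANCELED, FINISHED, RESTARTING, SUSPENDED, RECONCILING
  }

  /** Local stand-in for Beam's PipelineResult.State; the flag marks terminal states. */
  enum BeamState {
    UNKNOWN(false), STOPPED(false), RUNNING(false), DONE(true), FAILED(true), CANCELLED(true);

    private final boolean terminal;

    BeamState(boolean terminal) {
      this.terminal = terminal;
    }

    boolean isTerminal() {
      return terminal;
    }
  }

  static BeamState toBeamState(FlinkStatus status) {
    switch (status) {
      case CREATED:
      case INITIALIZING:
      case RUNNING:
      case FAILING:
      case CANCELLING:
      case RESTARTING:
      case RECONCILING:
        return BeamState.RUNNING;
      case FINISHED:
        return BeamState.DONE;
      case CANCELED:
        return BeamState.CANCELLED;
      case FAILED:
        return BeamState.FAILED;
      case SUSPENDED:
        // Suggested in this review: paused, not terminal, may restart later.
        return BeamState.STOPPED;
      default:
        return BeamState.UNKNOWN;
    }
  }
}
```

With this mapping a polling `waitUntilFinish` keeps waiting on a suspended job, because `STOPPED` is not terminal in this sketch.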



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@ public interface FlinkPipelineOptions
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")

Review Comment:
   nit: probably can re-phrase the comment as "Specifies if the pipeline is submitted in attached or detached mode"





[GitHub] [beam] Abacn commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1258727258


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableRunnerResult.java:
##########
@@ -44,13 +50,44 @@ public JobApi.MetricResults portableMetrics() throws UnsupportedOperationExcepti
         .build();
   }
 
-  static class Detached extends FlinkDetachedRunnerResult implements PortablePipelineResult {
+  static class Detached implements PortablePipelineResult {
+
+    Detached() {
+      super();
+    }
 
     @Override
     public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
       LOG.warn(
           "Collecting monitoring infos is not implemented yet in Flink portable runner (detached mode).");
       return JobApi.MetricResults.newBuilder().build();
     }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized State getState() {

Review Comment:
   These "@UnknownKeyFor @NonNull @Initialized" annotations are not necessary. They are added by default to the compiled .class. The IDE may auto-propagate them, which is slightly annoying.





[GitHub] [beam] Abacn commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1175352269


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")
+  @Default.Boolean(true)
+  boolean getAttachedMode();

Review Comment:
   Sounds good to me





[GitHub] [beam] Abacn commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1271239683


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,119 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(Long.MAX_VALUE));

Review Comment:
   ```suggestion
       return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
   ```
   
   Will `Duration.standardDays(Long.MAX_VALUE)` cause an overflow?
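
The arithmetic behind this concern can be checked with the JDK alone. The sketch below does not call Joda-Time; it assumes that converting days to milliseconds is an overflow-checked multiplication by 86,400,000, and models that with `Math.multiplyExact`. `DurationOverflowSketch` and `daysOverflow` are hypothetical names.

```java
/** Hypothetical helper: does a day count fit in a long once converted to milliseconds? */
final class DurationOverflowSketch {

  static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

  /** Returns true if days * MILLIS_PER_DAY does not fit in a signed 64-bit long. */
  static boolean daysOverflow(long days) {
    try {
      Math.multiplyExact(days, MILLIS_PER_DAY);
      return false;
    } catch (ArithmeticException e) {
      // multiplyExact throws instead of silently wrapping around.
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(daysOverflow(Long.MAX_VALUE)); // true: cannot be expressed in millis
    System.out.println(daysOverflow(365L * 10)); // false: ten years fits easily
  }
}
```

Both a millisecond-based maximum and a bounded day count such as ten years avoid the multiplication overflow.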





[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1618132828

   Reminder, please take a look at this pr: @robertwb 




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1561751853

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1607345028

   Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1271184006


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableRunnerResult.java:
##########
@@ -44,13 +50,44 @@ public JobApi.MetricResults portableMetrics() throws UnsupportedOperationExcepti
         .build();
   }
 
-  static class Detached extends FlinkDetachedRunnerResult implements PortablePipelineResult {
+  static class Detached implements PortablePipelineResult {
+
+    Detached() {
+      super();
+    }
 
     @Override
     public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
       LOG.warn(
           "Collecting monitoring infos is not implemented yet in Flink portable runner (detached mode).");
       return JobApi.MetricResults.newBuilder().build();
     }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized State getState() {

Review Comment:
   Fixed



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,120 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));

Review Comment:
   Fixed





Re: [PR] [Feature Request]: Support detached mode for flink runner (#26158) [beam]

Posted by "gli-marc-hurabielle (via GitHub)" <gi...@apache.org>.
gli-marc-hurabielle commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1798099294

   Do you know if it is working for portable runner? 




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1587222257

   Reminder, please take a look at this pr: @robertwb 




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1571944181

   Reminder, please take a look at this pr: @robertwb 






[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1640100206

   Reminder, please take a look at this PR: @robertwb




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1502004572

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1591077193

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] zjffdu commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1523548398

   @Abacn Not yet. Is anyone in the Beam community familiar with Flink who can help with the review?




[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1523525244

   @zjffdu have you found a reviewer for the Flink component?




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1174914924


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")
+  @Default.Boolean(true)
+  boolean getAttachedMode();

Review Comment:
   @Abacn Any thoughts?





[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1576676096

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb added as fallback since no labels match configuration
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] github-actions[bot] commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1602532606

   Reminder, please take a look at this PR: @robertwb




[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1629577982

   Thanks @kkdoon for reviewing. I added a few minor comments about the code format. Please ping me when finished. Thanks for being persistent with this work!




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1256809704


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,119 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return State.UNKNOWN;
+    long start = System.currentTimeMillis();
+    long durationInMillis = duration.getMillis();
+    State state = State.UNKNOWN;
+    while ((System.currentTimeMillis() - start) < durationInMillis) {
+      state = getState();
+      if (state.isTerminal()) {
+        return state;
+      }
+      try {
+        Thread.sleep(jobCheckIntervalInSecs * 1000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (state != null && !state.isTerminal()) {
+      LOG.warn("Job is not finished in {} seconds", duration.getStandardSeconds());
+    }
+    return state;
+  }
+
+  private State toBeamJobState(JobStatus flinkJobStatus) {
+    switch (flinkJobStatus) {
+      case CANCELLING:
+      case CREATED:
+      case INITIALIZING:
+      case FAILING:
+      case RECONCILING:
+      case RESTARTING:
+      case RUNNING:
+        return State.RUNNING;
+      case FINISHED:
+        return State.DONE;
+      case CANCELED:
+        return State.CANCELLED;
+      case FAILED:
+        return State.FAILED;
+      case SUSPENDED:

Review Comment:
   Thanks for the review, @kkdoon. I have updated this.
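
   For readers following the thread, the status mapping in the quoted hunk can be sketched with plain Java enums. This is only an illustration: `FlinkStatus` and `BeamState` are hypothetical stand-ins for Flink's `JobStatus` and Beam's `PipelineResult.State`, and because the quoted diff is cut off at `SUSPENDED`, the sketch assumes that case falls through to `UNKNOWN`, which may not match what was merged.

```java
/** Sketch of mapping a Flink-style job status onto a Beam-style pipeline state. */
final class JobStateMappingSketch {

  /** Hypothetical stand-in for Flink's JobStatus, using the values named in the diff. */
  enum FlinkStatus {
    CREATED, INITIALIZING, RUNNING, FAILING, FAILED, CANCELLING, CANCELED,
    FINISHED, RESTARTING, SUSPENDED, RECONCILING
  }

  /** Hypothetical stand-in for Beam's PipelineResult.State, reduced to what is used here. */
  enum BeamState {
    UNKNOWN(false), RUNNING(false), DONE(true), FAILED(true), CANCELLED(true);

    private final boolean terminal;

    BeamState(boolean terminal) {
      this.terminal = terminal;
    }

    boolean isTerminal() {
      return terminal;
    }
  }

  static BeamState toBeamState(FlinkStatus status) {
    switch (status) {
      // Every transitional status is reported as RUNNING, as in the quoted hunk.
      case CANCELLING:
      case CREATED:
      case INITIALIZING:
      case FAILING:
      case RECONCILING:
      case RESTARTING:
      case RUNNING:
        return BeamState.RUNNING;
      case FINISHED:
        return BeamState.DONE;
      case CANCELED:
        return BeamState.CANCELLED;
      case FAILED:
        return BeamState.FAILED;
      default:
        // Assumption: the diff does not show how SUSPENDED is handled,
        // so this sketch treats it (and anything unlisted) as UNKNOWN.
        return BeamState.UNKNOWN;
    }
  }

  public static void main(String[] args) {
    for (FlinkStatus status : FlinkStatus.values()) {
      System.out.println(status + " -> " + toBeamState(status));
    }
  }
}
```

   Reporting transitional statuses such as `CANCELLING` or `FAILING` as running keeps a polling caller waiting until the job reaches a terminal state.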





[GitHub] [beam] zjffdu commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1646364221

   @Abacn I have addressed the review comments. Please review again, thanks.




[GitHub] [beam] Abacn merged pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #26193:
URL: https://github.com/apache/beam/pull/26193




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1166136917


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")
+  @Default.Boolean(true)
+  boolean getAttachedMode();

Review Comment:
   Thanks for the comment, @Abacn. I would agree to use the same naming if it were a common option in `PipelineOptions`, but IMHO it is better to use Flink-specific terminology in `FlinkPipelineOptions`.





[GitHub] [beam] Abacn commented on pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #26193:
URL: https://github.com/apache/beam/pull/26193#issuecomment-1523560295

   CC: @yananhao12 @xinyuiscool could you please take a look? Feel free to pass this on to someone else if you cannot.




[GitHub] [beam] zjffdu commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "zjffdu (via GitHub)" <gi...@apache.org>.
zjffdu commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1256812326


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java:
##########
@@ -142,6 +142,20 @@ public interface FlinkPipelineOptions
 
   void setNumberOfExecutionRetries(Integer retries);
 
+  @Description(
+      "Set job check interval in seconds under detached mode in method waitUntilFinish, "
+          + "by default it is 5 seconds")
+  @Default.Integer(5)
+  int getJobCheckIntervalInSecs();
+
+  void setJobCheckIntervalInSecs(int seconds);
+
+  @Description("Set the attached mode")

Review Comment:
   Thanks for the review, @kkdoon. I have updated the comment.





[GitHub] [beam] Abacn commented on a diff in pull request #26193: [Feature Request]: Support detached mode for flink runner (#26158)

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #26193:
URL: https://github.com/apache/beam/pull/26193#discussion_r1258725656


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java:
##########
@@ -18,41 +18,120 @@
 package org.apache.beam.runners.flink;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.core.execution.JobClient;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. In detached
  * execution, results and job execution are currently unavailable.
  */
 public class FlinkDetachedRunnerResult implements PipelineResult {
 
-  FlinkDetachedRunnerResult() {}
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkDetachedRunnerResult.class);
+  private static final int TEN_YEAR_DAYS = 365 * 10;
+
+  private JobClient jobClient;
+  private int jobCheckIntervalInSecs;
+
+  FlinkDetachedRunnerResult(JobClient jobClient, int jobCheckIntervalInSecs) {
+    this.jobClient = jobClient;
+    this.jobCheckIntervalInSecs = jobCheckIntervalInSecs;
+  }
 
   @Override
   public State getState() {
-    return State.UNKNOWN;
+    try {
+      return toBeamJobState(jobClient.getJobStatus().get());
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to get flink job state", e);
+    }
   }
 
   @Override
   public MetricResults metrics() {
-    throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics.");
+    return MetricsContainerStepMap.asAttemptedOnlyMetricResults(getMetricsContainerStepMap());
+  }
+
+  private MetricsContainerStepMap getMetricsContainerStepMap() {
+    try {
+      return (MetricsContainerStepMap)
+          jobClient
+              .getAccumulators()
+              .get()
+              .getOrDefault(FlinkMetricContainer.ACCUMULATOR_NAME, new MetricsContainerStepMap());
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.warn("Fail to get flink job accumulators", e);
+      return new MetricsContainerStepMap();
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException("Cancelling is not yet supported.");
+    try {
+      this.jobClient.cancel().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Fail to cancel flink job", e);
+    }
+    return getState();
   }
 
   @Override
   public State waitUntilFinish() {
-    return State.UNKNOWN;
+    return waitUntilFinish(Duration.standardDays(TEN_YEAR_DAYS));

Review Comment:
   Consider `Duration.millis(Long.MAX_VALUE)`, which is conventionally used throughout Beam.
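
   The suggestion can be illustrated without Beam or Joda-Time types. In the rough sketch below, the unbounded wait delegates to the timed wait with `Long.MAX_VALUE` milliseconds in place of a hand-picked constant such as `TEN_YEAR_DAYS`. The class, method, and parameter names are hypothetical, the job is modelled as a plain `BooleanSupplier`, and the interrupt handling (restore the flag and stop waiting) is a choice made for this sketch rather than what the PR does.

```java
import java.util.function.BooleanSupplier;

/** Sketch of a polling wait where "no timeout" is expressed as Long.MAX_VALUE milliseconds. */
final class WaitUntilFinishSketch {

  /**
   * Polls isDone until it returns true or timeoutMillis has elapsed.
   * pollMillis must be non-negative. Returns true if the job finished in time.
   */
  static boolean waitUntilFinish(BooleanSupplier isDone, long timeoutMillis, long pollMillis) {
    long start = System.nanoTime();
    while (true) {
      if (isDone.getAsBoolean()) {
        return true;
      }
      long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
      if (elapsedMillis >= timeoutMillis) {
        return false;
      }
      // elapsedMillis < timeoutMillis here, so the subtraction cannot overflow,
      // even when timeoutMillis is Long.MAX_VALUE.
      long remainingMillis = timeoutMillis - elapsedMillis;
      try {
        Thread.sleep(Math.min(pollMillis, remainingMillis));
      } catch (InterruptedException e) {
        // Sketch-level choice: restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /** Unbounded variant: delegates with the largest representable timeout. */
  static boolean waitUntilFinish(BooleanSupplier isDone, long pollMillis) {
    return waitUntilFinish(isDone, Long.MAX_VALUE, pollMillis);
  }
}
```

   Because the loop compares elapsed time against the timeout instead of computing `start + timeout`, passing `Long.MAX_VALUE` does not overflow.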


