Posted to commits@beam.apache.org by "Pei He (JIRA)" <ji...@apache.org> on 2016/10/04 18:48:20 UTC

[jira] [Comment Edited] (BEAM-702) Simple pattern for per-bundle and per-DoFn Closeable resources

    [ https://issues.apache.org/jira/browse/BEAM-702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546277#comment-15546277 ] 

Pei He edited comment on BEAM-702 at 10/4/16 6:47 PM:
------------------------------------------------------

Is the feature request for something like this?
class MyDoFn extends DoFn<String, String> {
  @Closable(Scope.BUNDLE)
  private DBWriter writer = null;
    
  @StartBundle
  public void startBundle(Context c) {
    writer = createDBWriter();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    writer.write(...);
  }
}

I think runners can recognize Closeable resources and their scopes through annotations, and can generate code that calls close() once the resources go out of scope.
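
As a rough illustration of the annotation idea, here is a minimal sketch of how a runner might find annotated fields by reflection and close them when a scope ends. Everything in it is hypothetical: the @Closable annotation, the Scope enum, the ScopedCloser helper, and the ExampleFn and TrackedWriter stand-ins are not Beam APIs, and a real runner could just as well generate code instead of using reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical scope enum and annotation mirroring the proposal; neither exists in Beam.
enum Scope { BUNDLE, DOFN }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Closable {
  Scope value();
}

// Hypothetical stand-in for a resource such as a database writer.
class TrackedWriter implements AutoCloseable {
  boolean closed = false;

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical stand-in for a user DoFn with one resource per scope.
class ExampleFn {
  @Closable(Scope.BUNDLE)
  TrackedWriter bundleWriter = new TrackedWriter();

  @Closable(Scope.DOFN)
  TrackedWriter fnWriter = new TrackedWriter();
}

// Hypothetical helper a runner might call when a scope ends.
final class ScopedCloser {
  private ScopedCloser() {}

  /** Closes each non-null AutoCloseable field annotated with the given scope; returns the count. */
  static int closeScoped(Object fn, Scope scope) {
    int closedCount = 0;
    for (Field field : fn.getClass().getDeclaredFields()) {
      Closable annotation = field.getAnnotation(Closable.class);
      if (annotation == null || annotation.value() != scope) {
        continue; // Not annotated, or belongs to a different scope.
      }
      try {
        field.setAccessible(true);
        Object value = field.get(fn);
        if (value instanceof AutoCloseable) {
          ((AutoCloseable) value).close();
          field.set(fn, null); // Drop the reference so a closed resource is not reused.
          closedCount++;
        }
      } catch (Exception e) {
        throw new IllegalStateException("Failed to close field " + field.getName(), e);
      }
    }
    return closedCount;
  }
}
```

In this sketch a runner would call ScopedCloser.closeScoped(fn, Scope.BUNDLE) at the end of each bundle and ScopedCloser.closeScoped(fn, Scope.DOFN) when the DoFn instance is discarded.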

But I think the real usefulness lies in resources being "closed automatically" in both success and failure conditions.
Closing the resources manually in finishBundle() or teardown() works only if the pipeline doesn't fail. If the pipeline fails, the resources might be left open.
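
To make the success/failure point concrete, here is a minimal sketch of a runner-side wrapper that closes bundle resources whether or not the bundle body throws. BundleRunner and RecordingResource are hypothetical names used only for illustration, not Beam classes.

```java
import java.util.List;

// Hypothetical stand-in for a bundle-scoped resource.
class RecordingResource implements AutoCloseable {
  boolean closed = false;

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical runner-side helper; not part of Beam.
final class BundleRunner {
  private BundleRunner() {}

  /** Runs the bundle body, then closes all resources in reverse order, even if the body threw. */
  static void runBundle(Runnable body, List<? extends AutoCloseable> resources) {
    Throwable failure = null;
    try {
      body.run();
    } catch (RuntimeException | Error e) {
      failure = e; // Remember the failure, but still close the resources below.
    }
    for (int i = resources.size() - 1; i >= 0; i--) {
      try {
        resources.get(i).close();
      } catch (Exception e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e); // Keep the original failure as the primary one.
        }
      }
    }
    if (failure instanceof RuntimeException) {
      throw (RuntimeException) failure;
    }
    if (failure instanceof Error) {
      throw (Error) failure;
    }
    if (failure != null) {
      throw new IllegalStateException("Failed to close bundle resources", failure);
    }
  }
}
```

A wrapper like this only covers failures that surface as exceptions inside the worker process. If the worker itself is killed, no in-process cleanup runs, so external work would still need a separate cancellation hook.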

This is the problem we have in BigQueryIO.Read, where we start external BQ extract jobs. However, if the pipeline fails, there is no hook to cancel them.

In summary, my 2 cents:
+1 for introducing mechanisms to recognize Closeable resources in the model.
And it needs to work in both success and failure conditions.


> Simple pattern for per-bundle and per-DoFn Closeable resources
> --------------------------------------------------------------
>
>                 Key: BEAM-702
>                 URL: https://issues.apache.org/jira/browse/BEAM-702
>             Project: Beam
>          Issue Type: Improvement
>            Reporter: Eugene Kirpichov
>
> Dealing with Closeable resources inside a processElement call is easy: simply use try-with-resources.
> However, bundle- or DoFn-scoped resources, such as long-lived database connections, are less convenient to deal with: you have to open them in startBundle and conditionally close them in finishBundle (likewise setup/teardown), taking special care, if there are multiple resources, to close all of them.
> Perhaps we should provide DoFns with something like Guava's Closer: https://github.com/google/guava/wiki/ClosingResourcesExplained. Ideally, the user would only need to write a startBundle() or setup() method, and not finishBundle() or teardown(); resources would be closed automatically.
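
For readers unfamiliar with the Closer pattern referenced in the description, here is a minimal plain-Java sketch of the idea: resources are registered as they are opened and then closed together in reverse order. SimpleCloser is a hypothetical stand-in written for illustration; it is not Guava's actual class and not a Beam API.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal stand-in inspired by Guava's Closer; hypothetical, not the real implementation.
final class SimpleCloser implements Closeable {
  private final Deque<Closeable> stack = new ArrayDeque<>();

  /** Registers a resource to be closed later and returns it for convenient chaining. */
  <C extends Closeable> C register(C resource) {
    if (resource != null) {
      stack.push(resource);
    }
    return resource;
  }

  /** Closes all registered resources in reverse registration order, attempting every one. */
  @Override
  public void close() throws IOException {
    IOException first = null;
    while (!stack.isEmpty()) {
      try {
        stack.pop().close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e); // Later failures are attached to the first one.
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

Under the proposal, a user's startBundle() or setup() would only register resources with something like this, and the runner would call close() at the end of the corresponding scope.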



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)