Posted to user@beam.apache.org by "Murphy, Sean P. via user" <us...@beam.apache.org> on 2023/04/24 20:35:40 UTC

Re: [EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

Thanks for the reply.

This would need to build the queries at runtime. There is a known quantity of incoming patient clinics, but it could fluctuate from thousands to hundreds of thousands depending on the size of the study.

With the approach you provided below, couldn’t the esQueryResults still be determined at runtime?

                ~Sean

From: Evan Galpin <eg...@apache.org>
Date: Monday, April 24, 2023 at 2:18 PM
To: user <us...@beam.apache.org>
Cc: Anthony Samy, Charles <An...@mayo.edu>, Murphy, Sean P. <Mu...@mayo.edu>
Subject: [EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries
Redirecting to the user mailing list as well to hopefully help the community if others face similar issues in the future.  All of the solutions in the thread so far involve making changes to the OSS Beam ElasticsearchIO codebase, which is the best long-term path and the path I would encourage.  That said, I understand that doing so is not always feasible depending on timelines etc.  Is your set of queries countable? Can they be known at pipeline compilation time? Not the most elegant solution, but you could potentially iterate over them if they can be known at compile time:

    List<PCollection<String>> esQueryResults = new ArrayList<>();
    for (String queryString : myKnownQueryStrings) {
      esQueryResults.add(p
          .apply(ElasticsearchIO.read()
              .withConnectionConfiguration(
                  ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
              .withQuery(queryString))
      );
    }

    PCollectionList<String> resultsList = PCollectionList.empty(p);

    for (PCollection<String> qResults : esQueryResults) {
      // PCollectionList is immutable: and() returns a new list, so reassign.
      resultsList = resultsList.and(qResults);
    }

    resultsList
        .apply(Flatten.pCollections())
        .apply(...);

On Mon, Apr 24, 2023 at 10:39 AM Murphy, Sean P. <Mu...@mayo.edu> wrote:
Any other thoughts?  I’ve run out of ideas.   Thanks, ~Sean

From: Murphy, Sean P. <Mu...@mayo.edu>
Date: Friday, April 21, 2023 at 11:00 AM
To: Alexey Romanenko <ar...@gmail.com>, Evan Galpin <eg...@apache.org>
Cc: Anthony Samy, Charles <An...@mayo.edu>, egalpin@apache.org <eg...@apache.org>
Subject: Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries
Thank you, Alexey.

The issue isn’t with the split itself, but with how to introduce Create.of (or similar) in the same fashion as was described for the FileIO approach. I may have missed something, but I’m not sure I can implement the same approach using ElasticsearchIO.  Thanks, ~Sean

apply(MapElements
         // uses imports from TypeDescriptors
         .into(kvs(strings(), strings()))
         .via((ReadableFile f) -> {
           try {
             return KV.of(
                 f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
           } catch (IOException ex) {
             throw new RuntimeException("Failed to read the file", ex);
           }
         }));


From: Alexey Romanenko <ar...@gmail.com>
Date: Friday, April 21, 2023 at 5:20 AM
To: Murphy, Sean P. <Mu...@mayo.edu>
Cc: Anthony Samy, Charles <An...@mayo.edu>, egalpin@apache.org <eg...@apache.org>
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries
Yes, “ReadAll” doesn’t exist for ElasticsearchIO; it has to be implemented (btw, it would be a good contribution to Beam!). I’d say that “SplitFn()” is rather optional and specific to the SolrIO example that I showed before. The general idea is to distribute all Reads evenly across all workers and to split them, if possible, so that the load on your Elasticsearch cluster is equal.
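
As an illustration of the splitting idea above, one option Elasticsearch itself offers is the sliced scroll, where a "slice" clause with an "id" and a "max" is added to the search body so that each slice can be read independently. The sketch below only builds the per-slice request bodies. The class and method names are hypothetical, it is not how SolrIO's SplitFn or ElasticsearchIO is implemented, and it assumes the query body is a non-empty JSON object without an existing "slice" clause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

/** Hypothetical helper for illustration; not part of Beam or ElasticsearchIO. */
final class SlicedQueries {

  /**
   * Returns one search body per slice.
   * Assumes queryBody is a JSON object with at least one field
   * (e.g. {"query": ...}) and no "slice" clause of its own.
   */
  static List<String> slice(String queryBody, int numSlices) {
    List<String> bodies = new ArrayList<>();
    if (numSlices <= 1) {
      // Slicing only makes sense with more than one slice; keep the body as-is.
      bodies.add(queryBody);
      return bodies;
    }
    for (int id = 0; id < numSlices; id++) {
      String sliceClause =
          String.format("{\"slice\":{\"id\":%d,\"max\":%d},", id, numSlices);
      // Replace the opening brace with the brace followed by the slice clause.
      bodies.add(
          queryBody.replaceFirst("\\{", Matcher.quoteReplacement(sliceClause)));
    }
    return bodies;
  }

  public static void main(String[] args) {
    for (String body : slice("{\"query\":{\"match_all\":{}}}", 3)) {
      System.out.println(body);
    }
  }
}
```

Each returned body could then be handed to a separate worker, which is the kind of even distribution a split step is meant to achieve.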

I can’t say for sure what the best way to implement it for Elasticsearch is, so I’d recommend discussing it with Evan Galpin, who is a main contributor and maintainer of ElasticsearchIO.

—
Alexey



On 20 Apr 2023, at 18:52, Murphy, Sean P. <Mu...@mayo.edu> wrote:

Excuse my question if it’s obvious, but since those methods aren’t accessible for Elasticsearch from the same level : https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java

Would I need to implement my own versions of SplitFn() and ReadAll()?

Such as:

    PTransform<PCollection<Read>, PCollection<SearchSourceBuilder>> {
      @Override
      public PCollection<SearchSourceBuilder> expand(PCollection<Read> input) {
        return input
            .apply("Split", ParDo.of(new SplitFn()))
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .apply("Read", ParDo.of(new ReadFn()));
      }
    }



From: Alexey Romanenko <ar...@gmail.com>
Date: Thursday, April 20, 2023 at 11:13 AM
To: user <us...@beam.apache.org>, Murphy, Sean P. <Mu...@mayo.edu>
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries
Some Java IO connectors implement a class along the lines of “class ReadAll extends PTransform<PCollection<Read>, PCollection<YourDocument>>”, where “Read” is supposed to be configured dynamically. As a simple example, take a look at “SolrIO” [1].

So, to support what you are looking for, the “ReadAll” pattern should be implemented for ElasticsearchIO.

—
Alexey

[1] https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java

On 19 Apr 2023, at 19:05, Murphy, Sean P. via user <us...@beam.apache.org> wrote:

I'm running into an issue using the ElasticsearchIO.read() to handle more than one instance of a query. My queries are being dynamically built as a PCollection based on an incoming group of values. I'm trying to see how to load the .withQuery() parameter which could provide this capability or any approach that provides flexibility.

The issue is that the ElasticsearchIO.read() method expects a PBegin input to start a pipeline, but it seems like I need access outside of a pipeline context somehow. PBegin represents the beginning of a pipeline, and it's required to create a pipeline that can read data from Elasticsearch using ElasticsearchIO.read().

Can I wrap the ElasticsearchIO.read() call in a Create transform that creates a PCollection with a single element (e.g., PBegin) to simulate the beginning of a pipeline or something similar?

Here is my naive attempt without accepting the reality of PBegin:
   PCollection<String> queries = ... // a PCollection of Elasticsearch queries

    PCollection<String> queryResults = queries.apply(
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String query = c.element();
                PCollection<String> results = c.pipeline()
                    .apply(ElasticsearchIO.read()
                        .withConnectionConfiguration(
                            ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
                        .withQuery(query));
                c.output(results);
            }
        })
    .apply(Flatten.pCollections()));


In general, I'm wondering whether, for any of the IO-related classes provided by Beam that expect a PBegin input, there is a means to introduce a collection.

Here is one approach that might be promising:
// Define a ValueProvider for a List<String>
ValueProvider<List<String>> myListProvider = ValueProvider.StaticValueProvider.of(myList);

// Use the ValueProvider to create a PCollection of Strings
PCollection<String> pcoll = pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));

PCollection<String> partitionData = PBegin.in(pipeline)
        .apply("Read data from Elasticsearch", ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider<String> pcoll).withScrollKeepalive("1m").withBatchSize(50))
        .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), opt.getMedTagVersion(), opt.getNoteType()));

Any thoughts or ideas would be great.   Thanks, ~Sean


Re: [EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

Posted by Evan Galpin <eg...@apache.org>.
Is your pipeline a bounded or unbounded pipeline? Are you hoping to run a
job where the queries are streamed in to some unbounded pipeline Source,
and in response the pipeline would execute the query and proceed with any
downstream data manipulation? If so, unfortunately the approach I
described, as well as the ReadAll approach in SolrIO, won’t work. Both of
those approaches assume a bounded workload (Alexey please correct me if I'm
wrong).

The approach that fits best with supporting Elasticsearch reads in an
unbounded pipeline is rewriting ElasticsearchIO to be a Splittable DoFn [1].

It might be possible to run multiple jobs to suit your needs:

1. A streaming job that writes queries to a time-partitioned file in some
distributed file system (GCS, S3, HDFS)
2. A batch job (deployed each time a new file is “done” being written to)
that reads a given time-partitioned file containing the input queries, then
uses a loop like the one in my prior example to bootstrap the Elasticsearch
reads for each query in the file.
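
For the batch job in step 2, the query strings have to be available before the pipeline is built. A minimal sketch of that loading step follows. It assumes the file holds one JSON query per line and is reachable as a local path; the class name, method name and the clinic_id field are hypothetical, and for GCS, S3 or HDFS the file would have to be fetched through the matching filesystem client instead.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of Beam. */
final class QueryFileLoader {

  /** Reads one query per line, trimming whitespace and skipping blank lines. */
  static List<String> loadQueries(Path queryFile) throws IOException {
    return Files.readAllLines(queryFile, StandardCharsets.UTF_8).stream()
        .map(String::trim)
        .filter(line -> !line.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    // Write a small sample file so the example runs on its own.
    Path file = Files.createTempFile("queries-", ".jsonl");
    Files.write(file, Arrays.asList(
        "{\"query\":{\"match_all\":{}}}",
        "",
        "{\"query\":{\"term\":{\"clinic_id\":\"42\"}}}"));
    loadQueries(file).forEach(System.out::println);
    Files.delete(file);
  }
}
```

The returned list could play the role of myKnownQueryStrings in the loop from the earlier example, since it is fully known when the pipeline graph is constructed.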

The big difference here is that each of these batch pipelines is bounded,
whereas it sounds like you’d ideally like to have support in a streaming
pipeline. I think the two options for streaming support are using an
Elasticsearch Java client directly in a custom DoFn, or converting Read to a
SplittableDoFn.
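
To make the first option more concrete, here is a rough sketch of the per-query work such a custom DoFn might do. To stay dependency-free it uses the JDK's java.net.http client against the Elasticsearch _search REST endpoint rather than an Elasticsearch client library, and it leaves out the Beam wiring entirely. The class name, method names, base URL and index name are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of Beam or ElasticsearchIO. */
final class EsSearchRequests {

  /** Builds a POST to {baseUrl}/{index}/_search; baseUrl has no trailing slash. */
  static HttpRequest buildSearch(String baseUrl, String index, String queryBody) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(queryBody, StandardCharsets.UTF_8))
        .build();
  }

  /** Sends the request and returns the raw JSON response body. */
  static String search(HttpClient client, HttpRequest request)
      throws IOException, InterruptedException {
    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IOException("Search failed with HTTP " + response.statusCode());
    }
    return response.body();
  }

  public static void main(String[] args) {
    // Only builds the request; no cluster is contacted here.
    HttpRequest request =
        buildSearch("http://localhost:9200", "notes", "{\"query\":{\"match_all\":{}}}");
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Inside a DoFn, the client would typically be created in a @Setup method, because it is not serializable, and search() would be called once per incoming query element. A single _search call returns only the first page of hits, so a complete read would also need scrolling or search_after, which this sketch does not cover.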

[1]
https://beam.apache.org/blog/splittable-do-fn-is-available/
