Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/05/01 19:24:00 UTC

[jira] [Work logged] (BEAM-12093) Overhaul ElasticsearchIO#Write

     [ https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=591903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-591903 ]

ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/May/21 19:23
            Start Date: 01/May/21 19:23
    Worklog Time Spent: 10m 
      Work Description: egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r624553384



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1246,88 +1891,182 @@ public Write withUsePartialUpdate(boolean usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
+    public BulkIO withRetryConfiguration(RetryConfiguration retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or delete from the document
-     * fields allowing dynamic bulk operation decision. While using withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API response
+     * items[].error.type field. Any element in @param allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this
+     * config is not set, Bulk requests will not be issued until {@link BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher latency in particular if
+     * your max batch size is set to a large value and your pipeline input is low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for @param
+     * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
+     * can reduce parallelism greatly. If data is globally windowed and @param
+     * maxParallelRequestsPerWindow is set to 1, there will only ever be 1 request in flight. Having
+     * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
+     * not overwhelmed by parallel requests, but may not work for all use cases. If this number is
+     * less than the number of maximum workers in your pipeline, the IO work may not be distributed

Review comment:
       added that wording 👍 






Issue Time Tracking
-------------------

    Worklog Id:     (was: 591903)
    Time Spent: 9h 10m  (was: 9h)

> Overhaul ElasticsearchIO#Write
> ------------------------------
>
>                 Key: BEAM-12093
>                 URL: https://issues.apache.org/jira/browse/BEAM-12093
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Evan Galpin
>            Assignee: Evan Galpin
>            Priority: P2
>              Labels: elasticsearch
>          Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas which could be improved:
>  # Separation of concerns
>  # Bulk API batch size optimization
>  
> Presently, the Write transform has 2 responsibilities which are coupled and inseparable by users:
>  # Convert input documents into Bulk API entities, serializing based on user settings (partial update, delete, upsert, etc)
>  # Batch the converted Bulk API entities together and interface with the target ES cluster
>  
> Having these 2 roles tightly coupled means testing requires an available Elasticsearch cluster, making unit testing almost impossible. Allowing access to the serialized documents would make unit testing much easier for pipeline developers, among numerous other benefits to having separation between serialization and IO.
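To illustrate why separating serialization from IO helps testing, here is a minimal sketch of the serialization step as a pure function. The class and method names are hypothetical and are not part of ElasticsearchIO; the sketch covers only index and delete actions and assumes the document id needs no JSON escaping. The output follows the newline-delimited layout of the Elasticsearch Bulk API (an action line, then a source line for index actions).

```java
final class BulkEntitySketch {

  // Hypothetical helper, not the ElasticsearchIO implementation.
  // Assumes `id` contains no characters that require JSON escaping.
  static String toBulkEntity(String id, String documentJson, boolean isDelete) {
    if (isDelete) {
      // A delete action has no source line.
      return "{\"delete\":{\"_id\":\"" + id + "\"}}\n";
    }
    // An index action is followed by the document source on its own line.
    return "{\"index\":{\"_id\":\"" + id + "\"}}\n" + documentJson + "\n";
  }

  public static void main(String[] args) {
    System.out.print(toBulkEntity("1", "{\"user\":\"alice\"}", false));
    System.out.print(toBulkEntity("2", "{}", true));
  }
}
```

Because the function returns a plain string, its output can be asserted on in a unit test without an Elasticsearch cluster.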
> Relatedly, the batching of entities when creating Bulk API payloads is currently limited by the lesser of two bounds: the bundle size chosen by the Beam runner and the `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for portability between runners, but it also means most Bulk payloads contain only a few (1-5) entities. By using Stateful Processing to adhere more closely to the `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to reduce the number of indexing requests in an Elasticsearch cluster by 50-100x. Separating document serialization from IO also allows supporting multiple IO techniques with minimal and understandable code.
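The batching idea can be sketched in plain Java, assuming a buffer that is flushed when it reaches a maximum size or when its oldest element has waited longer than a maximum duration. This is not Beam's state and timer API and not the code from the pull request: `BatchBufferSketch` and its methods are hypothetical, and the time limit is checked only when a new element arrives, whereas a real timer would also fire when the input is idle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BatchBufferSketch {
  private final int maxBatchSize;
  private final long maxBufferingMillis;
  private final List<String> buffer = new ArrayList<>();
  private long firstBufferedAtMillis;

  BatchBufferSketch(int maxBatchSize, long maxBufferingMillis) {
    this.maxBatchSize = maxBatchSize;
    this.maxBufferingMillis = maxBufferingMillis;
  }

  // Adds one entity. Returns the batch to send if a limit was reached,
  // otherwise an empty list.
  List<String> add(String entity, long nowMillis) {
    if (buffer.isEmpty()) {
      // Remember when the oldest buffered element arrived.
      firstBufferedAtMillis = nowMillis;
    }
    buffer.add(entity);
    boolean full = buffer.size() >= maxBatchSize;
    boolean expired = nowMillis - firstBufferedAtMillis >= maxBufferingMillis;
    return (full || expired) ? flush() : Collections.emptyList();
  }

  // Returns a copy of everything buffered so far and empties the buffer.
  List<String> flush() {
    List<String> batch = new ArrayList<>(buffer);
    buffer.clear();
    return batch;
  }

  public static void main(String[] args) {
    BatchBufferSketch sketch = new BatchBufferSketch(3, 1000L);
    sketch.add("a", 0L);
    sketch.add("b", 10L);
    System.out.println(sketch.add("c", 20L));
  }
}
```

With a low-volume input, the duration check is what keeps elements from waiting indefinitely for the batch to fill, which is the latency concern the Javadoc for withMaxBufferingDuration describes.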


