Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/23 14:10:44 UTC

[GitHub] [beam] jithin97 opened a new pull request #12670: [WIP][BEAM-5757] Add ElasticsearchIO: delete document support

jithin97 opened a new pull request #12670:
URL: https://github.com/apache/beam/pull/12670


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jithin97 commented on a change in pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
jithin97 commented on a change in pull request #12670:
URL: https://github.com/apache/beam/pull/12670#discussion_r482860614



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete operation, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");

Review comment:
       Yes, I think that is a better way to handle it. I fixed it.
   Thanks.







[GitHub] [beam] jithin97 commented on a change in pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
jithin97 commented on a change in pull request #12670:
URL: https://github.com/apache/beam/pull/12670#discussion_r482881070



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete operation, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");
+          }
+        }
+
+        if (!isDelete) {
+          // index is an insert/upsert and update is a partial update (or insert if not existing)
+          if (spec.getUsePartialUpdate()) {
+            batch.add(
+                String.format(
+                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
+                    documentMetadata, document));
+          } else {
+            batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          }
         } else {
-          batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          // delete request used for deleting a document.
+          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));

Review comment:
       Sure, that is better for readability.







[GitHub] [beam] jithin97 commented on a change in pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
jithin97 commented on a change in pull request #12670:
URL: https://github.com/apache/beam/pull/12670#discussion_r482859763



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -140,4 +140,38 @@ public void testWritePartialUpdate() throws Exception {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted and the other half is partially updated using bulk
+   * delete request. The test then asserts the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted using bulk delete request. The test then asserts the
+   * documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }

Review comment:
       Hi @TheNeuralBit, I am also new to the Apache Beam code base 😅, but I also think there is a good opportunity to remove duplicate code.







[GitHub] [beam] echauchot edited a comment on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
echauchot edited a comment on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-684611188


   thanks @TheNeuralBit, I'm just back from vacation but I lack some time to review it





[GitHub] [beam] lukecwik commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-683083133


   R: @apilloud @TheNeuralBit 





[GitHub] [beam] TheNeuralBit commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-686803675


   It's probably worth adding a note about this in CHANGES.md so it shows up in the release notes for the next release. Would you mind sending a PR for that?





[GitHub] [beam] jithin97 commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
jithin97 commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-686848020


   > It's probably worth adding a note about this in CHANGES.md so it shows up in the release notes for the next release. Would you mind sending a PR for that?
   
   Sure, I will do that.
   Thanks for the review.





[GitHub] [beam] jithin97 commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
jithin97 commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-680015143


   R: @echauchot @jbonofre @timrobertson100





[GitHub] [beam] TheNeuralBit commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-686153681


   Run Java PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12670:
URL: https://github.com/apache/beam/pull/12670#discussion_r482613805



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -140,4 +140,38 @@ public void testWritePartialUpdate() throws Exception {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted and the other half is partially updated using bulk
+   * delete request. The test then asserts the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted using bulk delete request. The test then asserts the
+   * documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }

Review comment:
       Not really your problem, but this is my first time looking at these tests, and their structure seems really odd to me. It looks like we could get rid of a lot of duplicate code if we just implemented both `ElasticsearchIOIT` and `ElasticsearchIOTest` in elasticsearch common; then the implementations for each version could inherit from those with an (almost) empty implementation.
   
   Again, I'm not asking you to do anything here, but I might try to tackle this myself; let me know if you think that's not possible.
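
One possible shape for the refactoring described in this comment, as a rough sketch only. The class names and index names below are hypothetical and do not exist in the Beam code base; the point is just that the scenario logic lives once in a shared base class and each version-specific class supplies only what differs.

```java
/** Shared scenario logic; hypothetical stand-in for a class in the common test module. */
abstract class CommonDeleteScenarioSketch {
  /** Each version-specific module supplies only what differs, here a made-up index name. */
  protected abstract String indexName();

  /** Written once and inherited by every version-specific class. */
  final String describeDeleteScenario() {
    return "copy index, then bulk-delete about half of the documents in " + indexName();
  }
}

/** Nearly empty subclass for one Elasticsearch major version (illustrative name). */
final class V2DeleteScenarioSketch extends CommonDeleteScenarioSketch {
  @Override
  protected String indexName() {
    return "beam-es2";
  }
}

/** A second nearly empty subclass; no scenario code is duplicated. */
final class V5DeleteScenarioSketch extends CommonDeleteScenarioSketch {
  @Override
  protected String indexName() {
    return "beam-es5";
  }

  public static void main(String[] args) {
    System.out.println(new V2DeleteScenarioSketch().describeDeleteScenario());
    System.out.println(new V5DeleteScenarioSketch().describeDeleteScenario());
  }
}
```

Whether the real integration tests can be folded together this way depends on how the per-version modules wire up their REST clients, which this sketch does not model.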

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete operation, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");

Review comment:
       Could we instead just verify that `IdFn` is specified whenever `DeleteFn` is specified? Then we could move this check out to `Write#expand` and raise an exception when constructing the pipeline, rather than when it's executing. WDYT?
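
A minimal sketch of the construction-time check proposed in this comment, assuming a simplified stand-in for the write spec. `WriteSpecSketch` and `validate()` are hypothetical names used only for illustration and are not Beam's actual `ElasticsearchIO.Write` API; in the real transform the equivalent check would presumably sit in `Write#expand`.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the write spec; not the real ElasticsearchIO.Write class. */
final class WriteSpecSketch {
  private final Function<String, String> idFn; // may be null when no id function is set
  private final Function<String, Boolean> isDeleteFn; // may be null when deletes are not used

  WriteSpecSketch(Function<String, String> idFn, Function<String, Boolean> isDeleteFn) {
    this.idFn = idFn;
    this.isDeleteFn = isDeleteFn;
  }

  /** Runs once while the pipeline is being built, before any element is processed. */
  void validate() {
    if (isDeleteFn != null && idFn == null) {
      throw new IllegalArgumentException(
          "Id needs to be specified by withIdFn when a delete function is set");
    }
  }

  public static void main(String[] args) {
    // Both functions set: the configuration is accepted.
    new WriteSpecSketch(doc -> "id-1", doc -> true).validate();
    try {
      // Delete function without an id function: rejected up front.
      new WriteSpecSketch(null, doc -> true).validate();
    } catch (IllegalArgumentException e) {
      System.out.println("rejected at construction time: " + e.getMessage());
    }
  }
}
```

The check depends only on which functions are configured, not on any element, so a misconfigured pipeline fails before it starts running rather than on the first delete it encounters.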

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete operation, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");
+          }
+        }
+
+        if (!isDelete) {
+          // index is an insert/upsert and update is a partial update (or insert if not existing)
+          if (spec.getUsePartialUpdate()) {
+            batch.add(
+                String.format(
+                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
+                    documentMetadata, document));
+          } else {
+            batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          }
         } else {
-          batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          // delete request used for deleting a document.
+          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));

Review comment:
       nit: I'd probably write this the other way so we don't have to reason about the double-negative:
   
   ```java
   if (isDelete) {
     // do a delete
   } else {
     // do an insert/upsert
   }
   ```
   
   But that's a big nit, feel free to leave it this way if you prefer.
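
To make the suggested ordering concrete, here is a small self-contained sketch that reuses the format strings from the diff with the delete branch first. `BulkLineSketch` and `bulkEntry` are hypothetical names for illustration, not part of `ElasticsearchIO`, and the sample metadata and document strings are made up.

```java
/** Hypothetical helper mirroring the format strings in the diff; not part of ElasticsearchIO. */
final class BulkLineSketch {
  static String bulkEntry(
      boolean isDelete, boolean usePartialUpdate, String metadata, String document) {
    if (isDelete) {
      // A delete action is a single line; it carries no document source.
      return String.format("{ \"delete\" : %s }%n", metadata);
    } else if (usePartialUpdate) {
      // Partial update, inserting the document if it does not exist yet.
      return String.format(
          "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n", metadata, document);
    } else {
      // Plain index request (insert or overwrite).
      return String.format("{ \"index\" : %s }%n%s%n", metadata, document);
    }
  }

  public static void main(String[] args) {
    String metadata = "{\"_id\":\"1\"}";
    String document = "{\"name\":\"a\"}";
    System.out.print(bulkEntry(true, false, metadata, document));
    System.out.print(bulkEntry(false, true, metadata, document));
    System.out.print(bulkEntry(false, false, metadata, document));
  }
}
```

Flattening the nested `if` into one `if / else if / else` chain keeps each branch positive and makes it visible that a delete takes precedence over the partial-update setting.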







[GitHub] [beam] TheNeuralBit commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-683131869


   I can take a look at this





[GitHub] [beam] echauchot commented on pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #12670:
URL: https://github.com/apache/beam/pull/12670#issuecomment-684611188


   thanks @TheNeuralBit, I lack some time to review it





[GitHub] [beam] TheNeuralBit merged pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12670:
URL: https://github.com/apache/beam/pull/12670


   

