Posted to commits@beam.apache.org by bh...@apache.org on 2020/09/03 22:52:21 UTC

[beam] branch master updated: [BEAM-5757] Add ElasticsearchIO: delete document support (#12670)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89a2d17  [BEAM-5757] Add ElasticsearchIO: delete document support (#12670)
89a2d17 is described below

commit 89a2d17624a5f2f445b7199fe7a61ec0eca8205a
Author: Jithin Sukumar <ji...@gmail.com>
AuthorDate: Fri Sep 4 07:52:02 2020 +0900

    [BEAM-5757] Add ElasticsearchIO: delete document support (#12670)
    
    * [BEAM-5757] Add ElasticsearchIO: delete document support
    
    * [BEAM-5757] ElasticsearchIO: fix idFn and deleteFn check
---
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    | 34 ++++++++
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  7 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    | 34 ++++++++
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  7 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    | 34 ++++++++
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  7 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java    | 34 ++++++++
 .../sdk/io/elasticsearch/ElasticsearchIOTest.java  |  7 ++
 .../elasticsearch/ElasticsearchIOTestCommon.java   | 84 +++++++++++++++++++
 .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 94 +++++++++++++++-------
 10 files changed, 311 insertions(+), 31 deletions(-)
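
To make the new API concrete, here is a small usage sketch. It is illustrative only and not part
of this commit: the cluster address, index and type names, and the "id"/"is_deleted" field names
are placeholder assumptions chosen to mirror the tests added below.

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class ElasticsearchDeleteSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
            .apply(
                Create.of(
                    Arrays.asList(
                        "{\"id\": 1, \"is_deleted\": false}", // kept and upserted
                        "{\"id\": 2, \"is_deleted\": true}"))) // deleted
            .apply(
                ElasticsearchIO.write()
                    .withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                            new String[] {"http://localhost:9200"}, "my-index", "_doc"))
                    // withIdFn is required whenever withIsDeleteFn is set; expand()
                    // rejects the transform otherwise.
                    .withIdFn(doc -> doc.get("id").asText())
                    .withUsePartialUpdate(true)
                    .withIsDeleteFn(doc -> doc.get("is_deleted").asBoolean()));

        pipeline.run().waitUntilFinish();
      }
    }

When a document's is_deleted field evaluates to true, the connector emits a bulk delete action for
the id extracted by withIdFn; otherwise it falls back to the configured upsert/index behaviour.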

diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index c0ea12c..6c8aa3c 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -140,4 +140,38 @@ public class ElasticsearchIOIT {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted and the other half is partially updated in a
+   * single bulk request. The test then asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted using a bulk delete request. The test then
+   * asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 89d0b28..d1b43f0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -224,4 +224,11 @@ public class ElasticsearchIOTest implements Serializable {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetryValidRequest();
   }
+
+  @Test
+  public void testWriteWithIsDeleteFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index cf52282..c9032dd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted and the other half is partially updated in a
+   * single bulk request. The test then asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted using a bulk delete request. The test then
+   * asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 4f2fc28..e675d43 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -222,4 +222,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetryValidRequest();
   }
+
+  @Test
+  public void testWriteWithIsDeleteFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 9629440..f9ae9f6 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted and the other half is partially updated in a
+   * single bulk request. The test then asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted using a bulk delete request. The test then
+   * asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 279fa1e..0de7398 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -216,4 +216,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetryValidRequest();
   }
+
+  @Test
+  public void testWriteWithIsDeleteFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 42ae6d5..ce4a041 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted and the other half is partially updated in a
+   * single bulk request. The test then asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes against Elasticsearch. The test dataset index is cloned,
+   * then around half of the documents are deleted using a bulk delete request. The test then
+   * asserts that the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 2ccbda3..e9dceb7 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -220,4 +220,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWriteRetryValidRequest();
   }
+
+  @Test
+  public void testWriteWithIsDeleteFn() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+    elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }
 }
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index de1f8b0..a7b2ca4 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -663,4 +663,88 @@ class ElasticsearchIOTestCommon implements Serializable {
     int count = countByScientistName(connectionConfiguration, restClient, "Einstein");
     assertEquals(numDocs / NUM_SCIENTISTS, count);
   }
+
+  /**
+   * Tests deletion of documents from an Elasticsearch index. Documents with an odd integer id are
+   * deleted and those with an even integer id are partially updated. Documents to be deleted need
+   * to carry the delete criterion within their own fields.
+   */
+  void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    if (!useAsITests) {
+      ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+    }
+
+    long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // partial documents containing only the id and the is_deleted flag
+    List<String> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      // documents with an odd id are marked for deletion
+      data.add(String.format("{\"id\" : %s, \"is_deleted\": %b}", i, i % 2 == 1));
+    }
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            ElasticsearchIO.write()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withIdFn(new ExtractValueFn("id"))
+                .withUsePartialUpdate(true)
+                .withIsDeleteFn(doc -> doc.get("is_deleted").asBoolean()));
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+
+    // Check if documents are deleted as expected
+    assertEquals(numDocs / 2, currentNumDocs);
+    assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin"));
+  }
+
+  /**
+   * Tests deletion of documents from an Elasticsearch index. Documents with an odd integer id are
+   * deleted. Documents to be deleted need to carry the delete criterion within their own fields.
+   */
+  void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    if (!useAsITests) {
+      ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+    }
+
+    long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // partial delete documents containing only the id and the is_deleted flag
+    List<String> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      // only documents with an odd id are included; all are marked for deletion
+      if (i % 2 == 1) {
+        data.add(String.format("{\"id\" : %s, \"is_deleted\": %b}", i, true));
+      }
+    }
+
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            ElasticsearchIO.write()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withIdFn(new ExtractValueFn("id"))
+                .withIsDeleteFn(doc -> doc.get("is_deleted").asBoolean()));
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+
+    // Check if documents are deleted as expected
+    assertEquals(numDocs / 2, currentNumDocs);
+    assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin"));
+  }
 }
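
Before the connector diff that follows, here is a minimal sketch of the three bulk-API action
lines WriteFn now chooses between (delete, partial update, plain index). Only the format strings
mirror the code below; the metadata and document JSON values are hypothetical examples, and the
real metadata is produced by getDocumentMetadata().

    public class BulkActionSketch {
      public static void main(String[] args) {
        // Hypothetical values for illustration only.
        String metadata = "{\"_id\":\"2\"}";
        String document = "{\"id\": 2, \"is_deleted\": true}";

        // delete: the document addressed by the metadata is removed
        System.out.printf("{ \"delete\" : %s }%n", metadata);
        // update with doc_as_upsert: partial update, or insert if missing
        System.out.printf(
            "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n", metadata, document);
        // index: insert/replace the document as-is
        System.out.printf("{ \"index\" : %s }%n%s%n", metadata, document);
      }
    }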
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index a5397fb..a9887c6 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -1039,6 +1039,8 @@ public class ElasticsearchIO {
      */
     public interface FieldValueExtractFn extends SerializableFunction<JsonNode, String> {}
 
+    public interface BooleanFieldValueExtractFn extends SerializableFunction<JsonNode, Boolean> {}
+
     abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
 
     abstract long getMaxBatchSize();
@@ -1055,6 +1057,8 @@ public class ElasticsearchIO {
 
     abstract boolean getUsePartialUpdate();
 
+    abstract @Nullable BooleanFieldValueExtractFn getIsDeleteFn();
+
     abstract Builder builder();
 
     @AutoValue.Builder
@@ -1075,6 +1079,8 @@ public class ElasticsearchIO {
 
       abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);
 
+      abstract Builder setIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn);
+
       abstract Write build();
     }
 
@@ -1201,10 +1207,30 @@ public class ElasticsearchIO {
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
+    /**
+     * Provide a function that decides, per document, whether the target bulk operation is an
+     * upsert or a delete, based on the document's fields. When withIsDeleteFn is used, the
+     * document id extraction must also be configured via withIdFn, otherwise an
+     * IllegalArgumentException is thrown. Should the function throw an Exception, the batch
+     * will fail and the exception will be propagated.
+     *
+     * @param isDeleteFn function that returns true when the given document should be deleted
+     * @return the {@link Write} with the function set
+     */
+    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
+      FieldValueExtractFn idFn = getIdFn();
+      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
       checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }
@@ -1313,31 +1339,23 @@ public class ElasticsearchIO {
       /**
        * Extracts the components that comprise the document address from the document using the
        * {@link FieldValueExtractFn} configured. This allows any or all of the index, type and
-       * document id to be controlled on a per document basis. If none are provided then an empty
-       * default of {@code {}} is returned. Sanitization of the index is performed, automatically
-       * lower-casing the value as required by Elasticsearch.
+       * document id to be controlled on a per document basis. Sanitization of the index is
+       * performed, automatically lower-casing the value as required by Elasticsearch.
        *
-       * @param document the json from which the index, type and id may be extracted
+       * @param parsedDocument the json from which the index, type and id may be extracted
        * @return the document address as JSON or the default
        * @throws IOException if the document cannot be parsed as JSON
        */
-      private String getDocumentMetadata(String document) throws IOException {
-        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
-          // parse once and reused for efficiency
-          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
-
-          DocumentMetadata metadata =
-              new DocumentMetadata(
-                  spec.getIndexFn() != null
-                      ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
-                      : null,
-                  spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
-                  spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
-                  spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
-          return OBJECT_MAPPER.writeValueAsString(metadata);
-        } else {
-          return "{}"; // use configuration and auto-generated document IDs
-        }
+      private String getDocumentMetadata(JsonNode parsedDocument) throws IOException {
+        DocumentMetadata metadata =
+            new DocumentMetadata(
+                spec.getIndexFn() != null
+                    ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                    : null,
+                spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
+                spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
+                spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
+        return OBJECT_MAPPER.writeValueAsString(metadata);
       }
 
       private static String lowerCaseOrNull(String input) {
@@ -1346,17 +1364,31 @@ public class ElasticsearchIO {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element();
+        String documentMetadata = "{}"; // default: use configuration and auto-generated document IDs
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+          }
+        }
+
+        if (isDelete) {
+          // delete action: remove the document addressed by the metadata.
+          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));
         } else {
-          batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          // index is an insert/upsert and update is a partial update (or insert if not existing)
+          if (spec.getUsePartialUpdate()) {
+            batch.add(
+                String.format(
+                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
+                    documentMetadata, document));
+          } else {
+            batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          }
         }
 
         currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length;