You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by bh...@apache.org on 2020/09/03 22:52:21 UTC
[beam] branch master updated: [BEAM-5757] Add ElasticsearchIO:
delete document support (#12670)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89a2d17 [BEAM-5757] Add ElasticsearchIO: delete document support (#12670)
89a2d17 is described below
commit 89a2d17624a5f2f445b7199fe7a61ec0eca8205a
Author: Jithin Sukumar <ji...@gmail.com>
AuthorDate: Fri Sep 4 07:52:02 2020 +0900
[BEAM-5757] Add ElasticsearchIO: delete document support (#12670)
* [BEAM-5757] Add ElasticsearchIO: delete document support
* [BEAM-5757] ElasticsearchIO: fix idFn and deleteFn check
---
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 34 ++++++++
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 7 ++
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 34 ++++++++
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 7 ++
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 34 ++++++++
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 7 ++
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 34 ++++++++
.../sdk/io/elasticsearch/ElasticsearchIOTest.java | 7 ++
.../elasticsearch/ElasticsearchIOTestCommon.java | 84 +++++++++++++++++++
.../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 94 +++++++++++++++-------
10 files changed, 311 insertions(+), 31 deletions(-)
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index c0ea12c..6c8aa3c 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -140,4 +140,38 @@ public class ElasticsearchIOIT {
elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
}
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted and the other half is partially updated in a single
+ * bulk request. The test then asserts that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted using bulk delete requests. The test then asserts
+ * that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 89d0b28..d1b43f0 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -224,4 +224,11 @@ public class ElasticsearchIOTest implements Serializable {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteRetryValidRequest();
}
+
+ @Test
+ public void testWriteWithIsDeleteFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index cf52282..c9032dd 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
}
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted and the other half is partially updated in a single
+ * bulk request. The test then asserts that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted using bulk delete requests. The test then asserts
+ * that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 4f2fc28..e675d43 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -222,4 +222,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteRetryValidRequest();
}
+
+ @Test
+ public void testWriteWithIsDeleteFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 9629440..f9ae9f6 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
}
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted and the other half is partially updated in a single
+ * bulk request. The test then asserts that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted using bulk delete requests. The test then asserts
+ * that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 279fa1e..0de7398 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -216,4 +216,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteRetryValidRequest();
}
+
+ @Test
+ public void testWriteWithIsDeleteFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 42ae6d5..ce4a041 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -144,4 +144,38 @@ public class ElasticsearchIOIT {
elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
}
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted and the other half is partially updated in a single
+ * bulk request. The test then asserts that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+ }
+
+ /**
+ * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned, then
+ * around half of the documents are deleted using bulk delete requests. The test then asserts
+ * that the expected documents were deleted.
+ */
+ @Test
+ public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ ElasticsearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+ new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+ elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 2ccbda3..e9dceb7 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-7/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -220,4 +220,11 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteRetryValidRequest();
}
+
+ @Test
+ public void testWriteWithIsDeleteFn() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithPartialUpdates();
+ elasticsearchIOTestCommon.testWriteWithIsDeletedFnWithoutPartialUpdate();
+ }
}
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index de1f8b0..a7b2ca4 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -663,4 +663,88 @@ class ElasticsearchIOTestCommon implements Serializable {
int count = countByScientistName(connectionConfiguration, restClient, "Einstein");
assertEquals(numDocs / NUM_SCIENTISTS, count);
}
+
+ /**
+ * Tests deletion of documents from an Elasticsearch index. Documents with an odd integer id are
+ * deleted and those with an even integer id are partially updated. Documents to be deleted must
+ * carry the deletion criterion (here the {@code is_deleted} field) within the document itself.
+ */
+ void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+ if (!useAsITests) {
+ ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+ }
+
+ long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // partial documents containing only the ID and the is_deleted flag
+ List<String> data = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ // Odd ids are flagged for deletion; even ids are partially updated with is_deleted=false.
+ data.add(String.format("{\"id\" : %s, \"is_deleted\": %b}", i, i % 2 == 1));
+ }
+
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUsePartialUpdate(true)
+ .withIsDeleteFn(doc -> doc.get("is_deleted").asBoolean()));
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein"));
+
+ // Check if documents are deleted as expected
+ assertEquals(numDocs / 2, currentNumDocs);
+ assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin"));
+ }
+
+ /**
+ * Tests deletion of documents from an Elasticsearch index. Documents with an odd integer id are
+ * deleted; even ids are not sent. Deletion is driven by the {@code is_deleted} field.
+ */
+ void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+ if (!useAsITests) {
+ ElasticsearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient);
+ }
+
+ long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // partial documents containing only the ID and the is_deleted flag
+ List<String> data = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ // Only odd ids are emitted, each flagged for deletion.
+ if (i % 2 == 1) {
+ data.add(String.format("{\"id\" : %s, \"is_deleted\": %b}", i, true));
+ }
+ }
+
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withIsDeleteFn(doc -> doc.get("is_deleted").asBoolean()));
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein"));
+
+ // Check if documents are deleted as expected
+ assertEquals(numDocs / 2, currentNumDocs);
+ assertEquals(0, countByScientistName(connectionConfiguration, restClient, "Darwin"));
+ }
}
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index a5397fb..a9887c6 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -1039,6 +1039,8 @@ public class ElasticsearchIO {
*/
public interface FieldValueExtractFn extends SerializableFunction<JsonNode, String> {}
+ public interface BooleanFieldValueExtractFn extends SerializableFunction<JsonNode, Boolean> {}
+
abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
abstract long getMaxBatchSize();
@@ -1055,6 +1057,8 @@ public class ElasticsearchIO {
abstract boolean getUsePartialUpdate();
+ abstract @Nullable BooleanFieldValueExtractFn getIsDeleteFn();
+
abstract Builder builder();
@AutoValue.Builder
@@ -1075,6 +1079,8 @@ public class ElasticsearchIO {
abstract Builder setRetryConfiguration(RetryConfiguration retryConfiguration);
+ abstract Builder setIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn);
+
abstract Write build();
}
@@ -1201,10 +1207,30 @@ public class ElasticsearchIO {
return builder().setRetryConfiguration(retryConfiguration).build();
}
+ /**
+ * Provide a function that decides, per document, whether the bulk operation is a delete or an
+ * upsert (index, or update when partial updates are enabled). Deletes need the document id,
+ * so withIdFn must also be set, otherwise an IllegalArgumentException is thrown when the
+ * transform is expanded. Should the function throw an Exception then the batch will fail and
+ * the exception propagated.
+ *
+ * @param isDeleteFn function returning true if the given document should be deleted
+ * @return the {@link Write} with the function set
+ */
+ public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
+ checkArgument(isDeleteFn != null, "isDeleteFn is required");
+ return builder().setIsDeleteFn(isDeleteFn).build();
+ }
+
@Override
public PDone expand(PCollection<String> input) {
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
+ FieldValueExtractFn idFn = getIdFn();
+ BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
+ checkArgument(
+ isDeleteFn == null || idFn != null,
+ "Id needs to be specified by withIdFn for delete operation");
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
@@ -1313,31 +1339,23 @@ public class ElasticsearchIO {
/**
* Extracts the components that comprise the document address from the document using the
* {@link FieldValueExtractFn} configured. This allows any or all of the index, type and
- * document id to be controlled on a per document basis. If none are provided then an empty
- * default of {@code {}} is returned. Sanitization of the index is performed, automatically
- * lower-casing the value as required by Elasticsearch.
+ * document id to be controlled on a per document basis. Sanitization of the index is
+ * performed, automatically lower-casing the value as required by Elasticsearch.
*
- * @param document the json from which the index, type and id may be extracted
- * @return the document address as JSON or the default
- * @throws IOException if the document cannot be parsed as JSON
+ * @param parsedDocument the json from which the index, type and id may be extracted
+ * @return the document address as JSON
+ * @throws IOException if the document address cannot be serialized as JSON
*/
- private String getDocumentMetadata(String document) throws IOException {
- if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
- // parse once and reused for efficiency
- JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
-
- DocumentMetadata metadata =
- new DocumentMetadata(
- spec.getIndexFn() != null
- ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
- : null,
- spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
- spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
- spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
- return OBJECT_MAPPER.writeValueAsString(metadata);
- } else {
- return "{}"; // use configuration and auto-generated document IDs
- }
+ private String getDocumentMetadata(JsonNode parsedDocument) throws IOException {
+ DocumentMetadata metadata =
+ new DocumentMetadata(
+ spec.getIndexFn() != null
+ ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+ : null,
+ spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
+ spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
+ spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
+ return OBJECT_MAPPER.writeValueAsString(metadata);
}
private static String lowerCaseOrNull(String input) {
@@ -1346,17 +1364,31 @@ public class ElasticsearchIO {
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
- String document = context.element();
- String documentMetadata = getDocumentMetadata(document);
-
- // index is an insert/upsert and update is a partial update (or insert if not existing)
- if (spec.getUsePartialUpdate()) {
- batch.add(
- String.format(
- "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
- documentMetadata, document));
+ String document = context.element(); // use configuration and auto-generated document IDs
+ String documentMetadata = "{}";
+ boolean isDelete = false;
+ if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+ // parse once and reused for efficiency
+ JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+ documentMetadata = getDocumentMetadata(parsedDocument);
+ if (spec.getIsDeleteFn() != null) {
+ isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+ }
+ }
+
+ if (isDelete) {
+ // delete request used for deleting a document.
+ batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));
} else {
- batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+ // index is an insert/upsert and update is a partial update (or insert if not existing)
+ if (spec.getUsePartialUpdate()) {
+ batch.add(
+ String.format(
+ "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
+ documentMetadata, document));
+ } else {
+ batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+ }
}
currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length;