Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/03 00:54:40 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12670: [BEAM-5757] Add ElasticsearchIO: delete document support

TheNeuralBit commented on a change in pull request #12670:
URL: https://github.com/apache/beam/pull/12670#discussion_r482613805



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -140,4 +140,38 @@ public void testWritePartialUpdate() throws Exception {
     elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
     elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
   }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted and the other half is partially updated using bulk
+   * delete request. The test then asserts the documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithPartialUpdates() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithPartialUpdates();
+  }
+
+  /**
+   * This test verifies volume deletes of Elasticsearch. The test dataset index is cloned and then
+   * around half of the documents are deleted using bulk delete request. The test then asserts the
+   * documents were deleted successfully.
+   */
+  @Test
+  public void testWriteWithIsDeletedFnWithoutPartialUpdate() throws Exception {
+    ElasticsearchIOTestUtils.copyIndex(
+        restClient,
+        readConnectionConfiguration.getIndex(),
+        updateConnectionConfiguration.getIndex());
+    ElasticsearchIOTestCommon elasticsearchIOTestCommonDeleteFn =
+        new ElasticsearchIOTestCommon(updateConnectionConfiguration, restClient, true);
+    elasticsearchIOTestCommonDeleteFn.setPipeline(pipeline);
+    elasticsearchIOTestCommonDeleteFn.testWriteWithIsDeletedFnWithoutPartialUpdate();
+  }

Review comment:
       Not really your problem, but this is my first time looking at these tests, and their structure seems really odd to me. It looks like we could get rid of a lot of duplicate code if we just implemented both `ElasticsearchIOIT` and `ElasticsearchIOTest` in elasticsearch common; the implementations for each version could then inherit from those with an (almost) empty implementation.
   
   Again, I'm not asking you to do anything here, but I might try to tackle this myself. Let me know if you think that's not possible.
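
A rough sketch of the inheritance idea, to make the shape concrete. Everything here is hypothetical: `CommonIOITSketch`, `Es2IOITSketch`, `versionLabel()` and `runSharedTests()` are illustrative names, not classes or methods in Beam, and the sketch deliberately avoids JUnit so it stays self-contained. In real code the shared methods would presumably carry `@Test`, which JUnit 4 also runs when they are inherited from a superclass.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical shared base, standing in for a class in the common test module. */
abstract class CommonIOITSketch {
  /** The only piece a version-specific module would have to supply (hypothetical hook). */
  abstract String versionLabel();

  /** Shared test bodies are written once here; real code would use @Test methods instead. */
  List<String> runSharedTests() {
    List<String> executed = new ArrayList<>();
    executed.add(versionLabel() + ":testWritePartialUpdate");
    executed.add(versionLabel() + ":testWriteWithIsDeletedFnWithPartialUpdates");
    executed.add(versionLabel() + ":testWriteWithIsDeletedFnWithoutPartialUpdate");
    return executed;
  }
}

/** Per-version class: (almost) empty, inheriting every shared test from the base. */
final class Es2IOITSketch extends CommonIOITSketch {
  @Override
  String versionLabel() {
    return "es2";
  }
}
```

With this shape, adding a test such as the delete cases in this PR would mean touching only the shared base rather than one copy per Elasticsearch version.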

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete opration, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");

Review comment:
       Could we instead just verify that `IdFn` is specified whenever `IsDeleteFn` is specified? Then we could move this check out to `Write#expand` and raise an exception when constructing the pipeline, rather than when it's executing. WDYT?
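
A minimal sketch of what a construction-time check could look like. This is not the real `ElasticsearchIO.Write`: `WriteSpecSketch` and `validate()` are hypothetical stand-ins, the two fields use plain `java.util.function.Function` rather than Beam's own function types, and the error message wording is made up. `validate()` plays the role of the check that would live in `Write#expand`.

```java
import java.util.function.Function;

/** Hypothetical stand-in for ElasticsearchIO.Write; not the real Beam class. */
final class WriteSpecSketch {
  private final Function<String, String> idFn; // stands in for the function set by withIdFn
  private final Function<String, Boolean> isDeleteFn; // stands in for the is-delete function

  WriteSpecSketch(Function<String, String> idFn, Function<String, Boolean> isDeleteFn) {
    this.idFn = idFn;
    this.isDeleteFn = isDeleteFn;
  }

  /** Stand-in for the check in Write#expand: runs once, before any element is processed. */
  void validate() {
    if (isDeleteFn != null && idFn == null) {
      throw new IllegalArgumentException(
          "Id needs to be specified by withIdFn when a delete function is configured");
    }
  }
}
```

Note that in the hunk above the `getIsDeleteFn()` call sits inside the branch guarded by `getIndexFn()`, `getTypeFn()` or `getIdFn()` being non-null, so a check of this kind would also reject the configuration where only the delete function is set.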

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1346,17 +1359,36 @@ private static String lowerCaseOrNull(String input) {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws Exception {
-        String document = context.element();
-        String documentMetadata = getDocumentMetadata(document);
-
-        // index is an insert/upsert and update is a partial update (or insert if not existing)
-        if (spec.getUsePartialUpdate()) {
-          batch.add(
-              String.format(
-                  "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                  documentMetadata, document));
+        String document = context.element(); // use configuration and auto-generated document IDs
+        String documentMetadata = "{}";
+        boolean isDelete = false;
+        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
+          // parse once and reused for efficiency
+          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+          documentMetadata = getDocumentMetadata(parsedDocument);
+          if (spec.getIsDeleteFn() != null) {
+            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+            // if it is a delete opration, then it is mandatory to specify the document id using
+            // getIdFn
+            checkArgument(
+                !(isDelete && spec.getIdFn() == null),
+                "Id needs to be specified by withIdFn for delete operation");
+          }
+        }
+
+        if (!isDelete) {
+          // index is an insert/upsert and update is a partial update (or insert if not existing)
+          if (spec.getUsePartialUpdate()) {
+            batch.add(
+                String.format(
+                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
+                    documentMetadata, document));
+          } else {
+            batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          }
         } else {
-          batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
+          // delete request used for deleting a document.
+          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));

Review comment:
       nit: I'd probably write this the other way so we don't have to reason about the double-negative:
   
   ```java
   if (isDelete) {
     // do a delete
   } else {
     // do an insert/upsert
   }
   ```
   
   But that's a big nit, feel free to leave it this way if you prefer.
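
For reference, a self-contained sketch of the delete-first ordering with the three bulk action formats taken from the diff above. `BulkActionSketch` and `bulkAction(...)` are hypothetical names introduced only for illustration; in the PR this logic lives inline in `processElement` and appends to `batch`.

```java
/** Hypothetical helper; the format strings are copied from the diff under review. */
final class BulkActionSketch {
  static String bulkAction(
      boolean isDelete, boolean usePartialUpdate, String documentMetadata, String document) {
    if (isDelete) {
      // A delete action is a single metadata line with no document body.
      return String.format("{ \"delete\" : %s }%n", documentMetadata);
    } else if (usePartialUpdate) {
      // update is a partial update (or an insert if the document does not exist yet)
      return String.format(
          "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
          documentMetadata, document);
    } else {
      // index is an insert/upsert
      return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document);
    }
  }
}
```

Checking `isDelete` first also flattens the nesting, since the partial-update choice becomes an `else if` instead of an inner `if`.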



