Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/16 09:37:22 UTC

[GitHub] [beam] je-ik commented on a change in pull request #17097: [BEAM-14064] ElasticsearchIO remove bundle based

je-ik commented on a change in pull request #17097:
URL: https://github.com/apache/beam/pull/17097#discussion_r827786188



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -221,6 +221,31 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
             duration));
   }
 
+  public GroupIntoBatches<K, InputT> withByteSize(Long batchSizeBytes) {

Review comment:
       This probably deserves a javadoc comment. Also, why do we use the boxed `Long` instead of the primitive `long`?
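
A minimal sketch of what this comment asks for: a javadoc'd setter that takes a primitive `long`, so the null check becomes unnecessary. `BatchSettings` is a hypothetical stand-in, not Beam's `GroupIntoBatches`, and the javadoc wording is an assumption about the intended semantics. The error message says "positive" so that it agrees with the `> 0` check.

```java
/** Hypothetical stand-in for a batching configuration; this is not Beam's actual class. */
final class BatchSettings {
  private final long batchSizeBytes;

  private BatchSettings(long batchSizeBytes) {
    this.batchSizeBytes = batchSizeBytes;
  }

  /** Creates settings with no byte-size limit (represented here by Long.MAX_VALUE). */
  static BatchSettings create() {
    return new BatchSettings(Long.MAX_VALUE);
  }

  /**
   * Returns a copy of these settings with the given maximum batch size in bytes.
   *
   * @param batchSizeBytes maximum batch size in bytes; must be positive and less than
   *     {@link Long#MAX_VALUE}
   * @throws IllegalArgumentException if {@code batchSizeBytes} is out of range
   */
  BatchSettings withByteSize(long batchSizeBytes) {
    // A primitive long cannot be null, so only the range needs checking.
    if (batchSizeBytes <= 0 || batchSizeBytes == Long.MAX_VALUE) {
      throw new IllegalArgumentException(
          "batchSizeBytes should be a positive value less than " + Long.MAX_VALUE);
    }
    return new BatchSettings(batchSizeBytes);
  }

  long getBatchSizeBytes() {
    return batchSizeBytes;
  }
}
```

A caller would write `BatchSettings.create().withByteSize(1024L)`. Passing `0` or a negative value throws `IllegalArgumentException`.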

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2335,126 +2270,40 @@ public void setup() throws IOException {
         }
       }
 
-      @StartBundle
-      public void startBundle(StartBundleContext context) {
-        batch = ArrayListMultimap.create();
-        currentBatchSizeBytes = 0;
-      }
-
-      /**
-       * Adapter interface which provides a common parent for {@link ProcessContext} and {@link
-       * FinishBundleContext} so that we are able to use a single common invocation to output from.
-       */
-      interface ContextAdapter {
-        void output(
-            TupleTag<Document> tag, Document document, Instant timestamp, BoundedWindow window);
-      }
-
-      private static final class ProcessContextAdapter<T> implements ContextAdapter {
-        private final DoFn<T, Document>.ProcessContext context;
-
-        private ProcessContextAdapter(DoFn<T, Document>.ProcessContext context) {
-          this.context = context;
-        }
-
-        @Override
-        public void output(
-            TupleTag<Document> tag, Document document, Instant ignored1, BoundedWindow ignored2) {
-          // Note: window and timestamp are intentionally unused, but required as params to fit the
-          // interface
-          context.output(tag, document);
-        }
-      }
-
-      private static final class FinishBundleContextAdapter<T> implements ContextAdapter {
-        private final DoFn<T, Document>.FinishBundleContext context;
-
-        private FinishBundleContextAdapter(DoFn<T, Document>.FinishBundleContext context) {
-          this.context = context;
-        }
-
-        @Override
-        public void output(
-            TupleTag<Document> tag, Document document, Instant timestamp, BoundedWindow window) {
-          context.output(tag, document, timestamp, window);
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException, InterruptedException {

Review comment:
       Can we avoid `ProcessContext` and use `@Element` with an `OutputReceiver` (here a `MultiOutputReceiver`, since the outputs are tagged) for new DoFns? It might be worth changing the old ones as well, but new code at least should use the new style.
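
A rough sketch of the parameter-injection style, assuming the Beam Java SDK is on the classpath. `RouteBatchFn`, the `OK` and `FAILED` tags, the `String` payload, and the "empty string means failure" rule are all hypothetical stand-ins for `BulkIOFn`, `Write.SUCCESSFUL_WRITES`, `Write.FAILED_WRITES`, `Document`, and `getHasError()`. Only `@Element` and `MultiOutputReceiver` are the Beam pieces being illustrated.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/** Hypothetical DoFn that routes each item of a batch to a tagged output. */
class RouteBatchFn extends DoFn<KV<Integer, Iterable<String>>, String> {
  // Hypothetical tags standing in for Write.SUCCESSFUL_WRITES and Write.FAILED_WRITES.
  static final TupleTag<String> OK = new TupleTag<String>() {};
  static final TupleTag<String> FAILED = new TupleTag<String>() {};

  @ProcessElement
  public void processElement(
      @Element KV<Integer, Iterable<String>> element, MultiOutputReceiver out) {
    if (element.getValue() == null) {
      return;
    }
    for (String doc : element.getValue()) {
      // Assumption for illustration only: an empty string marks a failed write.
      if (doc.isEmpty()) {
        out.get(FAILED).output(doc);
      } else {
        out.get(OK).output(doc);
      }
    }
  }
}
```

The method declares only what it uses (the element and the tagged outputs), so there is no context object to adapt.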

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -221,6 +221,31 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
             duration));
   }
 
+  public GroupIntoBatches<K, InputT> withByteSize(Long batchSizeBytes) {
+    checkArgument(
+        batchSizeBytes != null && batchSizeBytes < Long.MAX_VALUE && batchSizeBytes > 0,
+        "batchSizeBytes should be a non-negative value less than " + Long.MAX_VALUE);
+    return new GroupIntoBatches<>(
+        BatchingParams.create(
+            params.getBatchSize(),
+            batchSizeBytes,
+            params.getElementByteSize(),
+            params.getMaxBufferingDuration()));
+  }
+
+  public GroupIntoBatches<K, InputT> withByteSize(

Review comment:
       Please add javadoc here as well. The same `Long` vs `long` question as above applies.

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -336,11 +333,11 @@ void testWriteWithErrors() throws Exception {
         ElasticsearchIO.write()
             .withConnectionConfiguration(connectionConfiguration)
             .withMaxBatchSize(BATCH_SIZE);
-    List<String> input =
-        ElasticsearchIOTestUtils.createDocuments(
-            numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+    //    List<String> input =
+    //        ElasticsearchIOTestUtils.createDocuments(
+    //            numDocs, ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);

Review comment:
       Is this commented-out code a leftover?

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -362,11 +359,17 @@ public boolean matches(Object o) {
 
     // write bundles size is the runner decision, we cannot force a bundle size,
     // so we test the Writer as a DoFn outside of a runner.
-    try (DoFnTester<Document, Document> fnTester =
-        DoFnTester.of(new BulkIO.BulkIOBundleFn(write.getBulkIO()))) {
-      // inserts into Elasticsearch
-      fnTester.processBundle(serializeDocs(write, input));
-    }
+    //    try (DoFnTester<KV<Integer, Iterable<Document>>, Document> fnTester =
+    //        DoFnTester.of(new BulkIOFn(write.getBulkIO()))) {
+    //      // inserts into Elasticsearch
+    //      fnTester.processBundle(serializeDocs(write, input));
+    //    }

Review comment:
       Can we remove the old, commented-out code?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2335,126 +2270,40 @@ public void setup() throws IOException {
         }
       }
 
-      @StartBundle
-      public void startBundle(StartBundleContext context) {
-        batch = ArrayListMultimap.create();
-        currentBatchSizeBytes = 0;
-      }
-
-      /**
-       * Adapter interface which provides a common parent for {@link ProcessContext} and {@link
-       * FinishBundleContext} so that we are able to use a single common invocation to output from.
-       */
-      interface ContextAdapter {
-        void output(
-            TupleTag<Document> tag, Document document, Instant timestamp, BoundedWindow window);
-      }
-
-      private static final class ProcessContextAdapter<T> implements ContextAdapter {
-        private final DoFn<T, Document>.ProcessContext context;
-
-        private ProcessContextAdapter(DoFn<T, Document>.ProcessContext context) {
-          this.context = context;
-        }
-
-        @Override
-        public void output(
-            TupleTag<Document> tag, Document document, Instant ignored1, BoundedWindow ignored2) {
-          // Note: window and timestamp are intentionally unused, but required as params to fit the
-          // interface
-          context.output(tag, document);
-        }
-      }
-
-      private static final class FinishBundleContextAdapter<T> implements ContextAdapter {
-        private final DoFn<T, Document>.FinishBundleContext context;
-
-        private FinishBundleContextAdapter(DoFn<T, Document>.FinishBundleContext context) {
-          this.context = context;
-        }
-
-        @Override
-        public void output(
-            TupleTag<Document> tag, Document document, Instant timestamp, BoundedWindow window) {
-          context.output(tag, document, timestamp, window);
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException, InterruptedException {
+        if (c.element() == null || c.element().getValue() == null) {
+          return;
         }
-      }
-
-      @FinishBundle
-      public void finishBundle(FinishBundleContext context)
-          throws IOException, InterruptedException {
-        flushAndOutputResults(new FinishBundleContextAdapter<>(context));
-      }
-
-      private void flushAndOutputResults(ContextAdapter context)
-          throws IOException, InterruptedException {
-        // TODO: remove ContextAdapter and Multimap in favour of MultiOutputReceiver when
-        //  https://issues.apache.org/jira/browse/BEAM-1287 is completed
-        Multimap<BoundedWindow, Document> results = flushBatch();
-        for (Entry<BoundedWindow, Document> result : results.entries()) {
-          BoundedWindow outputWindow = result.getKey();
-          Document outputResult = result.getValue();
-          Instant timestamp = outputResult.getTimestamp();
-          if (timestamp == null) {
-            timestamp = outputWindow.maxTimestamp();
-          }
 
-          if (outputResult.getHasError()) {
-            context.output(Write.FAILED_WRITES, outputResult, timestamp, outputWindow);
+        for (Document doc : flushBatch(Lists.newArrayList(c.element().getValue()))) {
+          if (doc.getHasError()) {
+            c.output(Write.FAILED_WRITES, doc);
           } else {
-            context.output(Write.SUCCESSFUL_WRITES, outputResult, timestamp, outputWindow);
+            c.output(Write.SUCCESSFUL_WRITES, doc);
           }
         }
       }
 
-      protected void addAndMaybeFlush(
-          Document bulkApiEntity, ProcessContext context, BoundedWindow outputWindow)
-          throws IOException, InterruptedException {
-
-        batch.put(outputWindow, bulkApiEntity);
-        currentBatchSizeBytes +=
-            bulkApiEntity.getBulkDirective().getBytes(StandardCharsets.UTF_8).length;
-
-        if (batch.size() >= spec.getMaxBatchSize()
-            || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
-          flushAndOutputResults(new ProcessContextAdapter<>(context));
-        }
-      }
-
-      private boolean isRetryableClientException(Throwable t) {
-        // RestClient#performRequest only throws wrapped IOException so we must inspect the
-        // exception cause to determine if the exception is likely transient i.e. retryable or
-        // not.
-        return t.getCause() instanceof ConnectTimeoutException
-            || t.getCause() instanceof SocketTimeoutException
-            || t.getCause() instanceof ConnectionClosedException
-            || t.getCause() instanceof ConnectException;
-      }
-
-      private Multimap<BoundedWindow, Document> flushBatch()
+      private Iterable<Document> flushBatch(List<Document> docs)
           throws IOException, InterruptedException {
 
-        if (batch.isEmpty()) {
-          return ArrayListMultimap.create();
+        if (docs.isEmpty()) {
+          return Collections.emptyList();
         }
 
-        LOG.info(
-            "ElasticsearchIO batch size: {}, batch size bytes: {}",
-            batch.size(),
-            currentBatchSizeBytes);
+        //        LOG.info(
+        //            "ElasticsearchIO batch size: {}, batch size bytes: {}",
+        //            batch.size(),
+        //            currentBatchSizeBytes);

Review comment:
       Can we remove this commented-out logging, or do we want to keep it?
