You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/05/19 15:02:52 UTC

[beam] branch master updated: Cache total size in file based source (#26746)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 658e50fac18 Cache total size in file based source (#26746)
658e50fac18 is described below

commit 658e50fac18e2fe45949fe2639b0fceaf42928e1
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri May 19 11:02:41 2023 -0400

    Cache total size in file based source (#26746)
    
    * Cache total size in file based source
    
    * Add test case
---
 .../org/apache/beam/sdk/io/FileBasedSource.java     | 12 ++++++++++++
 .../org/apache/beam/sdk/io/FileBasedSourceTest.java | 21 +++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 6f95411c592..9d1c1bd80fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
@@ -73,6 +74,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   private MatchResult.@Nullable Metadata singleFileMetadata;
   private final Mode mode;
 
+  private final AtomicReference<@Nullable Long> filesSizeBytes;
+
   /** A given {@code FileBasedSource} represents a file resource of one of these types. */
   public enum Mode {
     FILEPATTERN,
@@ -91,6 +94,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     this.mode = Mode.FILEPATTERN;
     this.emptyMatchTreatment = emptyMatchTreatment;
     this.fileOrPatternSpec = fileOrPatternSpec;
+    this.filesSizeBytes = new AtomicReference<>();
   }
 
   /**
@@ -123,6 +127,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
     this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
     this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+    this.filesSizeBytes = new AtomicReference<>();
 
     // This field will be unused in this mode.
     this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
@@ -222,6 +227,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     String fileOrPattern = fileOrPatternSpec.get();
 
     if (mode == Mode.FILEPATTERN) {
+      Long maybeNumBytes = filesSizeBytes.get();
+      if (maybeNumBytes != null) {
+        return maybeNumBytes;
+      }
+
       long totalSize = 0;
       List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
       for (Metadata metadata : allMatches) {
@@ -232,6 +242,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
           fileOrPattern,
           allMatches.size(),
           totalSize);
+
+      filesSizeBytes.compareAndSet(null, totalSize);
       return totalSize;
     } else {
       long start = getStartOffset();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1ee9c8c6f42..3d8ddd3a336 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -41,6 +41,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
@@ -50,6 +52,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -782,7 +785,25 @@ public class FileBasedSourceTest {
     File file = createFileWithData(fileName, data);
 
     TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null);
+
+    ExpectedLogs.LogSaver logSaver = new ExpectedLogs.LogSaver();
+    LogManager.getLogManager().getLogger("").addHandler(logSaver);
+    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+    ExpectedLogs.verifyLogged(
+        ExpectedLogs.matcher(
+            Level.INFO, String.format("matched 1 files with total size %d", file.length())),
+        logSaver);
+    LogManager.getLogManager().getLogger("").removeHandler(logSaver);
+
+    logSaver = new ExpectedLogs.LogSaver();
+    LogManager.getLogManager().getLogger("").addHandler(logSaver);
     assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+    // The second call gets the result from the cache and does not send a match request.
+    ExpectedLogs.verifyNotLogged(
+        ExpectedLogs.matcher(
+            Level.INFO, String.format("matched 1 files with total size %d", file.length())),
+        logSaver);
+    LogManager.getLogManager().getLogger("").removeHandler(logSaver);
   }
 
   @Test