You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/05/19 15:02:52 UTC
[beam] branch master updated: Cache total size in file based source (#26746)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 658e50fac18 Cache total size in file based source (#26746)
658e50fac18 is described below
commit 658e50fac18e2fe45949fe2639b0fceaf42928e1
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri May 19 11:02:41 2023 -0400
Cache total size in file based source (#26746)
* Cache total size in file based source
* Add test case
---
.../org/apache/beam/sdk/io/FileBasedSource.java | 12 ++++++++++++
.../org/apache/beam/sdk/io/FileBasedSourceTest.java | 21 +++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 6f95411c592..9d1c1bd80fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
@@ -73,6 +74,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
private MatchResult.@Nullable Metadata singleFileMetadata;
private final Mode mode;
+ private final AtomicReference<@Nullable Long> filesSizeBytes;
+
/** A given {@code FileBasedSource} represents a file resource of one of these types. */
public enum Mode {
FILEPATTERN,
@@ -91,6 +94,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
this.mode = Mode.FILEPATTERN;
this.emptyMatchTreatment = emptyMatchTreatment;
this.fileOrPatternSpec = fileOrPatternSpec;
+ this.filesSizeBytes = new AtomicReference<>();
}
/**
@@ -123,6 +127,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+ this.filesSizeBytes = new AtomicReference<>();
// This field will be unused in this mode.
this.emptyMatchTreatment = EmptyMatchTreatment.DISALLOW;
@@ -222,6 +227,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
String fileOrPattern = fileOrPatternSpec.get();
if (mode == Mode.FILEPATTERN) {
+ Long maybeNumBytes = filesSizeBytes.get();
+ if (maybeNumBytes != null) {
+ return maybeNumBytes;
+ }
+
long totalSize = 0;
List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
for (Metadata metadata : allMatches) {
@@ -232,6 +242,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
fileOrPattern,
allMatches.size(),
totalSize);
+
+ filesSizeBytes.compareAndSet(null, totalSize);
return totalSize;
} else {
long start = getStartOffset();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1ee9c8c6f42..3d8ddd3a336 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -41,6 +41,8 @@ import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.LogManager;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
@@ -50,6 +52,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -782,7 +785,25 @@ public class FileBasedSourceTest {
File file = createFileWithData(fileName, data);
TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null);
+
+ ExpectedLogs.LogSaver logSaver = new ExpectedLogs.LogSaver();
+ LogManager.getLogManager().getLogger("").addHandler(logSaver);
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+ ExpectedLogs.verifyLogged(
+ ExpectedLogs.matcher(
+ Level.INFO, String.format("matched 1 files with total size %d", file.length())),
+ logSaver);
+ LogManager.getLogManager().getLogger("").removeHandler(logSaver);
+
+ logSaver = new ExpectedLogs.LogSaver();
+ LogManager.getLogManager().getLogger("").addHandler(logSaver);
assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+ // The second call gets the result from the cache and does not send a match request.
+ ExpectedLogs.verifyNotLogged(
+ ExpectedLogs.matcher(
+ Level.INFO, String.format("matched 1 files with total size %d", file.length())),
+ logSaver);
+ LogManager.getLogManager().getLogger("").removeHandler(logSaver);
}
@Test