You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/20 22:10:23 UTC
[1/3] beam git commit: [BEAM-59] FileBasedSource: convert to FileSystem
Repository: beam
Updated Branches:
refs/heads/master 4f8b1cc22 -> e44918881
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 94a29da..c15e667 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -26,11 +26,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
-import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
@@ -39,21 +38,23 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
@@ -62,7 +63,6 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
/**
* Tests code common to all file-based sources.
@@ -70,7 +70,7 @@ import org.mockito.Mockito;
@RunWith(JUnit4.class)
public class FileBasedSourceTest {
- Random random = new Random(0L);
+ private Random random = new Random(0L);
@Rule public final TestPipeline p = TestPipeline.create();
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -89,16 +89,16 @@ public class FileBasedSourceTest {
final String splitHeader;
public TestFileBasedSource(String fileOrPattern, long minBundleSize, String splitHeader) {
- super(fileOrPattern, minBundleSize);
+ super(StaticValueProvider.of(fileOrPattern), minBundleSize);
this.splitHeader = splitHeader;
}
public TestFileBasedSource(
- String fileOrPattern,
+ Metadata fileOrPattern,
long minBundleSize,
long startOffset,
long endOffset,
- String splitHeader) {
+ @Nullable String splitHeader) {
super(fileOrPattern, minBundleSize, startOffset, endOffset);
this.splitHeader = splitHeader;
}
@@ -113,7 +113,7 @@ public class FileBasedSourceTest {
@Override
protected FileBasedSource<String> createForSubrangeOfFile(
- String fileName, long start, long end) {
+ Metadata fileName, long start, long end) {
return new TestFileBasedSource(fileName, getMinBundleSize(), start, end, splitHeader);
}
@@ -397,30 +397,12 @@ public class FileBasedSourceTest {
}
@Test
- public void testSplittingUsingFullThreadPool() throws Exception {
- int numFiles = FileBasedSource.THREAD_POOL_SIZE * 5;
- File file0 = null;
- for (int i = 0; i < numFiles; i++) {
- List<String> data = createStringDataset(3, 1000);
- File file = createFileWithData("file" + i, data);
- if (i == 0) {
- file0 = file;
- }
- }
-
- TestFileBasedSource source =
- new TestFileBasedSource(file0.getParent() + "/" + "file*", Long.MAX_VALUE, null);
- List<? extends BoundedSource<String>> splits = source.split(Long.MAX_VALUE, null);
- assertEquals(numFiles, splits.size());
- }
-
- @Test
public void testSplittingFailsOnEmptyFileExpansion() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
String missingFilePath = tempFolder.newFolder().getAbsolutePath() + "/missing.txt";
TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null);
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath));
+ thrown.expect(FileNotFoundException.class);
+ thrown.expectMessage(String.format("No files found for spec: %s", missingFilePath));
source.split(1234, options);
}
@@ -460,16 +442,7 @@ public class FileBasedSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
File file1 = createFileWithData("file1", new ArrayList<String>());
- IOChannelFactory mockIOFactory = Mockito.mock(IOChannelFactory.class);
- String parent = file1.getParent();
- String pattern = "mocked://test";
- when(mockIOFactory.match(pattern))
- .thenReturn(
- ImmutableList.of(
- new File(parent, "file1").getPath(),
- new File(parent, "file2").getPath(),
- new File(parent, "file3").getPath()));
- IOChannelUtils.setIOFactoryInternal("mocked", mockIOFactory, true /* override */);
+ String pattern = file1.getParent() + "/file*";
List<String> data2 = createStringDataset(3, 50);
createFileWithData("file2", data2);
@@ -496,9 +469,10 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 25, null);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 25, null);
TestFileBasedSource source2 =
- new TestFileBasedSource(file.getPath(), 64, 25, Long.MAX_VALUE, null);
+ new TestFileBasedSource(metadata, 64, 25, Long.MAX_VALUE, null);
List<String> results = new ArrayList<String>();
results.addAll(readFromSource(source1, options));
@@ -523,7 +497,7 @@ public class FileBasedSourceTest {
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data);
// Remove all occurrences of header from expected results.
- expectedResults.removeAll(Arrays.asList(header));
+ expectedResults.removeAll(Collections.singletonList(header));
assertEquals(expectedResults, readFromSource(source, options));
}
@@ -540,9 +514,10 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 60, header);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 60, header);
TestFileBasedSource source2 =
- new TestFileBasedSource(file.getPath(), 64, 60, Long.MAX_VALUE, header);
+ new TestFileBasedSource(metadata, 64, 60, Long.MAX_VALUE, header);
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data);
@@ -568,16 +543,17 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 42, header);
- TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 42, 112, header);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 42, header);
+ TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 42, 112, header);
TestFileBasedSource source3 =
- new TestFileBasedSource(file.getPath(), 64, 112, Long.MAX_VALUE, header);
+ new TestFileBasedSource(metadata, 64, 112, Long.MAX_VALUE, header);
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data);
// Remove all occurrences of header from expected results.
- expectedResults.removeAll(Arrays.asList(header));
+ expectedResults.removeAll(Collections.singletonList(header));
List<String> results = new ArrayList<>();
results.addAll(readFromSource(source1, options));
@@ -599,16 +575,17 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 42, header);
- TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 42, 62, header);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 42, header);
+ TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 42, 62, header);
TestFileBasedSource source3 =
- new TestFileBasedSource(file.getPath(), 64, 62, Long.MAX_VALUE, header);
+ new TestFileBasedSource(metadata, 64, 62, Long.MAX_VALUE, header);
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data);
// Remove all occurrences of header from expected results.
- expectedResults.removeAll(Arrays.asList(header));
+ expectedResults.removeAll(Collections.singletonList(header));
List<String> results = new ArrayList<>();
results.addAll(readFromSource(source1, options));
@@ -633,19 +610,20 @@ public class FileBasedSourceTest {
List<String> expectedResults = new ArrayList<String>();
expectedResults.addAll(data.subList(10, data.size()));
// Remove all occurrences of header from expected results.
- expectedResults.removeAll(Arrays.asList(header));
+ expectedResults.removeAll(Collections.singletonList(header));
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
// Split starts after "<" of the header
TestFileBasedSource source =
- new TestFileBasedSource(file.getPath(), 64, 1, Long.MAX_VALUE, header);
+ new TestFileBasedSource(metadata, 64, 1, Long.MAX_VALUE, header);
assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
// Split starts after "<h" of the header
- source = new TestFileBasedSource(file.getPath(), 64, 2, Long.MAX_VALUE, header);
+ source = new TestFileBasedSource(metadata, 64, 2, Long.MAX_VALUE, header);
assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
// Split starts after "<h>" of the header
- source = new TestFileBasedSource(file.getPath(), 64, 3, Long.MAX_VALUE, header);
+ source = new TestFileBasedSource(metadata, 64, 3, Long.MAX_VALUE, header);
assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
}
@@ -656,10 +634,11 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 52, null);
- TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 52, 72, null);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 52, null);
+ TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 52, 72, null);
TestFileBasedSource source3 =
- new TestFileBasedSource(file.getPath(), 64, 72, Long.MAX_VALUE, null);
+ new TestFileBasedSource(metadata, 64, 72, Long.MAX_VALUE, null);
List<String> results = new ArrayList<>();
results.addAll(readFromSource(source1, options));
@@ -677,9 +656,10 @@ public class FileBasedSourceTest {
String fileName = "file";
File file = createFileWithData(fileName, data);
- TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 162, null);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 162, null);
TestFileBasedSource source2 =
- new TestFileBasedSource(file.getPath(), 1024, 162, Long.MAX_VALUE, null);
+ new TestFileBasedSource(metadata, 1024, 162, Long.MAX_VALUE, null);
List<String> results = new ArrayList<>();
results.addAll(readFromSource(source1, options));
@@ -793,74 +773,6 @@ public class FileBasedSourceTest {
}
@Test
- public void testEstimatedSizeOfFilePatternAllThreads() throws Exception {
- File file0 = null;
- int numFiles = FileBasedSource.THREAD_POOL_SIZE * 5;
- long totalSize = 0;
- for (int i = 0; i < numFiles; i++) {
- List<String> data = createStringDataset(3, 20);
- File file = createFileWithData("file" + i, data);
- if (i == 0) {
- file0 = file;
- }
- totalSize += file.length();
- }
-
- TestFileBasedSource source =
- new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
-
- // Since all files are of equal size, sampling should produce the exact result.
- assertEquals(totalSize, source.getEstimatedSizeBytes(null));
- }
-
- @Test
- public void testEstimatedSizeOfFilePatternThroughSamplingEqualSize() throws Exception {
- // When all files are of equal size, we should get the exact size.
- int numFilesToTest = FileBasedSource.MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT * 2;
- File file0 = null;
- for (int i = 0; i < numFilesToTest; i++) {
- List<String> data = createStringDataset(3, 20);
- File file = createFileWithData("file" + i, data);
- if (i == 0) {
- file0 = file;
- }
- }
-
- long actualTotalSize = file0.length() * numFilesToTest;
- TestFileBasedSource source =
- new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
- assertEquals(actualTotalSize, source.getEstimatedSizeBytes(null));
- }
-
- @Test
- public void testEstimatedSizeOfFilePatternThroughSamplingDifferentSizes() throws Exception {
- float tolerableError = 0.2f;
- int numFilesToTest = FileBasedSource.MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT * 2;
- File file0 = null;
-
- // Keeping sizes of files close to each other to make sure that the test passes reliably.
- Random rand = new Random(System.currentTimeMillis());
- int dataSizeBase = 100;
- int dataSizeDelta = 10;
-
- long actualTotalSize = 0;
- for (int i = 0; i < numFilesToTest; i++) {
- List<String> data = createStringDataset(
- 3, (int) (dataSizeBase + rand.nextFloat() * dataSizeDelta * 2 - dataSizeDelta));
- File file = createFileWithData("file" + i, data);
- if (i == 0) {
- file0 = file;
- }
- actualTotalSize += file.length();
- }
-
- TestFileBasedSource source =
- new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
- assertEquals((double) actualTotalSize, (double) source.getEstimatedSizeBytes(null),
- actualTotalSize * tolerableError);
- }
-
- @Test
public void testReadAllSplitsOfFilePattern() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
List<String> data1 = createStringDataset(3, 50);
@@ -900,7 +812,8 @@ public class FileBasedSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
File file = createFileWithData("file", createStringDataset(3, 100));
- TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, file.length(), null);
// Shouldn't be able to split while unstarted.
assertSplitAtFractionFails(source, 0, 0.7, options);
assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.7, options);
@@ -918,21 +831,16 @@ public class FileBasedSourceTest {
// Smaller file for exhaustive testing.
File file = createFileWithData("file", createStringDataset(3, 20));
- TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null);
+ Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+ TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, file.length(), null);
assertSplitAtFractionExhaustive(source, options);
}
@Test
public void testToStringFile() throws Exception {
- String path = "/tmp/foo";
- TestFileBasedSource source = new TestFileBasedSource(path, 1, 0, 10, null);
- assertEquals(String.format("%s range [0, 10)", path), source.toString());
- }
-
- @Test
- public void testToStringPattern() throws Exception {
- String path = "/tmp/foo/*";
- TestFileBasedSource source = new TestFileBasedSource(path, 1, 0, 10, null);
- assertEquals(String.format("%s range [0, 10)", path), source.toString());
+ File f = createFileWithData("foo", Collections.<String>emptyList());
+ Metadata metadata = FileSystems.matchSingleFileSpec(f.getPath());
+ TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, 10, null);
+ assertEquals(String.format("%s range [0, 10)", f.getAbsolutePath()), source.toString());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
index d8c345c..d42f8ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
@@ -33,6 +33,8 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.util.AvroUtils.AvroMetadata;
import org.junit.Rule;
import org.junit.Test;
@@ -83,7 +85,9 @@ public class AvroUtilsTest {
for (String codec : codecs) {
String filename = generateTestFile(
codec, expected, AvroCoder.of(Bird.class), codec);
- AvroMetadata metadata = AvroUtils.readMetadataFromFile(filename);
+
+ Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+ AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
assertEquals(codec, metadata.getCodec());
}
}
@@ -94,7 +98,8 @@ public class AvroUtilsTest {
String codec = DataFileConstants.NULL_CODEC;
String filename = generateTestFile(
codec, expected, AvroCoder.of(Bird.class), codec);
- AvroMetadata metadata = AvroUtils.readMetadataFromFile(filename);
+ Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+ AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
// By default, parse validates the schema, which is what we want.
Schema schema = new Schema.Parser().parse(metadata.getSchemaString());
assertEquals(8, schema.getFields().size());
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 1c853bb..d0dfd3e 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -636,9 +636,8 @@ public class GcsUtil {
return batches;
}
- public void copy(Iterable<String> srcFilenames,
- Iterable<String> destFilenames) throws
- IOException {
+ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+ throws IOException {
executeBatches(makeCopyBatches(srcFilenames, destFilenames));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
index 863b01b..6a71bdc 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Strings.isNullOrEmpty;
import com.google.api.services.storage.model.StorageObject;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
@@ -68,7 +69,7 @@ import javax.annotation.Nullable;
* "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
* >Java Tutorials: Path Operations</a>
*/
-public class GcsPath implements Path {
+public class GcsPath implements Path, Serializable {
public static final String SCHEME = "gs";
@@ -176,7 +177,7 @@ public class GcsPath implements Path {
}
@Nullable
- private FileSystem fs;
+ private transient FileSystem fs;
@Nonnull
private final String bucket;
@Nonnull
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 3653753..713a9a9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -503,8 +503,7 @@ public class BigtableIOTest {
null /*filter*/,
ByteKeyRange.ALL_KEYS,
null /*size*/);
- List<BigtableSource> splits =
- source.split(numRows * bytesPerRow / numSplits, null);
+ List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
@@ -529,8 +528,7 @@ public class BigtableIOTest {
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
BigtableSource source =
new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
- List<BigtableSource> splits =
- source.split(numRows * bytesPerRow / numSplits, null);
+ List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index da70632..ccde03f 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -643,7 +643,7 @@ public class HadoopInputFormatIOTest {
* {@link HadoopInputFormatBoundedSource#createReader(PipelineOptions)}
* createReader()} method when
* {@link HadoopInputFormatBoundedSource#split(long, PipelineOptions)}
- * split()} is not called.
+ * is not called.
*/
@Test
public void testCreateReaderIfSplitNotCalled() throws Exception {
[2/3] beam git commit: [BEAM-59] FileBasedSource: convert to FileSystem
Posted by dh...@apache.org.
[BEAM-59] FileBasedSource: convert to FileSystem
* Make ResourceId serializable
* Update all implementations
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49b8b230
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49b8b230
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49b8b230
Branch: refs/heads/master
Commit: 49b8b2302f4cbc67690617dee788356de2337d65
Parents: 4f8b1cc
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 20 15:08:34 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 20 15:08:34 2017 -0700
----------------------------------------------------------------------
.../UnboundedReadFromBoundedSource.java | 3 +-
.../UnboundedReadFromBoundedSourceTest.java | 14 +-
.../translation/wrappers/SourceInputFormat.java | 3 +-
.../beam/runners/spark/io/MicrobatchSource.java | 4 +-
.../java/org/apache/beam/sdk/io/AvroSource.java | 43 +--
.../apache/beam/sdk/io/BlockBasedSource.java | 10 +-
.../apache/beam/sdk/io/CompressedSource.java | 26 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 355 +++++++------------
.../org/apache/beam/sdk/io/FileSystems.java | 32 +-
.../org/apache/beam/sdk/io/LocalResourceId.java | 29 +-
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 17 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 21 +-
.../java/org/apache/beam/sdk/io/XmlSource.java | 12 +-
.../org/apache/beam/sdk/io/fs/MatchResult.java | 3 +-
.../org/apache/beam/sdk/io/fs/ResourceId.java | 3 +-
.../org/apache/beam/sdk/util/AvroUtils.java | 24 +-
.../org/apache/beam/sdk/io/AvroSourceTest.java | 15 +-
.../beam/sdk/io/CompressedSourceTest.java | 12 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 192 +++-------
.../org/apache/beam/sdk/util/AvroUtilsTest.java | 9 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 5 +-
.../org/apache/beam/sdk/util/gcsfs/GcsPath.java | 5 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 6 +-
.../inputformat/HadoopInputFormatIOTest.java | 2 +-
24 files changed, 349 insertions(+), 496 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f67af8a..6b99522 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -61,8 +61,7 @@ import org.slf4j.LoggerFactory;
/**
* {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
*
- * <p>{@link BoundedSource} is read directly without calling
- * {@link BoundedSource#split},
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#split},
* and element timestamps are propagated. While any elements remain, the watermark is the beginning
* of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
* the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index c905cf5..0e48a9d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -44,8 +44,10 @@ import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -299,21 +301,21 @@ public class UnboundedReadFromBoundedSourceTest {
*/
private static class UnsplittableSource extends FileBasedSource<Byte> {
public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
+ super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
public UnsplittableSource(
- String fileName, long minBundleSize, long startOffset, long endOffset) {
- super(fileName, minBundleSize, startOffset, endOffset);
+ Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+ super(metadata, minBundleSize, startOffset, endOffset);
}
@Override
- protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
- return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+ protected UnsplittableSource createForSubrangeOfFile(Metadata metadata, long start, long end) {
+ return new UnsplittableSource(metadata, getMinBundleSize(), start, end);
}
@Override
- protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+ protected UnsplittableReader createSingleFileReader(PipelineOptions options) {
return new UnsplittableReader(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index a87472b..12be8eb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -100,8 +100,7 @@ public class SourceInputFormat<T>
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
try {
long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
- List<? extends Source<T>> shards =
- initialSource.split(desiredSizeBytes, options);
+ List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, options);
int numShards = shards.size();
SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
for (int i = 0; i < numShards; i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 7c07920..fde5f9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -102,8 +102,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
@Override
- public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
+ public List<? extends BoundedSource<T>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
source.split(numInitialSplits, options);
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 0c52dea..5e0900a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -34,7 +33,6 @@ import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.zip.Inflater;
@@ -52,6 +50,7 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.AvroUtils;
@@ -84,15 +83,6 @@ import org.apache.commons.compress.utils.CountingInputStream;
* }
* </pre>
*
- * <p>The {@link AvroSource#readFromFileWithClass(String, Class)} method is a convenience method
- * that returns a read transform. For example:
- *
- * <pre>
- * {@code
- * PCollection<MyType> records = AvroSource.readFromFileWithClass(file.toPath(), MyType.class));
- * }
- * </pre>
- *
* <p>This class's implementation is based on the <a
* href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements
* parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro
@@ -165,15 +155,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
private transient Schema readSchema;
/**
- * Creates a {@link Read} transform that will read from an {@link AvroSource} that is configured
- * to read records of the given type from a file pattern.
- */
- public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, Class<T> clazz) {
- return Read.from(new AvroSource<>(filePattern, DEFAULT_MIN_BUNDLE_SIZE,
- ReflectData.get().getSchema(clazz).toString(), clazz, null, null));
- }
-
- /**
* Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
* returned source can be further configured by calling {@link #withSchema} to return a type other
* than {@link GenericRecord}.
@@ -237,9 +218,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
this.fileSchemaString = null;
}
- private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset,
+ private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
- super(fileName, minBundleSize, startOffset, endOffset);
+ super(metadata, minBundleSize, startOffset, endOffset);
this.readSchemaString = internSchemaString(schema);
this.codec = codec;
this.syncMarker = syncMarker;
@@ -254,8 +235,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
super.validate();
}
+ @Deprecated // Added to let DataflowRunner migrate off of this; to be deleted.
+ public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)
+ throws IOException {
+ return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);
+ }
+
@Override
- public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
+ public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
byte[] syncMarker = this.syncMarker;
String codec = this.codec;
String readSchemaString = this.readSchemaString;
@@ -267,11 +254,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
if (codec == null || syncMarker == null || fileSchemaString == null) {
AvroMetadata metadata;
try {
- Collection<String> files = FileBasedSource.expandFilePattern(fileName);
- checkArgument(files.size() <= 1, "More than 1 file matched %s");
- metadata = AvroUtils.readMetadataFromFile(fileName);
+ metadata = AvroUtils.readMetadataFromFile(fileMetadata.resourceId());
} catch (IOException e) {
- throw new RuntimeException("Error reading metadata from file " + fileName, e);
+ throw new RuntimeException("Error reading metadata from file " + fileMetadata, e);
}
codec = metadata.getCodec();
syncMarker = metadata.getSyncMarker();
@@ -287,7 +272,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
// readSchemaString. This allows for Java to have an efficient serialization since it
// will only encode the schema once while just storing pointers to the encoded version
// within this source.
- return new AvroSource<>(fileName, getMinBundleSize(), start, end, readSchemaString, type,
+ return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
codec, syncMarker, fileSchemaString);
}
@@ -389,7 +374,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
switch (getMode()) {
case SINGLE_FILE_OR_SUBRANGE:
return new AvroSource<>(
- getFileOrPatternSpec(),
+ getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 83336ff..cf6671e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
/**
* A {@code BlockBasedSource} is a {@link FileBasedSource} where a file consists of blocks of
@@ -64,7 +66,7 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
* {@link FileBasedSource} for more information.
*/
public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
+ super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
/**
@@ -72,8 +74,8 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
* when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
* {@link FileBasedSource}.
*/
- public BlockBasedSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
- super(fileName, minBundleSize, startOffset, endOffset);
+ public BlockBasedSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+ super(metadata, minBundleSize, startOffset, endOffset);
}
/**
@@ -81,7 +83,7 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
*/
@Override
protected abstract BlockBasedSource<T> createForSubrangeOfFile(
- String fileName, long start, long end);
+ Metadata metadata, long start, long end);
/**
* Creates a {@code BlockBasedReader}.
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 1d940cb..f2fc37b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -36,6 +36,7 @@ import java.util.zip.ZipInputStream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@@ -287,16 +288,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
private final DecompressingChannelFactory channelFactory;
/**
- * Creates a {@link Read} transform that reads from that reads from the underlying
- * {@link FileBasedSource} {@code sourceDelegate} after decompressing it with a {@link
- * DecompressingChannelFactory}.
- */
- public static <T> Read.Bounded<T> readFromSource(
- FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
- return Read.from(new CompressedSource<>(sourceDelegate, channelFactory));
- }
-
- /**
* Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type
* of compression used will be based on the file name extension unless explicitly
* configured via {@link CompressedSource#withDecompression}.
@@ -329,12 +320,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
* CompressedSource#createForSubrangeOfFile}.
*/
private CompressedSource(FileBasedSource<T> sourceDelegate,
- DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
+ DecompressingChannelFactory channelFactory, Metadata metadata, long minBundleSize,
long startOffset, long endOffset) {
- super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
+ super(metadata, minBundleSize, startOffset, endOffset);
this.sourceDelegate = sourceDelegate;
this.channelFactory = channelFactory;
- boolean splittable = false;
+ boolean splittable;
try {
splittable = isSplittable();
} catch (Exception e) {
@@ -342,7 +333,8 @@ public class CompressedSource<T> extends FileBasedSource<T> {
}
checkArgument(
splittable || startOffset == 0,
- "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
+ "CompressedSources must start reading at offset 0. Requested offset: %s",
+ startOffset);
}
/**
@@ -361,9 +353,9 @@ public class CompressedSource<T> extends FileBasedSource<T> {
* source for a single file.
*/
@Override
- protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
- return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
- channelFactory, fileName, sourceDelegate.getMinBundleSize(), start, end);
+ protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
+ return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(metadata, start, end),
+ channelFactory, metadata, sourceDelegate.getMinBundleSize(), start, end);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 95e6078..b2a4075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -18,32 +18,28 @@
package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,9 +52,9 @@ import org.slf4j.LoggerFactory;
* glob, a single file, or a offset range for a single file. See {@link OffsetBasedSource} and
* {@link org.apache.beam.sdk.io.range.RangeTracker} for semantics of offset ranges.
*
- * <p>This source stores a {@code String} that is an {@link IOChannelFactory} specification for a
- * file or file pattern. There should be an {@code IOChannelFactory} defined for the file
- * specification provided. Please refer to {@link IOChannelUtils} and {@link IOChannelFactory} for
+ * <p>This source stores a {@code String} that is a {@link FileSystems} specification for a
+ * file or file pattern. There should be a {@link FileSystem} registered for the file
+ * specification provided. Please refer to {@link FileSystems} and {@link FileSystem} for
* more information on this.
*
* <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
@@ -70,16 +66,9 @@ import org.slf4j.LoggerFactory;
*/
public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
- private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
-
- // Package-private for testing
- static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
-
- // Size of the thread pool to be used for performing file operations in parallel.
- // Package-private for testing.
- static final int THREAD_POOL_SIZE = 128;
private final ValueProvider<String> fileOrPatternSpec;
+ @Nullable private MatchResult.Metadata singleFileMetadata;
private final Mode mode;
/**
@@ -91,25 +80,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
/**
- * Create a {@code FileBaseSource} based on a file or a file pattern specification. This
- * constructor must be used when creating a new {@code FileBasedSource} for a file pattern.
- *
- * <p>See {@link OffsetBasedSource} for a detailed description of {@code minBundleSize}.
- *
- * @param fileOrPatternSpec {@link IOChannelFactory} specification of file or file pattern
- * represented by the {@link FileBasedSource}.
- * @param minBundleSize minimum bundle size in bytes.
- */
- public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
- this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
- }
-
- /**
* Create a {@code FileBaseSource} based on a file or a file pattern specification.
- * Same as the {@code String} constructor, but accepting a {@link ValueProvider}
- * to allow for runtime configuration of the source.
*/
- public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
super(0, Long.MAX_VALUE, minBundleSize);
mode = Mode.FILEPATTERN;
this.fileOrPatternSpec = fileOrPatternSpec;
@@ -124,18 +97,38 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
* <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize},
* {@code startOffset}, and {@code endOffset}.
*
- * @param fileName {@link IOChannelFactory} specification of the file represented by the
- * {@link FileBasedSource}.
+ * @param fileMetadata specification of the file represented by the {@link FileBasedSource}, in
+ * suitable form for use with {@link FileSystems#match(List)}.
* @param minBundleSize minimum bundle size in bytes.
* @param startOffset starting byte offset.
* @param endOffset ending byte offset. If the specified value {@code >= #getMaxEndOffset()} it
* implies {@code #getMaxEndOffSet()}.
*/
- public FileBasedSource(String fileName, long minBundleSize,
- long startOffset, long endOffset) {
+ protected FileBasedSource(
+ Metadata fileMetadata, long minBundleSize, long startOffset, long endOffset) {
super(startOffset, endOffset, minBundleSize);
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
- this.fileOrPatternSpec = StaticValueProvider.of(fileName);
+ this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
+ this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+ }
+
+ /**
+ * Returns the information about the single file that this source is reading from.
+ *
+ * @throws IllegalArgumentException if this source is in {@link Mode#FILEPATTERN} mode.
+ */
+ protected final MatchResult.Metadata getSingleFileMetadata() {
+ checkArgument(
+ mode == Mode.SINGLE_FILE_OR_SUBRANGE,
+ "This function should only be called for a single file, not %s",
+ this);
+ checkState(
+ singleFileMetadata != null,
+ "It should not be possible to construct a %s in mode %s with null metadata: %s",
+ FileBasedSource.class,
+ mode,
+ this);
+ return singleFileMetadata;
}
public final String getFileOrPatternSpec() {
@@ -163,10 +156,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
"End offset value %s of the subrange cannot be larger than the end offset value %s",
end,
getEndOffset());
+ checkState(
+ singleFileMetadata != null,
+ "A single file source should not have null metadata: %s",
+ this);
- checkState(fileOrPatternSpec.isAccessible(),
- "Subrange creation should only happen at execution time.");
- FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
+ FileBasedSource<T> source = createForSubrangeOfFile(singleFileMetadata, start, end);
if (start > 0 || end != Long.MAX_VALUE) {
checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
"Source created for the range [%s,%s) must be a subrange source", start, end);
@@ -178,54 +173,51 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
* Creates and returns a new {@code FileBasedSource} of the same type as the current
* {@code FileBasedSource} backed by a given file and an offset range. When current source is
* being split, this method is used to generate new sub-sources. When creating the source
- * subclasses must call the constructor {@link #FileBasedSource(String, long, long, long)} of
+ * subclasses must call the constructor {@link #FileBasedSource(Metadata, long, long, long)} of
* {@code FileBasedSource} with corresponding parameter values passed here.
*
- * @param fileName file backing the new {@code FileBasedSource}.
+ * @param fileMetadata file backing the new {@code FileBasedSource}.
* @param start starting byte offset of the new {@code FileBasedSource}.
* @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE,
* in which case it will be inferred using {@link #getMaxEndOffset}.
*/
protected abstract FileBasedSource<T> createForSubrangeOfFile(
- String fileName, long start, long end);
+ Metadata fileMetadata, long start, long end);
/**
* Creates and returns an instance of a {@code FileBasedReader} implementation for the current
* source assuming the source represents a single file. File patterns will be handled by
* {@code FileBasedSource} implementation automatically.
*/
- protected abstract FileBasedReader<T> createSingleFileReader(
- PipelineOptions options);
+ protected abstract FileBasedReader<T> createSingleFileReader(PipelineOptions options);
@Override
public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
// This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
// we perform the size estimation of files and file patterns using the interface provided by
- // IOChannelFactory.
+ // FileSystem.
if (mode == Mode.FILEPATTERN) {
checkState(fileOrPatternSpec.isAccessible(),
"Size estimation should be done at execution time.");
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
- // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
- long startTime = System.currentTimeMillis();
+ String pattern = fileOrPatternSpec.get();
long totalSize = 0;
- Collection<String> inputs = factory.match(fileOrPatternSpec.get());
- if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
- totalSize = getExactTotalSizeOfFiles(inputs, factory);
- LOG.debug("Size estimation of all files of pattern {} took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
- } else {
- totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
- LOG.debug("Size estimation of pattern {} by sampling took {} ms",
- fileOrPatternSpec,
- System.currentTimeMillis() - startTime);
+ List<MatchResult> inputs =
+ FileSystems.match(Collections.singletonList(pattern));
+ MatchResult result = Iterables.getOnlyElement(inputs);
+ checkArgument(
+ result.status() == Status.OK,
+ "Error matching the pattern or glob %s: status %s",
+ pattern,
+ result.status());
+ Metadata[] allMatches = result.metadata();
+ for (Metadata metadata : allMatches) {
+ totalSize += metadata.sizeBytes();
}
LOG.info(
"Filepattern {} matched {} files with total size {}",
fileOrPatternSpec.get(),
- inputs.size(),
+ allMatches.length,
totalSize);
return totalSize;
} else {
@@ -235,98 +227,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
}
- // Get the exact total size of the given set of files.
- // Invokes multiple requests for size estimation in parallel using a thread pool.
- // TODO: replace this with bulk request API when it is available. Will require updates
- // to IOChannelFactory interface.
- private static long getExactTotalSizeOfFiles(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
- List<ListenableFuture<Long>> futures = new ArrayList<>();
- ListeningExecutorService service =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- try {
- long totalSize = 0;
- for (String file : files) {
- futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
- }
-
- for (Long val : Futures.allAsList(futures).get()) {
- totalSize += val;
- }
-
- return totalSize;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- } finally {
- service.shutdown();
- }
- }
-
- private static ListenableFuture<Long> createFutureForSizeEstimation(
- final String file,
- final IOChannelFactory ioChannelFactory,
- ListeningExecutorService service) {
- return service.submit(
- new Callable<Long>() {
- @Override
- public Long call() throws IOException {
- return ioChannelFactory.getSizeBytes(file);
- }
- });
- }
-
- // Estimate the total size of the given set of files through sampling and extrapolation.
- // Currently we use uniform sampling which requires a linear sampling size for a reasonable
- // estimate.
- // TODO: Implement a more efficient sampling mechanism.
- private static long getEstimatedSizeOfFilesBySampling(
- Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
- int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
- sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
-
- List<String> selectedFiles = new ArrayList<String>(files);
- Collections.shuffle(selectedFiles);
- selectedFiles = selectedFiles.subList(0, sampleSize);
-
- long exactTotalSampleSize = getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory);
- double avgSize = 1.0 * exactTotalSampleSize / selectedFiles.size();
- long totalSize = Math.round(files.size() * avgSize);
- LOG.info(
- "Sampling {} files gave {} total bytes ({} average per file), "
- + "inferring total size of {} files to be {}",
- selectedFiles.size(),
- exactTotalSampleSize,
- avgSize,
- files.size(),
- totalSize);
- return totalSize;
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
- ? getFileOrPatternSpecProvider().get()
- : getFileOrPatternSpecProvider().toString();
- builder.add(DisplayData.item("filePattern", patternDisplay)
- .withLabel("File Pattern"));
- }
-
- private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
- final String file,
- final long desiredBundleSizeBytes,
- final PipelineOptions options,
- ListeningExecutorService service) {
- return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
- @Override
- public List<? extends FileBasedSource<T>> call() throws Exception {
- return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
- .split(desiredBundleSizeBytes, options);
- }
- });
+ if (mode == Mode.FILEPATTERN) {
+ String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
+ ? getFileOrPatternSpecProvider().get()
+ : getFileOrPatternSpecProvider().toString();
+ builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern"));
+ }
}
@Override
@@ -339,42 +248,38 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
-
- ListeningExecutorService service =
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
- try {
- checkState(fileOrPatternSpec.isAccessible(),
- "Bundle splitting should only happen at execution time.");
- Collection<String> expandedFiles =
- FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
- checkArgument(!expandedFiles.isEmpty(),
- "Unable to find any files matching %s", fileOrPatternSpec.get());
- for (final String file : expandedFiles) {
- futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
- }
- List<? extends FileBasedSource<T>> splitResults =
- ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
- LOG.info(
- "Splitting filepattern {} into bundles of size {} took {} ms "
- + "and produced {} files and {} bundles",
- fileOrPatternSpec,
- desiredBundleSizeBytes,
- System.currentTimeMillis() - startTime,
- expandedFiles.size(),
- splitResults.size());
- return splitResults;
- } finally {
- service.shutdown();
+ checkState(fileOrPatternSpec.isAccessible(),
+ "Bundle splitting should only happen at execution time.");
+ List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+ checkArgument(!expandedFiles.isEmpty(),
+ "Unable to find any files matching %s", fileOrPatternSpec.get());
+ List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
+ for (Metadata metadata : expandedFiles) {
+ FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+ verify(split.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
+ "%s.createForSubrangeOfFile must return a source in mode %s",
+ split,
+ Mode.SINGLE_FILE_OR_SUBRANGE);
+ // The split is NOT in FILEPATTERN mode, so we can call its split without fear
+ // of recursion. This will break a single file into multiple splits when the file is
+ // splittable and larger than the desired bundle size.
+ splitResults.addAll(split.split(desiredBundleSizeBytes, options));
}
+ LOG.info(
+ "Splitting filepattern {} into bundles of size {} took {} ms "
+ + "and produced {} files and {} bundles",
+ fileOrPatternSpec,
+ desiredBundleSizeBytes,
+ System.currentTimeMillis() - startTime,
+ expandedFiles.size(),
+ splitResults.size());
+ return splitResults;
} else {
if (isSplittable()) {
- List<FileBasedSource<T>> splitResults = new ArrayList<>();
- for (OffsetBasedSource<T> split :
- super.split(desiredBundleSizeBytes, options)) {
- splitResults.add((FileBasedSource<T>) split);
- }
- return splitResults;
+ @SuppressWarnings("unchecked")
+ List<FileBasedSource<T>> splits =
+ (List<FileBasedSource<T>>) super.split(desiredBundleSizeBytes, options);
+ return splits;
} else {
LOG.debug("The source for file {} is not split into sub-range based sources since "
+ "the file is not seekable",
@@ -387,17 +292,23 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
/**
* Determines whether a file represented by this source is can be split into bundles.
*
- * <p>By default, a file is splittable if it is on a file system that supports efficient read
- * seeking. Subclasses may override to provide different behavior.
+ * <p>By default, a source in mode {@link Mode#FILEPATTERN} is always splittable, because
+ * splitting will involve expanding the file pattern and producing single-file/subrange sources,
+ * which may or may not be splittable themselves.
+ *
+ * <p>By default, a source in {@link Mode#SINGLE_FILE_OR_SUBRANGE} is splittable if it is on a
+ * file system that supports efficient read seeking.
+ *
+ * <p>Subclasses may override to provide different behavior.
*/
protected boolean isSplittable() throws Exception {
- // We split a file-based source into subranges only if the file is efficiently seekable.
- // If a file is not efficiently seekable it would be highly inefficient to create and read a
- // source based on a subrange of that file.
- checkState(fileOrPatternSpec.isAccessible(),
- "isSplittable should only be called at runtime.");
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
- return factory.isReadSeekEfficient(fileOrPatternSpec.get());
+ if (mode == Mode.FILEPATTERN) {
+ // split will expand file pattern and return single file or subrange sources that
+ // may or may not be splittable.
+ return true;
+ }
+
+ return getSingleFileMetadata().isReadSeekEfficient();
}
@Override
@@ -407,18 +318,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+ List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
- for (String fileName : files) {
- long endOffset;
- try {
- endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
- } catch (IOException e) {
- LOG.warn("Failed to get size of {}", fileName, e);
- endOffset = Long.MAX_VALUE;
- }
+ for (Metadata metadata : fileMetadata) {
+ long endOffset = metadata.sizeBytes();
fileReaders.add(
- createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
+ createForSubrangeOfFile(metadata, 0, endOffset).createSingleFileReader(options));
}
LOG.debug(
"Creating a reader for file pattern {} took {} ms",
@@ -471,20 +376,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
public final long getMaxEndOffset(PipelineOptions options) throws IOException {
checkArgument(
mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
- if (getEndOffset() == Long.MAX_VALUE) {
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
- return factory.getSizeBytes(fileOrPatternSpec.get());
- } else {
- return getEndOffset();
- }
+ Metadata metadata = getSingleFileMetadata();
+ return metadata.sizeBytes();
}
- protected static final Collection<String> expandFilePattern(String fileOrPatternSpec)
- throws IOException {
- IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
- Collection<String> matches = factory.match(fileOrPatternSpec);
- LOG.info("Matched {} files for pattern {}", matches.size(), fileOrPatternSpec);
- return matches;
+ private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
+ MatchResult matches =
+ Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
+ LOG.info("Matched {} files for pattern {}", matches.metadata().length, fileOrPatternSpec);
+ return ImmutableList.copyOf(matches.metadata());
}
/**
@@ -545,10 +445,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
@Override
protected final boolean startImpl() throws IOException {
FileBasedSource<T> source = getCurrentSource();
- IOChannelFactory factory = IOChannelUtils.getFactory(
- source.getFileOrPatternSpecProvider().get());
- this.channel = factory.open(source.getFileOrPatternSpecProvider().get());
-
+ this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
if (channel instanceof SeekableByteChannel) {
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
seekChannel.position(source.getStartOffset());
@@ -585,6 +482,16 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
}
+ @Override
+ public boolean allowsDynamicSplitting() {
+ try {
+ return getCurrentSource().isSplittable();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Error determining if %s allows dynamic splitting", this), e);
+ }
+ }
+
/**
* Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
* operations. Will only be invoked once and before that invocation the base class will seek the
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 96306dc..aa247c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -31,12 +31,13 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
-
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -46,7 +47,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
-
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -111,6 +111,34 @@ public class FileSystems {
}
/**
+ * Returns the {@link Metadata} for a single file resource. Expects a resource specification
+ * {@code spec} that matches a single result.
+ *
+ * @param spec a resource specification that matches exactly one result.
+ * @return the {@link Metadata} for the specified resource.
+ * @throws IOException in the event of an error in the inner call to {@link #match},
+ * or if the given spec does not match exactly 1 result.
+ */
+ public static Metadata matchSingleFileSpec(String spec) throws IOException {
+ List<MatchResult> matches = FileSystems.match(Collections.singletonList(spec));
+ MatchResult matchResult = Iterables.getOnlyElement(matches);
+ if (matchResult.status() != Status.OK) {
+ throw new IOException(
+ String.format("Error matching file spec %s: status %s", spec, matchResult.status()));
+ }
+ Metadata[] metadata = matchResult.metadata();
+ if (metadata.length != 1) {
+ throw new IOException(
+ String.format(
+ "Expecting spec %s to match exactly one file, but matched %s: %s",
+ spec,
+ metadata.length,
+ Arrays.toString(metadata)));
+ }
+ return metadata[0];
+ }
+
+ /**
* Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
*
* @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 2272a06..091e955 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -35,7 +35,8 @@ import org.apache.commons.lang3.SystemUtils;
*/
class LocalResourceId implements ResourceId {
- private final Path path;
+ private final String pathString;
+ private transient volatile Path cachedPath;
private final boolean isDirectory;
static LocalResourceId fromPath(Path path, boolean isDirectory) {
@@ -44,7 +45,7 @@ class LocalResourceId implements ResourceId {
}
private LocalResourceId(Path path, boolean isDirectory) {
- this.path = path.normalize();
+ this.pathString = path.normalize().toString();
this.isDirectory = isDirectory;
}
@@ -52,11 +53,12 @@ class LocalResourceId implements ResourceId {
public LocalResourceId resolve(String other, ResolveOptions resolveOptions) {
checkState(
isDirectory,
- String.format("Expected the path is a directory, but had [%s].", path));
+ "Expected the path is a directory, but had [%s].",
+ pathString);
checkArgument(
resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
|| resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
- String.format("ResolveOptions: [%s] is not supported.", resolveOptions));
+ "ResolveOptions: [%s] is not supported.", resolveOptions);
checkArgument(
!(resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
&& other.endsWith("/")),
@@ -73,13 +75,15 @@ class LocalResourceId implements ResourceId {
if (isDirectory) {
return this;
} else {
+ Path path = getPath();
Path parent = path.getParent();
if (parent == null && path.getNameCount() == 1) {
parent = Paths.get(".");
}
checkState(
parent != null,
- String.format("Failed to get the current directory for path: [%s].", path));
+ "Failed to get the current directory for path: [%s].",
+ pathString);
return fromPath(
parent,
true /* isDirectory */);
@@ -88,13 +92,13 @@ class LocalResourceId implements ResourceId {
private LocalResourceId resolveLocalPath(String other, ResolveOptions resolveOptions) {
return new LocalResourceId(
- path.resolve(other),
+ getPath().resolve(other),
resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
}
private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions resolveOptions) {
String uuid = UUID.randomUUID().toString();
- Path pathAsterisksReplaced = Paths.get(path.toString().replaceAll("\\*", uuid));
+ Path pathAsterisksReplaced = Paths.get(pathString.replaceAll("\\*", uuid));
String otherAsterisksReplaced = other.replaceAll("\\*", uuid);
return new LocalResourceId(
@@ -111,12 +115,15 @@ class LocalResourceId implements ResourceId {
}
Path getPath() {
- return path;
+ if (cachedPath == null) {
+ cachedPath = Paths.get(pathString);
+ }
+ return cachedPath;
}
@Override
public String toString() {
- return path.toString();
+ return pathString;
}
@Override
@@ -125,12 +132,12 @@ class LocalResourceId implements ResourceId {
return false;
}
LocalResourceId other = (LocalResourceId) obj;
- return this.path.equals(other.path)
+ return this.pathString.equals(other.pathString)
&& this.isDirectory == other.isDirectory;
}
@Override
public int hashCode() {
- return Objects.hash(path, isDirectory);
+ return Objects.hash(pathString, isDirectory);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 34dbe21..a920283 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -444,7 +445,7 @@ public class TFRecordIO {
static class TFRecordSource extends FileBasedSource<byte[]> {
@VisibleForTesting
TFRecordSource(String fileSpec) {
- super(fileSpec, 1L);
+ super(StaticValueProvider.of(fileSpec), 1L);
}
@VisibleForTesting
@@ -452,17 +453,17 @@ public class TFRecordIO {
super(fileSpec, Long.MAX_VALUE);
}
- private TFRecordSource(String fileName, long start, long end) {
- super(fileName, Long.MAX_VALUE, start, end);
+ private TFRecordSource(Metadata metadata, long start, long end) {
+ super(metadata, Long.MAX_VALUE, start, end);
}
@Override
protected FileBasedSource<byte[]> createForSubrangeOfFile(
- String fileName,
+ Metadata metadata,
long start,
long end) {
checkArgument(start == 0, "TFRecordSource is not splittable");
- return new TFRecordSource(fileName, start, end);
+ return new TFRecordSource(metadata, start, end);
}
@Override
@@ -501,6 +502,12 @@ public class TFRecordIO {
}
@Override
+ public boolean allowsDynamicSplitting() {
+ /* TFRecords cannot be dynamically split. */
+ return false;
+ }
+
+ @Override
protected long getCurrentOffset() throws NoSuchElementException {
if (!elementIsPresent) {
throw new NoSuchElementException();
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index fbd76df..2dac73a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
-
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
@@ -35,15 +34,14 @@ import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
-
import javax.annotation.Nullable;
-
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -111,9 +109,6 @@ import org.apache.beam.sdk.values.PDone;
* }</pre>
*/
public class TextIO {
- /** The default coder, which returns each line of the input file as a string. */
- public static final Coder<String> DEFAULT_TEXT_CODER = StringUtf8Coder.of();
-
/**
* A {@link PTransform} that reads from a text file (or multiple text
* files matching a pattern) and returns a {@link PCollection} containing
@@ -316,7 +311,7 @@ public class TextIO {
@Override
protected Coder<String> getDefaultOutputCoder() {
- return DEFAULT_TEXT_CODER;
+ return StringUtf8Coder.of();
}
public String getFilepattern() {
@@ -871,7 +866,7 @@ public class TextIO {
/** The Coder to use to decode each line. */
@VisibleForTesting
TextSource(String fileSpec) {
- super(fileSpec, 1L);
+ super(StaticValueProvider.of(fileSpec), 1L);
}
@VisibleForTesting
@@ -879,16 +874,16 @@ public class TextIO {
super(fileSpec, 1L);
}
- private TextSource(String fileName, long start, long end) {
- super(fileName, 1L, start, end);
+ private TextSource(Metadata metadata, long start, long end) {
+ super(metadata, 1L, start, end);
}
@Override
protected FileBasedSource<String> createForSubrangeOfFile(
- String fileName,
+ Metadata metadata,
long start,
long end) {
- return new TextSource(fileName, start, end);
+ return new TextSource(metadata, start, end);
}
@Override
@@ -898,7 +893,7 @@ public class TextIO {
@Override
public Coder<String> getDefaultOutputCoder() {
- return DEFAULT_TEXT_CODER;
+ return StringUtf8Coder.of();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
index 7416c85..4b7d3b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
@@ -42,7 +42,9 @@ import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.JAXBCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.codehaus.stax2.XMLInputFactory2;
@@ -54,18 +56,18 @@ public class XmlSource<T> extends FileBasedSource<T> {
private final XmlIO.Read<T> spec;
XmlSource(XmlIO.Read<T> spec) {
- super(spec.getFileOrPatternSpec(), spec.getMinBundleSize());
+ super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize());
this.spec = spec;
}
- private XmlSource(XmlIO.Read<T> spec, long startOffset, long endOffset) {
- super(spec.getFileOrPatternSpec(), spec.getMinBundleSize(), startOffset, endOffset);
+ private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) {
+ super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
this.spec = spec;
}
@Override
- protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
- return new XmlSource<T>(spec.from(fileName), start, end);
+ protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
+ return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 80ee00f..ab11d66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.fs;
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.io.Serializable;
/**
* The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -92,7 +93,7 @@ public abstract class MatchResult {
* {@link Metadata} of a matched file.
*/
@AutoValue
- public abstract static class Metadata {
+ public abstract static class Metadata implements Serializable {
public abstract ResourceId resourceId();
public abstract long sizeBytes();
public abstract boolean isReadSeekEfficient();
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index 938e24a..b7859ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.fs;
+import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -28,7 +29,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
*
* <p>TODO: add examples for how ResourceId is constructed and used.
*/
-public interface ResourceId {
+public interface ResourceId extends Serializable {
/**
* Returns a child {@code ResourceId} under {@code this}.
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
index d855b34..232f5eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.util;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -25,6 +28,8 @@ import java.util.Arrays;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
/**
* A set of utilities for working with Avro files.
@@ -43,9 +48,9 @@ public class AvroUtils {
private String schemaString;
AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
- this.syncMarker = syncMarker;
- this.codec = codec;
- this.schemaString = schemaString;
+ this.syncMarker = checkNotNull(syncMarker, "syncMarker");
+ this.codec = checkNotNull(codec, "codec");
+ this.schemaString = checkNotNull(schemaString, "schemaString");
}
/**
@@ -74,6 +79,11 @@ public class AvroUtils {
}
}
+ @Deprecated // to be deleted
+ public static AvroMetadata readMetadataFromFile(String filename) throws IOException {
+ return readMetadataFromFile(FileSystems.matchSingleFileSpec(filename).resourceId());
+ }
+
/**
* Reads the {@link AvroMetadata} from the header of an Avro file.
*
@@ -83,12 +93,11 @@ public class AvroUtils {
*
* @throws IOException if the file is an invalid format.
*/
- public static AvroMetadata readMetadataFromFile(String fileName) throws IOException {
+ public static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
String codec = null;
String schemaString = null;
byte[] syncMarker;
- try (InputStream stream =
- Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) {
+ try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
// The header of an object container file begins with a four-byte magic number, followed
@@ -101,7 +110,7 @@ public class AvroUtils {
byte[] magic = new byte[DataFileConstants.MAGIC.length];
decoder.readFixed(magic);
if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
- throw new IOException("Missing Avro file signature: " + fileName);
+ throw new IOException("Missing Avro file signature: " + fileResource);
}
// Read the metadata to find the codec and schema.
@@ -132,6 +141,7 @@ public class AvroUtils {
syncMarker = new byte[DataFileConstants.SYNC_SIZE];
decoder.readFixed(syncMarker);
}
+ checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
return new AvroMetadata(syncMarker, codec, schemaString);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 78485c7..64b0027 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -433,8 +434,9 @@ public class AvroSourceTest {
List<Bird> birds = createRandomRecords(100);
String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
- String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString();
- String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString();
+ Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
+ String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
assertNotSame(schemaA, schemaB);
AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
@@ -451,14 +453,15 @@ public class AvroSourceTest {
List<Bird> birds = createRandomRecords(100);
String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
- String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString();
- String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString();
+ Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
+ String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+ String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
assertNotSame(schemaA, schemaB);
AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaA).createForSubrangeOfFile(filename, 0L, 0L);
+ .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
- .withSchema(schemaB).createForSubrangeOfFile(filename, 0L, 0L);
+ .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());
http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 5cf3ada..014e16e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -56,8 +56,10 @@ import org.apache.beam.sdk.io.CompressedSource.CompressedReader;
import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory;
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -567,16 +569,16 @@ public class CompressedSourceTest {
*/
private static class ByteSource extends FileBasedSource<Byte> {
public ByteSource(String fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
+ super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
- public ByteSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
- super(fileName, minBundleSize, startOffset, endOffset);
+ public ByteSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+ super(metadata, minBundleSize, startOffset, endOffset);
}
@Override
- protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
- return new ByteSource(fileName, getMinBundleSize(), start, end);
+ protected ByteSource createForSubrangeOfFile(Metadata metadata, long start, long end) {
+ return new ByteSource(metadata, getMinBundleSize(), start, end);
}
@Override
[3/3] beam git commit: This closes #2563
Posted by dh...@apache.org.
This closes #2563
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4491888
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4491888
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4491888
Branch: refs/heads/master
Commit: e44918881794ef80c83a6a119ec52861bece28a9
Parents: 4f8b1cc 49b8b23
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 20 15:10:14 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 20 15:10:14 2017 -0700
----------------------------------------------------------------------
.../UnboundedReadFromBoundedSource.java | 3 +-
.../UnboundedReadFromBoundedSourceTest.java | 14 +-
.../translation/wrappers/SourceInputFormat.java | 3 +-
.../beam/runners/spark/io/MicrobatchSource.java | 4 +-
.../java/org/apache/beam/sdk/io/AvroSource.java | 43 +--
.../apache/beam/sdk/io/BlockBasedSource.java | 10 +-
.../apache/beam/sdk/io/CompressedSource.java | 26 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 355 +++++++------------
.../org/apache/beam/sdk/io/FileSystems.java | 32 +-
.../org/apache/beam/sdk/io/LocalResourceId.java | 29 +-
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 17 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 21 +-
.../java/org/apache/beam/sdk/io/XmlSource.java | 12 +-
.../org/apache/beam/sdk/io/fs/MatchResult.java | 3 +-
.../org/apache/beam/sdk/io/fs/ResourceId.java | 3 +-
.../org/apache/beam/sdk/util/AvroUtils.java | 24 +-
.../org/apache/beam/sdk/io/AvroSourceTest.java | 15 +-
.../beam/sdk/io/CompressedSourceTest.java | 12 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 192 +++-------
.../org/apache/beam/sdk/util/AvroUtilsTest.java | 9 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 5 +-
.../org/apache/beam/sdk/util/gcsfs/GcsPath.java | 5 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 6 +-
.../inputformat/HadoopInputFormatIOTest.java | 2 +-
24 files changed, 349 insertions(+), 496 deletions(-)
----------------------------------------------------------------------