You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/20 22:10:23 UTC

[1/3] beam git commit: [BEAM-59] FileBasedSource: convert to FileSystem

Repository: beam
Updated Branches:
  refs/heads/master 4f8b1cc22 -> e44918881


http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 94a29da..c15e667 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -26,11 +26,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableList;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
@@ -39,21 +38,23 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,7 +63,6 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
 
 /**
  * Tests code common to all file-based sources.
@@ -70,7 +70,7 @@ import org.mockito.Mockito;
 @RunWith(JUnit4.class)
 public class FileBasedSourceTest {
 
-  Random random = new Random(0L);
+  private Random random = new Random(0L);
 
   @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -89,16 +89,16 @@ public class FileBasedSourceTest {
     final String splitHeader;
 
     public TestFileBasedSource(String fileOrPattern, long minBundleSize, String splitHeader) {
-      super(fileOrPattern, minBundleSize);
+      super(StaticValueProvider.of(fileOrPattern), minBundleSize);
       this.splitHeader = splitHeader;
     }
 
     public TestFileBasedSource(
-        String fileOrPattern,
+        Metadata fileOrPattern,
         long minBundleSize,
         long startOffset,
         long endOffset,
-        String splitHeader) {
+        @Nullable String splitHeader) {
       super(fileOrPattern, minBundleSize, startOffset, endOffset);
       this.splitHeader = splitHeader;
     }
@@ -113,7 +113,7 @@ public class FileBasedSourceTest {
 
     @Override
     protected FileBasedSource<String> createForSubrangeOfFile(
-        String fileName, long start, long end) {
+        Metadata fileName, long start, long end) {
       return new TestFileBasedSource(fileName, getMinBundleSize(), start, end, splitHeader);
     }
 
@@ -397,30 +397,12 @@ public class FileBasedSourceTest {
   }
 
   @Test
-  public void testSplittingUsingFullThreadPool() throws Exception {
-    int numFiles = FileBasedSource.THREAD_POOL_SIZE * 5;
-    File file0 = null;
-    for (int i = 0; i < numFiles; i++) {
-      List<String> data = createStringDataset(3, 1000);
-      File file = createFileWithData("file" + i, data);
-      if (i == 0) {
-        file0 = file;
-      }
-    }
-
-    TestFileBasedSource source =
-        new TestFileBasedSource(file0.getParent() + "/" + "file*", Long.MAX_VALUE, null);
-    List<? extends BoundedSource<String>> splits = source.split(Long.MAX_VALUE, null);
-    assertEquals(numFiles, splits.size());
-  }
-
-  @Test
   public void testSplittingFailsOnEmptyFileExpansion() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     String missingFilePath = tempFolder.newFolder().getAbsolutePath() + "/missing.txt";
     TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null);
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath));
+    thrown.expect(FileNotFoundException.class);
+    thrown.expectMessage(String.format("No files found for spec: %s", missingFilePath));
     source.split(1234, options);
   }
 
@@ -460,16 +442,7 @@ public class FileBasedSourceTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     File file1 = createFileWithData("file1", new ArrayList<String>());
 
-    IOChannelFactory mockIOFactory = Mockito.mock(IOChannelFactory.class);
-    String parent = file1.getParent();
-    String pattern = "mocked://test";
-    when(mockIOFactory.match(pattern))
-        .thenReturn(
-            ImmutableList.of(
-                new File(parent, "file1").getPath(),
-                new File(parent, "file2").getPath(),
-                new File(parent, "file3").getPath()));
-    IOChannelUtils.setIOFactoryInternal("mocked", mockIOFactory, true /* override */);
+    String pattern = file1.getParent() + "/file*";
 
     List<String> data2 = createStringDataset(3, 50);
     createFileWithData("file2", data2);
@@ -496,9 +469,10 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 25, null);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 25, null);
     TestFileBasedSource source2 =
-        new TestFileBasedSource(file.getPath(), 64, 25, Long.MAX_VALUE, null);
+        new TestFileBasedSource(metadata, 64, 25, Long.MAX_VALUE, null);
 
     List<String> results = new ArrayList<String>();
     results.addAll(readFromSource(source1, options));
@@ -523,7 +497,7 @@ public class FileBasedSourceTest {
     List<String> expectedResults = new ArrayList<String>();
     expectedResults.addAll(data);
     // Remove all occurrences of header from expected results.
-    expectedResults.removeAll(Arrays.asList(header));
+    expectedResults.removeAll(Collections.singletonList(header));
 
     assertEquals(expectedResults, readFromSource(source, options));
   }
@@ -540,9 +514,10 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 60, header);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 60, header);
     TestFileBasedSource source2 =
-        new TestFileBasedSource(file.getPath(), 64, 60, Long.MAX_VALUE, header);
+        new TestFileBasedSource(metadata, 64, 60, Long.MAX_VALUE, header);
 
     List<String> expectedResults = new ArrayList<String>();
     expectedResults.addAll(data);
@@ -568,16 +543,17 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 42, header);
-    TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 42, 112, header);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 42, header);
+    TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 42, 112, header);
     TestFileBasedSource source3 =
-        new TestFileBasedSource(file.getPath(), 64, 112, Long.MAX_VALUE, header);
+        new TestFileBasedSource(metadata, 64, 112, Long.MAX_VALUE, header);
 
     List<String> expectedResults = new ArrayList<String>();
 
     expectedResults.addAll(data);
     // Remove all occurrences of header from expected results.
-    expectedResults.removeAll(Arrays.asList(header));
+    expectedResults.removeAll(Collections.singletonList(header));
 
     List<String> results = new ArrayList<>();
     results.addAll(readFromSource(source1, options));
@@ -599,16 +575,17 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 42, header);
-    TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 42, 62, header);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 42, header);
+    TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 42, 62, header);
     TestFileBasedSource source3 =
-        new TestFileBasedSource(file.getPath(), 64, 62, Long.MAX_VALUE, header);
+        new TestFileBasedSource(metadata, 64, 62, Long.MAX_VALUE, header);
 
     List<String> expectedResults = new ArrayList<String>();
 
     expectedResults.addAll(data);
     // Remove all occurrences of header from expected results.
-    expectedResults.removeAll(Arrays.asList(header));
+    expectedResults.removeAll(Collections.singletonList(header));
 
     List<String> results = new ArrayList<>();
     results.addAll(readFromSource(source1, options));
@@ -633,19 +610,20 @@ public class FileBasedSourceTest {
     List<String> expectedResults = new ArrayList<String>();
     expectedResults.addAll(data.subList(10, data.size()));
     // Remove all occurrences of header from expected results.
-    expectedResults.removeAll(Arrays.asList(header));
+    expectedResults.removeAll(Collections.singletonList(header));
 
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
     // Split starts after "<" of the header
     TestFileBasedSource source =
-        new TestFileBasedSource(file.getPath(), 64, 1, Long.MAX_VALUE, header);
+        new TestFileBasedSource(metadata, 64, 1, Long.MAX_VALUE, header);
     assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
 
     // Split starts after "<h" of the header
-    source = new TestFileBasedSource(file.getPath(), 64, 2, Long.MAX_VALUE, header);
+    source = new TestFileBasedSource(metadata, 64, 2, Long.MAX_VALUE, header);
     assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
 
     // Split starts after "<h>" of the header
-    source = new TestFileBasedSource(file.getPath(), 64, 3, Long.MAX_VALUE, header);
+    source = new TestFileBasedSource(metadata, 64, 3, Long.MAX_VALUE, header);
     assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
   }
 
@@ -656,10 +634,11 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 52, null);
-    TestFileBasedSource source2 = new TestFileBasedSource(file.getPath(), 64, 52, 72, null);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 52, null);
+    TestFileBasedSource source2 = new TestFileBasedSource(metadata, 64, 52, 72, null);
     TestFileBasedSource source3 =
-        new TestFileBasedSource(file.getPath(), 64, 72, Long.MAX_VALUE, null);
+        new TestFileBasedSource(metadata, 64, 72, Long.MAX_VALUE, null);
 
     List<String> results = new ArrayList<>();
     results.addAll(readFromSource(source1, options));
@@ -677,9 +656,10 @@ public class FileBasedSourceTest {
     String fileName = "file";
     File file = createFileWithData(fileName, data);
 
-    TestFileBasedSource source1 = new TestFileBasedSource(file.getPath(), 64, 0, 162, null);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source1 = new TestFileBasedSource(metadata, 64, 0, 162, null);
     TestFileBasedSource source2 =
-        new TestFileBasedSource(file.getPath(), 1024, 162, Long.MAX_VALUE, null);
+        new TestFileBasedSource(metadata, 1024, 162, Long.MAX_VALUE, null);
 
     List<String> results = new ArrayList<>();
     results.addAll(readFromSource(source1, options));
@@ -793,74 +773,6 @@ public class FileBasedSourceTest {
   }
 
   @Test
-  public void testEstimatedSizeOfFilePatternAllThreads() throws Exception {
-    File file0 = null;
-    int numFiles = FileBasedSource.THREAD_POOL_SIZE * 5;
-    long totalSize = 0;
-    for (int i = 0; i < numFiles; i++) {
-      List<String> data = createStringDataset(3, 20);
-      File file = createFileWithData("file" + i, data);
-      if (i == 0) {
-        file0 = file;
-      }
-      totalSize += file.length();
-    }
-
-    TestFileBasedSource source =
-        new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
-
-    // Since all files are of equal size, sampling should produce the exact result.
-    assertEquals(totalSize, source.getEstimatedSizeBytes(null));
-  }
-
-  @Test
-  public void testEstimatedSizeOfFilePatternThroughSamplingEqualSize() throws Exception {
-    // When all files are of equal size, we should get the exact size.
-    int numFilesToTest = FileBasedSource.MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT * 2;
-    File file0 = null;
-    for (int i = 0; i < numFilesToTest; i++) {
-      List<String> data = createStringDataset(3, 20);
-      File file = createFileWithData("file" + i, data);
-      if (i == 0) {
-        file0 = file;
-      }
-    }
-
-    long actualTotalSize = file0.length() * numFilesToTest;
-    TestFileBasedSource source =
-        new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
-    assertEquals(actualTotalSize, source.getEstimatedSizeBytes(null));
-  }
-
-  @Test
-  public void testEstimatedSizeOfFilePatternThroughSamplingDifferentSizes() throws Exception {
-    float tolerableError = 0.2f;
-    int numFilesToTest = FileBasedSource.MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT * 2;
-    File file0 = null;
-
-    // Keeping sizes of files close to each other to make sure that the test passes reliably.
-    Random rand = new Random(System.currentTimeMillis());
-    int dataSizeBase = 100;
-    int dataSizeDelta = 10;
-
-    long actualTotalSize = 0;
-    for (int i = 0; i < numFilesToTest; i++) {
-      List<String> data = createStringDataset(
-          3, (int) (dataSizeBase + rand.nextFloat() * dataSizeDelta * 2 - dataSizeDelta));
-      File file = createFileWithData("file" + i, data);
-      if (i == 0) {
-        file0 = file;
-      }
-      actualTotalSize += file.length();
-    }
-
-    TestFileBasedSource source =
-        new TestFileBasedSource(new File(file0.getParent(), "file*").getPath(), 64, null);
-    assertEquals((double) actualTotalSize, (double) source.getEstimatedSizeBytes(null),
-        actualTotalSize * tolerableError);
-  }
-
-  @Test
   public void testReadAllSplitsOfFilePattern() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     List<String> data1 = createStringDataset(3, 50);
@@ -900,7 +812,8 @@ public class FileBasedSourceTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     File file = createFileWithData("file", createStringDataset(3, 100));
 
-    TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, file.length(), null);
     // Shouldn't be able to split while unstarted.
     assertSplitAtFractionFails(source, 0, 0.7, options);
     assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.7, options);
@@ -918,21 +831,16 @@ public class FileBasedSourceTest {
     // Smaller file for exhaustive testing.
     File file = createFileWithData("file", createStringDataset(3, 20));
 
-    TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 1, 0, file.length(), null);
+    Metadata metadata = FileSystems.matchSingleFileSpec(file.getPath());
+    TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, file.length(), null);
     assertSplitAtFractionExhaustive(source, options);
   }
 
   @Test
   public void testToStringFile() throws Exception {
-    String path = "/tmp/foo";
-    TestFileBasedSource source = new TestFileBasedSource(path, 1, 0, 10, null);
-    assertEquals(String.format("%s range [0, 10)", path), source.toString());
-  }
-
-  @Test
-  public void testToStringPattern() throws Exception {
-    String path = "/tmp/foo/*";
-    TestFileBasedSource source = new TestFileBasedSource(path, 1, 0, 10, null);
-    assertEquals(String.format("%s range [0, 10)", path), source.toString());
+    File f = createFileWithData("foo", Collections.<String>emptyList());
+    Metadata metadata = FileSystems.matchSingleFileSpec(f.getPath());
+    TestFileBasedSource source = new TestFileBasedSource(metadata, 1, 0, 10, null);
+    assertEquals(String.format("%s range [0, 10)", f.getAbsolutePath()), source.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
index d8c345c..d42f8ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
@@ -33,6 +33,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.util.AvroUtils.AvroMetadata;
 import org.junit.Rule;
 import org.junit.Test;
@@ -83,7 +85,9 @@ public class AvroUtilsTest {
     for (String codec : codecs) {
       String filename = generateTestFile(
           codec, expected, AvroCoder.of(Bird.class), codec);
-      AvroMetadata metadata = AvroUtils.readMetadataFromFile(filename);
+
+      Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+      AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
       assertEquals(codec, metadata.getCodec());
     }
   }
@@ -94,7 +98,8 @@ public class AvroUtilsTest {
     String codec = DataFileConstants.NULL_CODEC;
     String filename = generateTestFile(
         codec, expected, AvroCoder.of(Bird.class), codec);
-    AvroMetadata metadata = AvroUtils.readMetadataFromFile(filename);
+    Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
+    AvroMetadata metadata = AvroUtils.readMetadataFromFile(fileMeta.resourceId());
     // By default, parse validates the schema, which is what we want.
     Schema schema = new Schema.Parser().parse(metadata.getSchemaString());
     assertEquals(8, schema.getFields().size());

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 1c853bb..d0dfd3e 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -636,9 +636,8 @@ public class GcsUtil {
     return batches;
   }
 
-  public void copy(Iterable<String> srcFilenames,
-                   Iterable<String> destFilenames) throws
-      IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
     executeBatches(makeCopyBatches(srcFilenames, destFilenames));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
index 863b01b..6a71bdc 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/gcsfs/GcsPath.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Strings.isNullOrEmpty;
 import com.google.api.services.storage.model.StorageObject;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
@@ -68,7 +69,7 @@ import javax.annotation.Nullable;
  * "http://docs.oracle.com/javase/tutorial/essential/io/pathOps.html"
  * >Java Tutorials: Path Operations</a>
  */
-public class GcsPath implements Path {
+public class GcsPath implements Path, Serializable {
 
   public static final String SCHEME = "gs";
 
@@ -176,7 +177,7 @@ public class GcsPath implements Path {
   }
 
   @Nullable
-  private FileSystem fs;
+  private transient FileSystem fs;
   @Nonnull
   private final String bucket;
   @Nonnull

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 3653753..713a9a9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -503,8 +503,7 @@ public class BigtableIOTest {
         null /*filter*/,
         ByteKeyRange.ALL_KEYS,
         null /*size*/);
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSplits, null);
+    List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
     assertThat(splits, hasSize(numSplits));
@@ -529,8 +528,7 @@ public class BigtableIOTest {
         RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
     BigtableSource source =
         new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSplits, null);
+    List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
     assertThat(splits, hasSize(numSplits));

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index da70632..ccde03f 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -643,7 +643,7 @@ public class HadoopInputFormatIOTest {
    * {@link HadoopInputFormatBoundedSource#createReader(PipelineOptions)}
    * createReader()} method when
    * {@link HadoopInputFormatBoundedSource#split(long, PipelineOptions)}
-   * split()} is not called.
+   * is not called.
    */
   @Test
   public void testCreateReaderIfSplitNotCalled() throws Exception {


[2/3] beam git commit: [BEAM-59] FileBasedSource: convert to FileSystem

Posted by dh...@apache.org.
[BEAM-59] FileBasedSource: convert to FileSystem

* Make ResourceId serializable
* Update all implementations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49b8b230
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49b8b230
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49b8b230

Branch: refs/heads/master
Commit: 49b8b2302f4cbc67690617dee788356de2337d65
Parents: 4f8b1cc
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 20 15:08:34 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 20 15:08:34 2017 -0700

----------------------------------------------------------------------
 .../UnboundedReadFromBoundedSource.java         |   3 +-
 .../UnboundedReadFromBoundedSourceTest.java     |  14 +-
 .../translation/wrappers/SourceInputFormat.java |   3 +-
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../java/org/apache/beam/sdk/io/AvroSource.java |  43 +--
 .../apache/beam/sdk/io/BlockBasedSource.java    |  10 +-
 .../apache/beam/sdk/io/CompressedSource.java    |  26 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 355 +++++++------------
 .../org/apache/beam/sdk/io/FileSystems.java     |  32 +-
 .../org/apache/beam/sdk/io/LocalResourceId.java |  29 +-
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  17 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  21 +-
 .../java/org/apache/beam/sdk/io/XmlSource.java  |  12 +-
 .../org/apache/beam/sdk/io/fs/MatchResult.java  |   3 +-
 .../org/apache/beam/sdk/io/fs/ResourceId.java   |   3 +-
 .../org/apache/beam/sdk/util/AvroUtils.java     |  24 +-
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  15 +-
 .../beam/sdk/io/CompressedSourceTest.java       |  12 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java | 192 +++-------
 .../org/apache/beam/sdk/util/AvroUtilsTest.java |   9 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |   5 +-
 .../org/apache/beam/sdk/util/gcsfs/GcsPath.java |   5 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |   6 +-
 .../inputformat/HadoopInputFormatIOTest.java    |   2 +-
 24 files changed, 349 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index f67af8a..6b99522 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -61,8 +61,7 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
  *
- * <p>{@link BoundedSource} is read directly without calling
- * {@link BoundedSource#split},
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#split},
  * and element timestamps are propagated. While any elements remain, the watermark is the beginning
  * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
  * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
index c905cf5..0e48a9d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java
@@ -44,8 +44,10 @@ import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -299,21 +301,21 @@ public class UnboundedReadFromBoundedSourceTest {
    */
   private static class UnsplittableSource extends FileBasedSource<Byte> {
     public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
-      super(fileOrPatternSpec, minBundleSize);
+      super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
     }
 
     public UnsplittableSource(
-        String fileName, long minBundleSize, long startOffset, long endOffset) {
-      super(fileName, minBundleSize, startOffset, endOffset);
+        Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+      super(metadata, minBundleSize, startOffset, endOffset);
     }
 
     @Override
-    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
-      return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+    protected UnsplittableSource createForSubrangeOfFile(Metadata metadata, long start, long end) {
+      return new UnsplittableSource(metadata, getMinBundleSize(), start, end);
     }
 
     @Override
-    protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+    protected UnsplittableReader createSingleFileReader(PipelineOptions options) {
       return new UnsplittableReader(this);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index a87472b..12be8eb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -100,8 +100,7 @@ public class SourceInputFormat<T>
   public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
     try {
       long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
-      List<? extends Source<T>> shards =
-          initialSource.split(desiredSizeBytes, options);
+      List<? extends Source<T>> shards = initialSource.split(desiredSizeBytes, options);
       int numShards = shards.size();
       SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
       for (int i = 0; i < numShards; i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 7c07920..fde5f9a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -102,8 +102,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   }
 
   @Override
-  public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes,
-                       PipelineOptions options) throws Exception {
+  public List<? extends BoundedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
     List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
         source.split(numInitialSplits, options);

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 0c52dea..5e0900a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -34,7 +33,6 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.zip.Inflater;
@@ -52,6 +50,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.AvroUtils;
@@ -84,15 +83,6 @@ import org.apache.commons.compress.utils.CountingInputStream;
  * }
  * </pre>
  *
- * <p>The {@link AvroSource#readFromFileWithClass(String, Class)} method is a convenience method
- * that returns a read transform. For example:
- *
- * <pre>
- * {@code
- * PCollection<MyType> records = AvroSource.readFromFileWithClass(file.toPath(), MyType.class));
- * }
- * </pre>
- *
  * <p>This class's implementation is based on the <a
  * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements
  * parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro
@@ -165,15 +155,6 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   private transient Schema readSchema;
 
   /**
-   * Creates a {@link Read} transform that will read from an {@link AvroSource} that is configured
-   * to read records of the given type from a file pattern.
-   */
-  public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, Class<T> clazz) {
-    return Read.from(new AvroSource<>(filePattern, DEFAULT_MIN_BUNDLE_SIZE,
-        ReflectData.get().getSchema(clazz).toString(), clazz, null, null));
-  }
-
-  /**
    * Creates an {@link AvroSource} that reads from the given file name or pattern ("glob"). The
    * returned source can be further configured by calling {@link #withSchema} to return a type other
    * than {@link GenericRecord}.
@@ -237,9 +218,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     this.fileSchemaString = null;
   }
 
-  private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset,
+  private AvroSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset,
       String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
-    super(fileName, minBundleSize, startOffset, endOffset);
+    super(metadata, minBundleSize, startOffset, endOffset);
     this.readSchemaString = internSchemaString(schema);
     this.codec = codec;
     this.syncMarker = syncMarker;
@@ -254,8 +235,14 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     super.validate();
   }
 
+  @Deprecated // Added to let DataflowRunner migrate off of this; to be deleted.
+  public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)
+      throws IOException {
+    return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);
+  }
+
   @Override
-  public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
+  public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {
     byte[] syncMarker = this.syncMarker;
     String codec = this.codec;
     String readSchemaString = this.readSchemaString;
@@ -267,11 +254,9 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     if (codec == null || syncMarker == null || fileSchemaString == null) {
       AvroMetadata metadata;
       try {
-        Collection<String> files = FileBasedSource.expandFilePattern(fileName);
-        checkArgument(files.size() <= 1, "More than 1 file matched %s");
-        metadata = AvroUtils.readMetadataFromFile(fileName);
+        metadata = AvroUtils.readMetadataFromFile(fileMetadata.resourceId());
       } catch (IOException e) {
-        throw new RuntimeException("Error reading metadata from file " + fileName, e);
+        throw new RuntimeException("Error reading metadata from file " + fileMetadata, e);
       }
       codec = metadata.getCodec();
       syncMarker = metadata.getSyncMarker();
@@ -287,7 +272,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     // readSchemaString. This allows for Java to have an efficient serialization since it
     // will only encode the schema once while just storing pointers to the encoded version
     // within this source.
-    return new AvroSource<>(fileName, getMinBundleSize(), start, end, readSchemaString, type,
+    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, readSchemaString, type,
         codec, syncMarker, fileSchemaString);
   }
 
@@ -389,7 +374,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     switch (getMode()) {
       case SINGLE_FILE_OR_SUBRANGE:
         return new AvroSource<>(
-            getFileOrPatternSpec(),
+            getSingleFileMetadata(),
             getMinBundleSize(),
             getStartOffset(),
             getEndOffset(),

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 83336ff..cf6671e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 
 /**
  * A {@code BlockBasedSource} is a {@link FileBasedSource} where a file consists of blocks of
@@ -64,7 +66,7 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
    * {@link FileBasedSource} for more information.
    */
   public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    super(fileOrPatternSpec, minBundleSize);
+    super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
   }
 
   /**
@@ -72,8 +74,8 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
    * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
    * {@link FileBasedSource}.
    */
-  public BlockBasedSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
-    super(fileName, minBundleSize, startOffset, endOffset);
+  public BlockBasedSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+    super(metadata, minBundleSize, startOffset, endOffset);
   }
 
   /**
@@ -81,7 +83,7 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
    */
   @Override
   protected abstract BlockBasedSource<T> createForSubrangeOfFile(
-      String fileName, long start, long end);
+      Metadata metadata, long start, long end);
 
   /**
    * Creates a {@code BlockBasedReader}.

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index 1d940cb..f2fc37b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -36,6 +36,7 @@ import java.util.zip.ZipInputStream;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@@ -287,16 +288,6 @@ public class CompressedSource<T> extends FileBasedSource<T> {
   private final DecompressingChannelFactory channelFactory;
 
   /**
-   * Creates a {@link Read} transform that reads from that reads from the underlying
-   * {@link FileBasedSource} {@code sourceDelegate} after decompressing it with a {@link
-   * DecompressingChannelFactory}.
-   */
-  public static <T> Read.Bounded<T> readFromSource(
-      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    return Read.from(new CompressedSource<>(sourceDelegate, channelFactory));
-  }
-
-  /**
    * Creates a {@code CompressedSource} from an underlying {@code FileBasedSource}. The type
    * of compression used will be based on the file name extension unless explicitly
    * configured via {@link CompressedSource#withDecompression}.
@@ -329,12 +320,12 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    * CompressedSource#createForSubrangeOfFile}.
    */
   private CompressedSource(FileBasedSource<T> sourceDelegate,
-      DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize,
+      DecompressingChannelFactory channelFactory, Metadata metadata, long minBundleSize,
       long startOffset, long endOffset) {
-    super(filePatternOrSpec, minBundleSize, startOffset, endOffset);
+    super(metadata, minBundleSize, startOffset, endOffset);
     this.sourceDelegate = sourceDelegate;
     this.channelFactory = channelFactory;
-    boolean splittable = false;
+    boolean splittable;
     try {
       splittable = isSplittable();
     } catch (Exception e) {
@@ -342,7 +333,8 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     }
     checkArgument(
         splittable || startOffset == 0,
-        "CompressedSources must start reading at offset 0. Requested offset: " + startOffset);
+        "CompressedSources must start reading at offset 0. Requested offset: %s",
+        startOffset);
   }
 
   /**
@@ -361,9 +353,9 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    * source for a single file.
    */
   @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end),
-        channelFactory, fileName, sourceDelegate.getMinBundleSize(), start, end);
+  protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
+    return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(metadata, start, end),
+        channelFactory, metadata, sourceDelegate.getMinBundleSize(), start, end);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 95e6078..b2a4075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -18,32 +18,28 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,9 +52,9 @@ import org.slf4j.LoggerFactory;
  * glob, a single file, or an offset range for a single file. See {@link OffsetBasedSource} and
  * {@link org.apache.beam.sdk.io.range.RangeTracker} for semantics of offset ranges.
  *
- * <p>This source stores a {@code String} that is an {@link IOChannelFactory} specification for a
- * file or file pattern. There should be an {@code IOChannelFactory} defined for the file
- * specification provided. Please refer to {@link IOChannelUtils} and {@link IOChannelFactory} for
+ * <p>This source stores a {@code String} that is a {@link FileSystems} specification for a
+ * file or file pattern. There should be a {@link FileSystem} registered for the file
+ * specification provided. Please refer to {@link FileSystems} and {@link FileSystem} for
  * more information on this.
  *
  * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
@@ -70,16 +66,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
-  private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
-
-  // Package-private for testing
-  static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
-
-  // Size of the thread pool to be used for performing file operations in parallel.
-  // Package-private for testing.
-  static final int THREAD_POOL_SIZE = 128;
 
   private final ValueProvider<String> fileOrPatternSpec;
+  @Nullable private MatchResult.Metadata singleFileMetadata;
   private final Mode mode;
 
   /**
@@ -91,25 +80,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   }
 
   /**
-   * Create a {@code FileBaseSource} based on a file or a file pattern specification. This
-   * constructor must be used when creating a new {@code FileBasedSource} for a file pattern.
-   *
-   * <p>See {@link OffsetBasedSource} for a detailed description of {@code minBundleSize}.
-   *
-   * @param fileOrPatternSpec {@link IOChannelFactory} specification of file or file pattern
-   *        represented by the {@link FileBasedSource}.
-   * @param minBundleSize minimum bundle size in bytes.
-   */
-  public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
-  }
-
-  /**
    * Create a {@code FileBasedSource} based on a file or a file pattern specification.
-   * Same as the {@code String} constructor, but accepting a {@link ValueProvider}
-   * to allow for runtime configuration of the source.
    */
-  public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+  protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
     super(0, Long.MAX_VALUE, minBundleSize);
     mode = Mode.FILEPATTERN;
     this.fileOrPatternSpec = fileOrPatternSpec;
@@ -124,18 +97,38 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    * <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize},
    * {@code startOffset}, and {@code endOffset}.
    *
-   * @param fileName {@link IOChannelFactory} specification of the file represented by the
-   *        {@link FileBasedSource}.
+   * @param fileMetadata specification of the file represented by the {@link FileBasedSource}, in
+   *        suitable form for use with {@link FileSystems#match(List)}.
    * @param minBundleSize minimum bundle size in bytes.
    * @param startOffset starting byte offset.
    * @param endOffset ending byte offset. If the specified value {@code >= #getMaxEndOffset()} it
    *        implies {@code #getMaxEndOffset()}.
    */
-  public FileBasedSource(String fileName, long minBundleSize,
-      long startOffset, long endOffset) {
+  protected FileBasedSource(
+      Metadata fileMetadata, long minBundleSize, long startOffset, long endOffset) {
     super(startOffset, endOffset, minBundleSize);
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
-    this.fileOrPatternSpec = StaticValueProvider.of(fileName);
+    this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
+    this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+  }
+
+  /**
+   * Returns the information about the single file that this source is reading from.
+   *
+   * @throws IllegalArgumentException if this source is in {@link Mode#FILEPATTERN} mode.
+   */
+  protected final MatchResult.Metadata getSingleFileMetadata() {
+    checkArgument(
+        mode == Mode.SINGLE_FILE_OR_SUBRANGE,
+        "This function should only be called for a single file, not %s",
+        this);
+    checkState(
+        singleFileMetadata != null,
+        "It should not be possible to construct a %s in mode %s with null metadata: %s",
+        FileBasedSource.class,
+        mode,
+        this);
+    return singleFileMetadata;
   }
 
   public final String getFileOrPatternSpec() {
@@ -163,10 +156,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         "End offset value %s of the subrange cannot be larger than the end offset value %s",
         end,
         getEndOffset());
+    checkState(
+        singleFileMetadata != null,
+        "A single file source should not have null metadata: %s",
+        this);
 
-    checkState(fileOrPatternSpec.isAccessible(),
-               "Subrange creation should only happen at execution time.");
-    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
+    FileBasedSource<T> source = createForSubrangeOfFile(singleFileMetadata, start, end);
     if (start > 0 || end != Long.MAX_VALUE) {
       checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
           "Source created for the range [%s,%s) must be a subrange source", start, end);
@@ -178,54 +173,51 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    * Creates and returns a new {@code FileBasedSource} of the same type as the current
    * {@code FileBasedSource} backed by a given file and an offset range. When current source is
    * being split, this method is used to generate new sub-sources. When creating the source
-   * subclasses must call the constructor {@link #FileBasedSource(String, long, long, long)} of
+   * subclasses must call the constructor {@link #FileBasedSource(Metadata, long, long, long)} of
    * {@code FileBasedSource} with corresponding parameter values passed here.
    *
-   * @param fileName file backing the new {@code FileBasedSource}.
+   * @param fileMetadata file backing the new {@code FileBasedSource}.
    * @param start starting byte offset of the new {@code FileBasedSource}.
    * @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE,
    *        in which case it will be inferred using {@link #getMaxEndOffset}.
    */
   protected abstract FileBasedSource<T> createForSubrangeOfFile(
-      String fileName, long start, long end);
+      Metadata fileMetadata, long start, long end);
 
   /**
    * Creates and returns an instance of a {@code FileBasedReader} implementation for the current
    * source assuming the source represents a single file. File patterns will be handled by
    * {@code FileBasedSource} implementation automatically.
    */
-  protected abstract FileBasedReader<T> createSingleFileReader(
-      PipelineOptions options);
+  protected abstract FileBasedReader<T> createSingleFileReader(PipelineOptions options);
 
   @Override
   public final long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
     // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
     // we perform the size estimation of files and file patterns using the interface provided by
-    // IOChannelFactory.
+    // FileSystem.
 
     if (mode == Mode.FILEPATTERN) {
       checkState(fileOrPatternSpec.isAccessible(),
                  "Size estimation should be done at execution time.");
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
-      // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
-      long startTime = System.currentTimeMillis();
+      String pattern = fileOrPatternSpec.get();
       long totalSize = 0;
-      Collection<String> inputs = factory.match(fileOrPatternSpec.get());
-      if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
-        totalSize = getExactTotalSizeOfFiles(inputs, factory);
-        LOG.debug("Size estimation of all files of pattern {} took {} ms",
-            fileOrPatternSpec,
-            System.currentTimeMillis() - startTime);
-      } else {
-        totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
-        LOG.debug("Size estimation of pattern {} by sampling took {} ms",
-            fileOrPatternSpec,
-            System.currentTimeMillis() - startTime);
+      List<MatchResult> inputs =
+          FileSystems.match(Collections.singletonList(pattern));
+      MatchResult result = Iterables.getOnlyElement(inputs);
+      checkArgument(
+          result.status() == Status.OK,
+          "Error matching the pattern or glob %s: status %s",
+          pattern,
+          result.status());
+      Metadata[] allMatches = result.metadata();
+      for (Metadata metadata : allMatches) {
+        totalSize += metadata.sizeBytes();
       }
       LOG.info(
           "Filepattern {} matched {} files with total size {}",
           fileOrPatternSpec.get(),
-          inputs.size(),
+          allMatches.length,
           totalSize);
       return totalSize;
     } else {
@@ -235,98 +227,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     }
   }
 
-  // Get the exact total size of the given set of files.
-  // Invokes multiple requests for size estimation in parallel using a thread pool.
-  // TODO: replace this with bulk request API when it is available. Will require updates
-  // to IOChannelFactory interface.
-  private static long getExactTotalSizeOfFiles(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
-    List<ListenableFuture<Long>> futures = new ArrayList<>();
-    ListeningExecutorService service =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-    try {
-      long totalSize = 0;
-      for (String file : files) {
-        futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
-      }
-
-      for (Long val : Futures.allAsList(futures).get()) {
-        totalSize += val;
-      }
-
-      return totalSize;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(e);
-    } catch (ExecutionException e) {
-      throw new IOException(e.getCause());
-    }  finally {
-      service.shutdown();
-    }
-  }
-
-  private static ListenableFuture<Long> createFutureForSizeEstimation(
-      final String file,
-      final IOChannelFactory ioChannelFactory,
-      ListeningExecutorService service) {
-    return service.submit(
-        new Callable<Long>() {
-          @Override
-          public Long call() throws IOException {
-            return ioChannelFactory.getSizeBytes(file);
-          }
-        });
-  }
-
-  // Estimate the total size of the given set of files through sampling and extrapolation.
-  // Currently we use uniform sampling which requires a linear sampling size for a reasonable
-  // estimate.
-  // TODO: Implement a more efficient sampling mechanism.
-  private static long getEstimatedSizeOfFilesBySampling(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws IOException {
-    int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
-    sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
-
-    List<String> selectedFiles = new ArrayList<String>(files);
-    Collections.shuffle(selectedFiles);
-    selectedFiles = selectedFiles.subList(0, sampleSize);
-
-    long exactTotalSampleSize = getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory);
-    double avgSize = 1.0 * exactTotalSampleSize / selectedFiles.size();
-    long totalSize = Math.round(files.size() * avgSize);
-    LOG.info(
-        "Sampling {} files gave {} total bytes ({} average per file), "
-            + "inferring total size of {} files to be {}",
-        selectedFiles.size(),
-        exactTotalSampleSize,
-        avgSize,
-        files.size(),
-        totalSize);
-    return totalSize;
-  }
-
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
-      ? getFileOrPatternSpecProvider().get()
-      : getFileOrPatternSpecProvider().toString();
-    builder.add(DisplayData.item("filePattern", patternDisplay)
-      .withLabel("File Pattern"));
-  }
-
-  private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
-      final String file,
-      final long desiredBundleSizeBytes,
-      final PipelineOptions options,
-      ListeningExecutorService service) {
-    return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
-      @Override
-      public List<? extends FileBasedSource<T>> call() throws Exception {
-        return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
-            .split(desiredBundleSizeBytes, options);
-      }
-    });
+    if (mode == Mode.FILEPATTERN) {
+      String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
+          ? getFileOrPatternSpecProvider().get()
+          : getFileOrPatternSpecProvider().toString();
+      builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern"));
+    }
   }
 
   @Override
@@ -339,42 +248,38 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
-
-      ListeningExecutorService service =
-          MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-      try {
-        checkState(fileOrPatternSpec.isAccessible(),
-                   "Bundle splitting should only happen at execution time.");
-        Collection<String> expandedFiles =
-            FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
-        checkArgument(!expandedFiles.isEmpty(),
-            "Unable to find any files matching %s", fileOrPatternSpec.get());
-        for (final String file : expandedFiles) {
-          futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
-        }
-        List<? extends FileBasedSource<T>> splitResults =
-            ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
-        LOG.info(
-            "Splitting filepattern {} into bundles of size {} took {} ms "
-                + "and produced {} files and {} bundles",
-            fileOrPatternSpec,
-            desiredBundleSizeBytes,
-            System.currentTimeMillis() - startTime,
-            expandedFiles.size(),
-            splitResults.size());
-        return splitResults;
-      } finally {
-        service.shutdown();
+      checkState(fileOrPatternSpec.isAccessible(),
+                 "Bundle splitting should only happen at execution time.");
+      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      checkArgument(!expandedFiles.isEmpty(),
+          "Unable to find any files matching %s", fileOrPatternSpec.get());
+      List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
+      for (Metadata metadata : expandedFiles) {
+        FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
+        verify(split.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
+            "%s.createForSubrangeOfFile must return a source in mode %s",
+            split,
+            Mode.SINGLE_FILE_OR_SUBRANGE);
+        // The split is NOT in FILEPATTERN mode, so we can call its split without fear
+        // of recursion. This will break a single file into multiple splits when the file is
+        // splittable and larger than the desired bundle size.
+        splitResults.addAll(split.split(desiredBundleSizeBytes, options));
       }
+      LOG.info(
+          "Splitting filepattern {} into bundles of size {} took {} ms "
+              + "and produced {} files and {} bundles",
+          fileOrPatternSpec,
+          desiredBundleSizeBytes,
+          System.currentTimeMillis() - startTime,
+          expandedFiles.size(),
+          splitResults.size());
+      return splitResults;
     } else {
       if (isSplittable()) {
-        List<FileBasedSource<T>> splitResults = new ArrayList<>();
-        for (OffsetBasedSource<T> split :
-            super.split(desiredBundleSizeBytes, options)) {
-          splitResults.add((FileBasedSource<T>) split);
-        }
-        return splitResults;
+        @SuppressWarnings("unchecked")
+        List<FileBasedSource<T>> splits =
+            (List<FileBasedSource<T>>) super.split(desiredBundleSizeBytes, options);
+        return splits;
       } else {
         LOG.debug("The source for file {} is not split into sub-range based sources since "
             + "the file is not seekable",
@@ -387,17 +292,23 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   /**
    * Determines whether a file represented by this source can be split into bundles.
    *
-   * <p>By default, a file is splittable if it is on a file system that supports efficient read
-   * seeking. Subclasses may override to provide different behavior.
+   * <p>By default, a source in mode {@link Mode#FILEPATTERN} is always splittable, because
+   * splitting will involve expanding the file pattern and producing single-file/subrange sources,
+   * which may or may not be splittable themselves.
+   *
+   * <p>By default, a source in {@link Mode#SINGLE_FILE_OR_SUBRANGE} is splittable if it is on a
+   * file system that supports efficient read seeking.
+   *
+   * <p>Subclasses may override to provide different behavior.
    */
   protected boolean isSplittable() throws Exception {
-    // We split a file-based source into subranges only if the file is efficiently seekable.
-    // If a file is not efficiently seekable it would be highly inefficient to create and read a
-    // source based on a subrange of that file.
-    checkState(fileOrPatternSpec.isAccessible(),
-        "isSplittable should only be called at runtime.");
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
-    return factory.isReadSeekEfficient(fileOrPatternSpec.get());
+    if (mode == Mode.FILEPATTERN) {
+      // split will expand file pattern and return single file or subrange sources that
+      // may or may not be splittable.
+      return true;
+    }
+
+    return getSingleFileMetadata().isReadSeekEfficient();
   }
 
   @Override
@@ -407,18 +318,12 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
+      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
-      for (String fileName : files) {
-        long endOffset;
-        try {
-          endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
-        } catch (IOException e) {
-          LOG.warn("Failed to get size of {}", fileName, e);
-          endOffset = Long.MAX_VALUE;
-        }
+      for (Metadata metadata : fileMetadata) {
+        long endOffset = metadata.sizeBytes();
         fileReaders.add(
-            createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
+            createForSubrangeOfFile(metadata, 0, endOffset).createSingleFileReader(options));
       }
       LOG.debug(
           "Creating a reader for file pattern {} took {} ms",
@@ -471,20 +376,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public final long getMaxEndOffset(PipelineOptions options) throws IOException {
     checkArgument(
             mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
-    if (getEndOffset() == Long.MAX_VALUE) {
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
-      return factory.getSizeBytes(fileOrPatternSpec.get());
-    } else {
-      return getEndOffset();
-    }
+    Metadata metadata = getSingleFileMetadata();
+    return metadata.sizeBytes();
   }
 
-  protected static final Collection<String> expandFilePattern(String fileOrPatternSpec)
-      throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    Collection<String> matches = factory.match(fileOrPatternSpec);
-    LOG.info("Matched {} files for pattern {}", matches.size(), fileOrPatternSpec);
-    return matches;
+  private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
+    MatchResult matches =
+        Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
+    LOG.info("Matched {} files for pattern {}", matches.metadata().length, fileOrPatternSpec);
+    return ImmutableList.copyOf(matches.metadata());
   }
 
   /**
@@ -545,10 +445,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     @Override
     protected final boolean startImpl() throws IOException {
       FileBasedSource<T> source = getCurrentSource();
-      IOChannelFactory factory = IOChannelUtils.getFactory(
-        source.getFileOrPatternSpecProvider().get());
-      this.channel = factory.open(source.getFileOrPatternSpecProvider().get());
-
+      this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
       if (channel instanceof SeekableByteChannel) {
         SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
         seekChannel.position(source.getStartOffset());
@@ -585,6 +482,16 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       }
     }
 
+    @Override
+    public boolean allowsDynamicSplitting() {
+      try {
+        return getCurrentSource().isSplittable();
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Error determining if %s allows dynamic splitting", this), e);
+      }
+    }
+
     /**
      * Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
      * operations. Will only be invoked once and before that invocation the base class will seek the

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 96306dc..aa247c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -31,12 +31,13 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
-
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -46,7 +47,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
-
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -111,6 +111,34 @@ public class FileSystems {
   }
 
   /**
+   * Returns the {@link Metadata} for a single file resource. Expects a resource specification
+   * {@code spec} that matches a single result.
+   *
+   * @param spec a resource specification that matches exactly one result.
+   * @return the {@link Metadata} for the specified resource.
+   * @throws IOException in the event of an error in the inner call to {@link #match},
+   * or if the given spec does not match exactly 1 result.
+   */
+  public static Metadata matchSingleFileSpec(String spec) throws IOException {
+    List<MatchResult> matches = FileSystems.match(Collections.singletonList(spec));
+    MatchResult matchResult = Iterables.getOnlyElement(matches);
+    if (matchResult.status() != Status.OK) {
+      throw new IOException(
+          String.format("Error matching file spec %s: status %s", spec, matchResult.status()));
+    }
+    Metadata[] metadata = matchResult.metadata();
+    if (metadata.length != 1) {
+      throw new IOException(
+        String.format(
+            "Expecting spec %s to match exactly one file, but matched %s: %s",
+            spec,
+            metadata.length,
+            Arrays.toString(metadata)));
+    }
+    return metadata[0];
+  }
+
+  /**
    * Returns {@link MatchResult MatchResults} for the given {@link ResourceId resourceIds}.
    *
    * @param resourceIds {@link ResourceId resourceIds} that might be derived from {@link #match},

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
index 2272a06..091e955 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResourceId.java
@@ -35,7 +35,8 @@ import org.apache.commons.lang3.SystemUtils;
  */
 class LocalResourceId implements ResourceId {
 
-  private final Path path;
+  private final String pathString;
+  private transient volatile Path cachedPath;
   private final boolean isDirectory;
 
   static LocalResourceId fromPath(Path path, boolean isDirectory) {
@@ -44,7 +45,7 @@ class LocalResourceId implements ResourceId {
   }
 
   private LocalResourceId(Path path, boolean isDirectory) {
-    this.path = path.normalize();
+    this.pathString = path.normalize().toString();
     this.isDirectory = isDirectory;
   }
 
@@ -52,11 +53,12 @@ class LocalResourceId implements ResourceId {
   public LocalResourceId resolve(String other, ResolveOptions resolveOptions) {
     checkState(
         isDirectory,
-        String.format("Expected the path is a directory, but had [%s].", path));
+        "Expected the path is a directory, but had [%s].",
+        pathString);
     checkArgument(
         resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
             || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
-        String.format("ResolveOptions: [%s] is not supported.", resolveOptions));
+        "ResolveOptions: [%s] is not supported.", resolveOptions);
     checkArgument(
         !(resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
             && other.endsWith("/")),
@@ -73,13 +75,15 @@ class LocalResourceId implements ResourceId {
     if (isDirectory) {
       return this;
     } else {
+      Path path = getPath();
       Path parent = path.getParent();
       if (parent == null && path.getNameCount() == 1) {
         parent = Paths.get(".");
       }
       checkState(
           parent != null,
-          String.format("Failed to get the current directory for path: [%s].", path));
+          "Failed to get the current directory for path: [%s].",
+          pathString);
       return fromPath(
           parent,
           true /* isDirectory */);
@@ -88,13 +92,13 @@ class LocalResourceId implements ResourceId {
 
   private LocalResourceId resolveLocalPath(String other, ResolveOptions resolveOptions) {
     return new LocalResourceId(
-        path.resolve(other),
+        getPath().resolve(other),
         resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY));
   }
 
   private LocalResourceId resolveLocalPathWindowsOS(String other, ResolveOptions resolveOptions) {
     String uuid = UUID.randomUUID().toString();
-    Path pathAsterisksReplaced = Paths.get(path.toString().replaceAll("\\*", uuid));
+    Path pathAsterisksReplaced = Paths.get(pathString.replaceAll("\\*", uuid));
     String otherAsterisksReplaced = other.replaceAll("\\*", uuid);
 
     return new LocalResourceId(
@@ -111,12 +115,15 @@ class LocalResourceId implements ResourceId {
   }
 
   Path getPath() {
-    return path;
+    if (cachedPath == null) {
+      cachedPath = Paths.get(pathString);
+    }
+    return cachedPath;
   }
 
   @Override
   public String toString() {
-    return path.toString();
+    return pathString;
   }
 
   @Override
@@ -125,12 +132,12 @@ class LocalResourceId implements ResourceId {
       return false;
     }
     LocalResourceId other = (LocalResourceId) obj;
-    return this.path.equals(other.path)
+    return this.pathString.equals(other.pathString)
         && this.isDirectory == other.isDirectory;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(path, isDirectory);
+    return Objects.hash(pathString, isDirectory);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 34dbe21..a920283 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -444,7 +445,7 @@ public class TFRecordIO {
   static class TFRecordSource extends FileBasedSource<byte[]> {
     @VisibleForTesting
     TFRecordSource(String fileSpec) {
-      super(fileSpec, 1L);
+      super(StaticValueProvider.of(fileSpec), 1L);
     }
 
     @VisibleForTesting
@@ -452,17 +453,17 @@ public class TFRecordIO {
       super(fileSpec, Long.MAX_VALUE);
     }
 
-    private TFRecordSource(String fileName, long start, long end) {
-      super(fileName, Long.MAX_VALUE, start, end);
+    private TFRecordSource(Metadata metadata, long start, long end) {
+      super(metadata, Long.MAX_VALUE, start, end);
     }
 
     @Override
     protected FileBasedSource<byte[]> createForSubrangeOfFile(
-        String fileName,
+        Metadata metadata,
         long start,
         long end) {
       checkArgument(start == 0, "TFRecordSource is not splittable");
-      return new TFRecordSource(fileName, start, end);
+      return new TFRecordSource(metadata, start, end);
     }
 
     @Override
@@ -501,6 +502,12 @@ public class TFRecordIO {
       }
 
       @Override
+      public boolean allowsDynamicSplitting() {
+        /* TFRecords cannot be dynamically split. */
+        return false;
+      }
+
+      @Override
       protected long getCurrentOffset() throws NoSuchElementException {
         if (!elementIsPresent) {
           throw new NoSuchElementException();

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index fbd76df..2dac73a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.ByteBuffer;
@@ -35,15 +34,14 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -111,9 +109,6 @@ import org.apache.beam.sdk.values.PDone;
  * }</pre>
  */
 public class TextIO {
-  /** The default coder, which returns each line of the input file as a string. */
-  public static final Coder<String> DEFAULT_TEXT_CODER = StringUtf8Coder.of();
-
   /**
    * A {@link PTransform} that reads from a text file (or multiple text
    * files matching a pattern) and returns a {@link PCollection} containing
@@ -316,7 +311,7 @@ public class TextIO {
 
       @Override
       protected Coder<String> getDefaultOutputCoder() {
-        return DEFAULT_TEXT_CODER;
+        return StringUtf8Coder.of();
       }
 
       public String getFilepattern() {
@@ -871,7 +866,7 @@ public class TextIO {
     /** The Coder to use to decode each line. */
     @VisibleForTesting
     TextSource(String fileSpec) {
-      super(fileSpec, 1L);
+      super(StaticValueProvider.of(fileSpec), 1L);
     }
 
     @VisibleForTesting
@@ -879,16 +874,16 @@ public class TextIO {
       super(fileSpec, 1L);
     }
 
-    private TextSource(String fileName, long start, long end) {
-      super(fileName, 1L, start, end);
+    private TextSource(Metadata metadata, long start, long end) {
+      super(metadata, 1L, start, end);
     }
 
     @Override
     protected FileBasedSource<String> createForSubrangeOfFile(
-        String fileName,
+        Metadata metadata,
         long start,
         long end) {
-      return new TextSource(fileName, start, end);
+      return new TextSource(metadata, start, end);
     }
 
     @Override
@@ -898,7 +893,7 @@ public class TextIO {
 
     @Override
     public Coder<String> getDefaultOutputCoder() {
-      return DEFAULT_TEXT_CODER;
+      return StringUtf8Coder.of();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
index 7416c85..4b7d3b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
@@ -42,7 +42,9 @@ import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.JAXBCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.codehaus.stax2.XMLInputFactory2;
 
@@ -54,18 +56,18 @@ public class XmlSource<T> extends FileBasedSource<T> {
   private final XmlIO.Read<T> spec;
 
   XmlSource(XmlIO.Read<T> spec) {
-    super(spec.getFileOrPatternSpec(), spec.getMinBundleSize());
+    super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize());
     this.spec = spec;
   }
 
-  private XmlSource(XmlIO.Read<T> spec, long startOffset, long endOffset) {
-    super(spec.getFileOrPatternSpec(), spec.getMinBundleSize(), startOffset, endOffset);
+  private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) {
+    super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
     this.spec = spec;
   }
 
   @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-    return new XmlSource<T>(spec.from(fileName), start, end);
+  protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
+    return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 80ee00f..ab11d66 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.fs;
 
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -92,7 +93,7 @@ public abstract class MatchResult {
    * {@link Metadata} of a matched file.
    */
   @AutoValue
-  public abstract static class Metadata {
+  public abstract static class Metadata implements Serializable {
     public abstract ResourceId resourceId();
     public abstract long sizeBytes();
     public abstract boolean isReadSeekEfficient();

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index 938e24a..b7859ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.fs;
 
+import java.io.Serializable;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 
@@ -28,7 +29,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
  *
  * <p>TODO: add examples for how ResourceId is constructed and used.
  */
-public interface ResourceId {
+public interface ResourceId extends Serializable {
 
   /**
    * Returns a child {@code ResourceId} under {@code this}.

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
index d855b34..232f5eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.util;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -25,6 +28,8 @@ import java.util.Arrays;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
 
 /**
  * A set of utilities for working with Avro files.
@@ -43,9 +48,9 @@ public class AvroUtils {
     private String schemaString;
 
     AvroMetadata(byte[] syncMarker, String codec, String schemaString) {
-      this.syncMarker = syncMarker;
-      this.codec = codec;
-      this.schemaString = schemaString;
+      this.syncMarker = checkNotNull(syncMarker, "syncMarker");
+      this.codec = checkNotNull(codec, "codec");
+      this.schemaString = checkNotNull(schemaString, "schemaString");
     }
 
     /**
@@ -74,6 +79,11 @@ public class AvroUtils {
     }
   }
 
+  @Deprecated  // to be deleted
+  public static AvroMetadata readMetadataFromFile(String filename) throws IOException {
+    return readMetadataFromFile(FileSystems.matchSingleFileSpec(filename).resourceId());
+  }
+
   /**
    * Reads the {@link AvroMetadata} from the header of an Avro file.
    *
@@ -83,12 +93,11 @@ public class AvroUtils {
    *
    * @throws IOException if the file is an invalid format.
    */
-  public static AvroMetadata readMetadataFromFile(String fileName) throws IOException {
+  public static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {
     String codec = null;
     String schemaString = null;
     byte[] syncMarker;
-    try (InputStream stream =
-        Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) {
+    try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
 
       // The header of an object container file begins with a four-byte magic number, followed
@@ -101,7 +110,7 @@ public class AvroUtils {
       byte[] magic = new byte[DataFileConstants.MAGIC.length];
       decoder.readFixed(magic);
       if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
-        throw new IOException("Missing Avro file signature: " + fileName);
+        throw new IOException("Missing Avro file signature: " + fileResource);
       }
 
       // Read the metadata to find the codec and schema.
@@ -132,6 +141,7 @@ public class AvroUtils {
       syncMarker = new byte[DataFileConstants.SYNC_SIZE];
       decoder.readFixed(syncMarker);
     }
+    checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);
     return new AvroMetadata(syncMarker, codec, schemaString);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index 78485c7..64b0027 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.io.AvroSource.AvroReader;
 import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
 import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -433,8 +434,9 @@ public class AvroSourceTest {
     List<Bird> birds = createRandomRecords(100);
     String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
         AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
-    String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString();
-    String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
+    String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+    String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
     assertNotSame(schemaA, schemaB);
 
     AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema(schemaA);
@@ -451,14 +453,15 @@ public class AvroSourceTest {
     List<Bird> birds = createRandomRecords(100);
     String filename = generateTestFile("tmp.avro", birds, SyncBehavior.SYNC_DEFAULT, 0,
         AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC);
-    String schemaA = AvroUtils.readMetadataFromFile(filename).getSchemaString();
-    String schemaB = AvroUtils.readMetadataFromFile(filename).getSchemaString();
+    Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
+    String schemaA = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
+    String schemaB = AvroUtils.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
     assertNotSame(schemaA, schemaB);
 
     AvroSource<GenericRecord> sourceA = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaA).createForSubrangeOfFile(filename, 0L, 0L);
+        .withSchema(schemaA).createForSubrangeOfFile(fileMetadata, 0L, 0L);
     AvroSource<GenericRecord> sourceB = (AvroSource<GenericRecord>) AvroSource.from(filename)
-        .withSchema(schemaB).createForSubrangeOfFile(filename, 0L, 0L);
+        .withSchema(schemaB).createForSubrangeOfFile(fileMetadata, 0L, 0L);
     assertSame(sourceA.getReadSchema(), sourceA.getFileSchema());
     assertSame(sourceA.getReadSchema(), sourceB.getReadSchema());
     assertSame(sourceA.getReadSchema(), sourceB.getFileSchema());

http://git-wip-us.apache.org/repos/asf/beam/blob/49b8b230/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 5cf3ada..014e16e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -56,8 +56,10 @@ import org.apache.beam.sdk.io.CompressedSource.CompressedReader;
 import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
 import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -567,16 +569,16 @@ public class CompressedSourceTest {
    */
   private static class ByteSource extends FileBasedSource<Byte> {
     public ByteSource(String fileOrPatternSpec, long minBundleSize) {
-      super(fileOrPatternSpec, minBundleSize);
+      super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
     }
 
-    public ByteSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
-      super(fileName, minBundleSize, startOffset, endOffset);
+    public ByteSource(Metadata metadata, long minBundleSize, long startOffset, long endOffset) {
+      super(metadata, minBundleSize, startOffset, endOffset);
     }
 
     @Override
-    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
-      return new ByteSource(fileName, getMinBundleSize(), start, end);
+    protected ByteSource createForSubrangeOfFile(Metadata metadata, long start, long end) {
+      return new ByteSource(metadata, getMinBundleSize(), start, end);
     }
 
     @Override


[3/3] beam git commit: This closes #2563

Posted by dh...@apache.org.
This closes #2563


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4491888
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4491888
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4491888

Branch: refs/heads/master
Commit: e44918881794ef80c83a6a119ec52861bece28a9
Parents: 4f8b1cc 49b8b23
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 20 15:10:14 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 20 15:10:14 2017 -0700

----------------------------------------------------------------------
 .../UnboundedReadFromBoundedSource.java         |   3 +-
 .../UnboundedReadFromBoundedSourceTest.java     |  14 +-
 .../translation/wrappers/SourceInputFormat.java |   3 +-
 .../beam/runners/spark/io/MicrobatchSource.java |   4 +-
 .../java/org/apache/beam/sdk/io/AvroSource.java |  43 +--
 .../apache/beam/sdk/io/BlockBasedSource.java    |  10 +-
 .../apache/beam/sdk/io/CompressedSource.java    |  26 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 355 +++++++------------
 .../org/apache/beam/sdk/io/FileSystems.java     |  32 +-
 .../org/apache/beam/sdk/io/LocalResourceId.java |  29 +-
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  17 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  21 +-
 .../java/org/apache/beam/sdk/io/XmlSource.java  |  12 +-
 .../org/apache/beam/sdk/io/fs/MatchResult.java  |   3 +-
 .../org/apache/beam/sdk/io/fs/ResourceId.java   |   3 +-
 .../org/apache/beam/sdk/util/AvroUtils.java     |  24 +-
 .../org/apache/beam/sdk/io/AvroSourceTest.java  |  15 +-
 .../beam/sdk/io/CompressedSourceTest.java       |  12 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java | 192 +++-------
 .../org/apache/beam/sdk/util/AvroUtilsTest.java |   9 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |   5 +-
 .../org/apache/beam/sdk/util/gcsfs/GcsPath.java |   5 +-
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |   6 +-
 .../inputformat/HadoopInputFormatIOTest.java    |   2 +-
 24 files changed, 349 insertions(+), 496 deletions(-)
----------------------------------------------------------------------