You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:17:41 UTC

[35/50] [abbrv] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index ea0395d..a5dacd1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -39,12 +38,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -149,7 +148,8 @@ public class WriteFilesTest {
   }
 
   private String getBaseOutputFilename() {
-    return appendToTempFolder("baseoutput");
+    return getBaseOutputDirectory()
+        .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
   }
 
   /**
@@ -188,6 +188,15 @@ public class WriteFilesTest {
         Optional.of(1));
   }
 
+  private ResourceId getBaseOutputDirectory() {
+    return LocalResources.fromFile(tmpFolder.getRoot(), true)
+        .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
+
+  }
+  private SimpleSink makeSimpleSink() {
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testCustomShardedWrite() throws IOException {
@@ -204,7 +213,7 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
@@ -270,7 +279,7 @@ public class WriteFilesTest {
 
   @Test
   public void testBuildWrite() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
@@ -293,7 +302,7 @@ public class WriteFilesTest {
 
   @Test
   public void testDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -308,7 +317,7 @@ public class WriteFilesTest {
 
   @Test
   public void testShardedDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -323,7 +332,7 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -354,7 +363,7 @@ public class WriteFilesTest {
    * methods on a test sink in the correct order, as well as verifies that the elements of a
    * PCollection are written to the sink.
    */
-  private static void runWrite(
+  private void runWrite(
       List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName) throws IOException {
     runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
@@ -366,7 +375,7 @@ public class WriteFilesTest {
    * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards
    * is not null, also verifies that the output number of shards is correct.
    */
-  private static void runShardedWrite(
+  private void runShardedWrite(
       List<String> inputs,
       PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName,
@@ -382,7 +391,7 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink sink = new SimpleSink(baseName, "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink);
     if (numConfiguredShards.isPresent()) {
       write = write.withNumShards(numConfiguredShards.get());
@@ -399,8 +408,10 @@ public class WriteFilesTest {
                                 Optional<Integer> numExpectedShards) throws IOException {
     List<File> outputFiles = Lists.newArrayList();
     final String pattern = baseName + "*";
-    for (String outputFileName : IOChannelUtils.getFactory(pattern).match(pattern)) {
-      outputFiles.add(new File(outputFileName));
+    List<Metadata> metadata =
+        FileSystems.match(Collections.singletonList(pattern)).get(0).metadata();
+    for (Metadata meta : metadata) {
+      outputFiles.add(new File(meta.resourceId().toString()));
     }
     if (numExpectedShards.isPresent()) {
       assertEquals(numExpectedShards.get().intValue(), outputFiles.size());

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index 4d58424..9ad4152 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
@@ -44,12 +45,11 @@ public class GcsPathValidator implements PathValidator {
    * is well formed.
    */
   @Override
-  public String validateInputFilePatternSupported(String filepattern) {
+  public void validateInputFilePatternSupported(String filepattern) {
     GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
+    checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
+    verifyPath(filepattern);
     verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
   }
 
   /**
@@ -57,10 +57,18 @@ public class GcsPathValidator implements PathValidator {
    * is well formed.
    */
   @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
+  public void validateOutputFilePrefixSupported(String filePrefix) {
+    verifyPath(filePrefix);
     verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
+  }
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {
+    checkArgument(
+        resourceId.getScheme().equals("gs"),
+        "Expected a valid 'gs://' path but was given: '%s'",
+        resourceId);
+    verifyPath(resourceId.toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index c41d6bc..a34fb0f 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,19 +21,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-
 import java.nio.charset.Charset;
-
 import javax.annotation.Nullable;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.ValidationEventHandler;
-
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CompressedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -450,7 +450,7 @@ public class XmlIO {
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
-    abstract String getFilenamePrefix();
+    abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     @Nullable
     abstract Class<T> getRecordClass();
@@ -465,7 +465,7 @@ public class XmlIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> prefix);
 
       abstract Builder<T> setRecordClass(Class<T> recordClass);
 
@@ -482,8 +482,9 @@ public class XmlIO {
      * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
      * the number of output bundles.
      */
-    public Write<T> toFilenamePrefix(String filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write<T> to(String filenamePrefix) {
+      ResourceId resourceId = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+      return toBuilder().setFilenamePrefix(StaticValueProvider.of(resourceId)).build();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 6f87d75..963ab1b 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -24,7 +24,10 @@ import java.nio.channels.WritableByteChannel;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -32,12 +35,17 @@ import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
 class XmlSink<T> extends FileBasedSink<T> {
-  protected static final String XML_EXTENSION = "xml";
+  private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
 
+  private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+    return DefaultFilenamePolicy.constructUsingStandardParameters(
+        spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
+  }
+
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), XML_EXTENSION);
+    super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
     this.spec = spec;
   }
 
@@ -79,7 +87,7 @@ class XmlSink<T> extends FileBasedSink<T> {
      * Creates a {@link XmlWriter} with a marshaller for the type it will write.
      */
     @Override
-    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
+    public XmlWriter<T> createWriter() throws Exception {
       JAXBContext context;
       Marshaller marshaller;
       context = JAXBContext.newInstance(getSink().spec.getRecordClass());
@@ -99,7 +107,7 @@ class XmlSink<T> extends FileBasedSink<T> {
     }
 
     @VisibleForTesting
-    String getTemporaryDirectory() {
+    ResourceId getTemporaryDirectory() {
       return this.tempDirectory.get();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index 7f9a8c5..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.xml;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -62,7 +63,7 @@ public class XmlSinkTest {
   public ExpectedException thrown = ExpectedException.none();
 
   private String testRootElement = "testElement";
-  private String testFilePrefix = "/path/to/testPrefix";
+  private String testFilePrefix = "/path/to/file";
 
   /**
    * An XmlWriter correctly writes objects as Xml elements with an enclosing root element.
@@ -72,12 +73,12 @@ public class XmlSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
 
     List<Bird> bundle =
         Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
@@ -89,16 +90,15 @@ public class XmlSinkTest {
 
   @Test
   public void testXmlWriterCharset() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .withCharset(StandardCharsets.ISO_8859_1)
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
 
     List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon"));
     List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>pinçon</species>",
@@ -113,12 +113,15 @@ public class XmlSinkTest {
   public void testBuildXmlWriteTransform() {
     XmlIO.Write<Bird> write =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement);
     assertEquals(Bird.class, write.getRecordClass());
     assertEquals(testRootElement, write.getRootElement());
-    assertEquals(testFilePrefix, write.getFilenamePrefix());
+    assertNotNull(write.getFilenamePrefix());
+    assertThat(
+        write.getFilenamePrefix().toString(),
+        containsString(testFilePrefix));
   }
 
   /** Validation ensures no fields are missing. */
@@ -126,19 +129,21 @@ public class XmlSinkTest {
   public void testValidateXmlSinkMissingRecordClass() {
     thrown.expect(NullPointerException.class);
     XmlIO.<Bird>write()
+        .to(testFilePrefix)
         .withRootElement(testRootElement)
-        .toFilenamePrefix(testFilePrefix)
         .validate(null);
   }
 
   @Test
   public void testValidateXmlSinkMissingRootElement() {
     thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
+    XmlIO.<Bird>write().withRecordClass(Bird.class)
+        .to(testFilePrefix)
+        .validate(null);
   }
 
   @Test
-  public void testValidateXmlSinkMissingFilePrefix() {
+  public void testValidateXmlSinkMissingOutputDirectory() {
     thrown.expect(NullPointerException.class);
     XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
   }
@@ -151,16 +156,15 @@ public class XmlSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlSink<Bird> sink =
         XmlIO.<Bird>write()
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
             .createSink();
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+    Path tempPath = new File(writeOp.getTemporaryDirectory().toString()).toPath();
+    assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+    assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
   }
 
   /**
@@ -173,28 +177,28 @@ public class XmlSinkTest {
         XmlIO.<Bird>write()
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+    Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory().toString())
+        .toPath();
+    assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+    assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
     assertNotNull(writer.marshaller);
   }
 
   @Test
   public void testDisplayData() {
     XmlIO.Write<Integer> write = XmlIO.<Integer>write()
-        .toFilenamePrefix("foobar")
+        .to(testFilePrefix)
         .withRootElement("bird")
         .withRecordClass(Integer.class);
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
+    assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));
     assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
   }