You are viewing a plain text version of this content. The canonical link for it was a hyperlink that this plain-text version does not preserve.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:17:41 UTC
[35/50] [abbrv] beam git commit: Convert WriteFiles/FileBasedSink
from IOChannelFactory to FileSystems
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index ea0395d..a5dacd1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertThat;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -39,12 +38,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-
import java.util.concurrent.ThreadLocalRandom;
-
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -149,7 +148,8 @@ public class WriteFilesTest {
}
private String getBaseOutputFilename() {
- return appendToTempFolder("baseoutput");
+ return getBaseOutputDirectory()
+ .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
}
/**
@@ -188,6 +188,15 @@ public class WriteFilesTest {
Optional.of(1));
}
+ private ResourceId getBaseOutputDirectory() {
+ return LocalResources.fromFile(tmpFolder.getRoot(), true)
+ .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
+
+ }
+ private SimpleSink makeSimpleSink() {
+ return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
+ }
+
@Test
@Category(NeedsRunner.class)
public void testCustomShardedWrite() throws IOException {
@@ -204,7 +213,7 @@ public class WriteFilesTest {
timestamps.add(i + 1);
}
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+ SimpleSink sink = makeSimpleSink();
WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt());
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(IDENTITY_MAP)
@@ -270,7 +279,7 @@ public class WriteFilesTest {
@Test
public void testBuildWrite() {
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+ SimpleSink sink = makeSimpleSink();
WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
assertThat((SimpleSink) write.getSink(), is(sink));
PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
@@ -293,7 +302,7 @@ public class WriteFilesTest {
@Test
public void testDisplayData() {
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+ SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
@@ -308,7 +317,7 @@ public class WriteFilesTest {
@Test
public void testShardedDisplayData() {
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+ SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
@@ -323,7 +332,7 @@ public class WriteFilesTest {
@Test
public void testCustomShardStrategyDisplayData() {
- SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+ SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("foo", "bar"));
@@ -354,7 +363,7 @@ public class WriteFilesTest {
* methods on a test sink in the correct order, as well as verifies that the elements of a
* PCollection are written to the sink.
*/
- private static void runWrite(
+ private void runWrite(
List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
String baseName) throws IOException {
runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
@@ -366,7 +375,7 @@ public class WriteFilesTest {
* verifies that the elements of a PCollection are written to the sink. If numConfiguredShards
* is not null, also verifies that the output number of shards is correct.
*/
- private static void runShardedWrite(
+ private void runShardedWrite(
List<String> inputs,
PTransform<PCollection<String>, PCollection<String>> transform,
String baseName,
@@ -382,7 +391,7 @@ public class WriteFilesTest {
timestamps.add(i + 1);
}
- SimpleSink sink = new SimpleSink(baseName, "");
+ SimpleSink sink = makeSimpleSink();
WriteFiles<String> write = WriteFiles.to(sink);
if (numConfiguredShards.isPresent()) {
write = write.withNumShards(numConfiguredShards.get());
@@ -399,8 +408,10 @@ public class WriteFilesTest {
Optional<Integer> numExpectedShards) throws IOException {
List<File> outputFiles = Lists.newArrayList();
final String pattern = baseName + "*";
- for (String outputFileName : IOChannelUtils.getFactory(pattern).match(pattern)) {
- outputFiles.add(new File(outputFileName));
+ List<Metadata> metadata =
+ FileSystems.match(Collections.singletonList(pattern)).get(0).metadata();
+ for (Metadata meta : metadata) {
+ outputFiles.add(new File(meta.resourceId().toString()));
}
if (numExpectedShards.isPresent()) {
assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index 4d58424..9ad4152 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
@@ -44,12 +45,11 @@ public class GcsPathValidator implements PathValidator {
* is well formed.
*/
@Override
- public String validateInputFilePatternSupported(String filepattern) {
+ public void validateInputFilePatternSupported(String filepattern) {
GcsPath gcsPath = getGcsPath(filepattern);
- checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
+ checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
+ verifyPath(filepattern);
verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
}
/**
@@ -57,10 +57,18 @@ public class GcsPathValidator implements PathValidator {
* is well formed.
*/
@Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
+ public void validateOutputFilePrefixSupported(String filePrefix) {
+ verifyPath(filePrefix);
verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
+ }
+
+ @Override
+ public void validateOutputResourceSupported(ResourceId resourceId) {
+ checkArgument(
+ resourceId.getScheme().equals("gs"),
+ "Expected a valid 'gs://' path but was given: '%s'",
+ resourceId);
+ verifyPath(resourceId.toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index c41d6bc..a34fb0f 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,19 +21,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
-
import java.nio.charset.Charset;
-
import javax.annotation.Nullable;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.ValidationEventHandler;
-
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -450,7 +450,7 @@ public class XmlIO {
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
@Nullable
- abstract String getFilenamePrefix();
+ abstract ValueProvider<ResourceId> getFilenamePrefix();
@Nullable
abstract Class<T> getRecordClass();
@@ -465,7 +465,7 @@ public class XmlIO {
@AutoValue.Builder
abstract static class Builder<T> {
- abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+ abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> prefix);
abstract Builder<T> setRecordClass(Class<T> recordClass);
@@ -482,8 +482,9 @@ public class XmlIO {
* <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
* the number of output bundles.
*/
- public Write<T> toFilenamePrefix(String filenamePrefix) {
- return toBuilder().setFilenamePrefix(filenamePrefix).build();
+ public Write<T> to(String filenamePrefix) {
+ ResourceId resourceId = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+ return toBuilder().setFilenamePrefix(StaticValueProvider.of(resourceId)).build();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 6f87d75..963ab1b 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -24,7 +24,10 @@ import java.nio.channels.WritableByteChannel;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
@@ -32,12 +35,17 @@ import org.apache.beam.sdk.util.MimeTypes;
/** Implementation of {@link XmlIO#write}. */
class XmlSink<T> extends FileBasedSink<T> {
- protected static final String XML_EXTENSION = "xml";
+ private static final String XML_EXTENSION = ".xml";
private final XmlIO.Write<T> spec;
+ private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+ return DefaultFilenamePolicy.constructUsingStandardParameters(
+ spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
+ }
+
XmlSink(XmlIO.Write<T> spec) {
- super(spec.getFilenamePrefix(), XML_EXTENSION);
+ super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
this.spec = spec;
}
@@ -79,7 +87,7 @@ class XmlSink<T> extends FileBasedSink<T> {
* Creates a {@link XmlWriter} with a marshaller for the type it will write.
*/
@Override
- public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
+ public XmlWriter<T> createWriter() throws Exception {
JAXBContext context;
Marshaller marshaller;
context = JAXBContext.newInstance(getSink().spec.getRecordClass());
@@ -99,7 +107,7 @@ class XmlSink<T> extends FileBasedSink<T> {
}
@VisibleForTesting
- String getTemporaryDirectory() {
+ ResourceId getTemporaryDirectory() {
return this.tempDirectory.get();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index 7f9a8c5..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.xml;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -62,7 +63,7 @@ public class XmlSinkTest {
public ExpectedException thrown = ExpectedException.none();
private String testRootElement = "testElement";
- private String testFilePrefix = "/path/to/testPrefix";
+ private String testFilePrefix = "/path/to/file";
/**
* An XmlWriter correctly writes objects as Xml elements with an enclosing root element.
@@ -72,12 +73,12 @@ public class XmlSinkTest {
PipelineOptions options = PipelineOptionsFactory.create();
XmlWriteOperation<Bird> writeOp =
XmlIO.<Bird>write()
- .toFilenamePrefix(testFilePrefix)
+ .to(testFilePrefix)
.withRecordClass(Bird.class)
.withRootElement("birds")
.createSink()
.createWriteOperation();
- XmlWriter<Bird> writer = writeOp.createWriter(options);
+ XmlWriter<Bird> writer = writeOp.createWriter();
List<Bird> bundle =
Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
@@ -89,16 +90,15 @@ public class XmlSinkTest {
@Test
public void testXmlWriterCharset() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
XmlWriteOperation<Bird> writeOp =
XmlIO.<Bird>write()
- .toFilenamePrefix(testFilePrefix)
+ .to(testFilePrefix)
.withRecordClass(Bird.class)
.withRootElement("birds")
.withCharset(StandardCharsets.ISO_8859_1)
.createSink()
.createWriteOperation();
- XmlWriter<Bird> writer = writeOp.createWriter(options);
+ XmlWriter<Bird> writer = writeOp.createWriter();
List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon"));
List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>pinçon</species>",
@@ -113,12 +113,15 @@ public class XmlSinkTest {
public void testBuildXmlWriteTransform() {
XmlIO.Write<Bird> write =
XmlIO.<Bird>write()
- .toFilenamePrefix(testFilePrefix)
+ .to(testFilePrefix)
.withRecordClass(Bird.class)
.withRootElement(testRootElement);
assertEquals(Bird.class, write.getRecordClass());
assertEquals(testRootElement, write.getRootElement());
- assertEquals(testFilePrefix, write.getFilenamePrefix());
+ assertNotNull(write.getFilenamePrefix());
+ assertThat(
+ write.getFilenamePrefix().toString(),
+ containsString(testFilePrefix));
}
/** Validation ensures no fields are missing. */
@@ -126,19 +129,21 @@ public class XmlSinkTest {
public void testValidateXmlSinkMissingRecordClass() {
thrown.expect(NullPointerException.class);
XmlIO.<Bird>write()
+ .to(testFilePrefix)
.withRootElement(testRootElement)
- .toFilenamePrefix(testFilePrefix)
.validate(null);
}
@Test
public void testValidateXmlSinkMissingRootElement() {
thrown.expect(NullPointerException.class);
- XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
+ XmlIO.<Bird>write().withRecordClass(Bird.class)
+ .to(testFilePrefix)
+ .validate(null);
}
@Test
- public void testValidateXmlSinkMissingFilePrefix() {
+ public void testValidateXmlSinkMissingOutputDirectory() {
thrown.expect(NullPointerException.class);
XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
}
@@ -151,16 +156,15 @@ public class XmlSinkTest {
PipelineOptions options = PipelineOptionsFactory.create();
XmlSink<Bird> sink =
XmlIO.<Bird>write()
+ .to(testFilePrefix)
.withRecordClass(Bird.class)
.withRootElement(testRootElement)
- .toFilenamePrefix(testFilePrefix)
.createSink();
XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
Path outputPath = new File(testFilePrefix).toPath();
- Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
- assertEquals(outputPath.getParent(), tempPath.getParent());
- assertThat(
- tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+ Path tempPath = new File(writeOp.getTemporaryDirectory().toString()).toPath();
+ assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+ assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
}
/**
@@ -173,28 +177,28 @@ public class XmlSinkTest {
XmlIO.<Bird>write()
.withRecordClass(Bird.class)
.withRootElement(testRootElement)
- .toFilenamePrefix(testFilePrefix)
+ .to(testFilePrefix)
.createSink()
.createWriteOperation();
- XmlWriter<Bird> writer = writeOp.createWriter(options);
+ XmlWriter<Bird> writer = writeOp.createWriter();
Path outputPath = new File(testFilePrefix).toPath();
- Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();
- assertEquals(outputPath.getParent(), tempPath.getParent());
- assertThat(
- tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+ Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory().toString())
+ .toPath();
+ assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+ assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
assertNotNull(writer.marshaller);
}
@Test
public void testDisplayData() {
XmlIO.Write<Integer> write = XmlIO.<Integer>write()
- .toFilenamePrefix("foobar")
+ .to(testFilePrefix)
.withRootElement("bird")
.withRecordClass(Integer.class);
DisplayData displayData = DisplayData.from(write);
- assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
+ assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
assertThat(displayData, hasDisplayItem("rootElement", "bird"));
assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
}