You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/12 22:04:01 UTC
[1/2] beam git commit: [BEAM-2277] Cherrypick #3121 to release-2.0.0
Repository: beam
Updated Branches:
refs/heads/release-2.0.0 c807fec9b -> 7b598f878
[BEAM-2277] Cherrypick #3121 to release-2.0.0
fbb0de129d Remove '/' entirely from determining FileSystem scheme
a6a5ff7be3 [BEAM-2277] Add ResourceIdTester and test existing ResourceId implementations
ec956c85ef Mark FileSystem and related as Experimental
15df211c75 [BEAM-2277] HadoopFileSystem: normalize implementation
f3540d47f1 Rename FileSystems.setDefaultConfigInWorkers
3921163829 Fix shading of guava testlib
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6ee1d8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6ee1d8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6ee1d8b
Branch: refs/heads/release-2.0.0
Commit: e6ee1d8b98bcee6ee8c6a80b4af6646990e0c399
Parents: c807fec
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 14:59:14 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 15:03:50 2017 -0700
----------------------------------------------------------------------
pom.xml | 4 +
.../utils/SerializablePipelineOptions.java | 2 +-
runners/direct-java/pom.xml | 4 +
.../utils/SerializedPipelineOptions.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 6 +-
.../options/DataflowPipelineOptionsTest.java | 4 +-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
.../spark/translation/SparkRuntimeContext.java | 2 +-
sdks/java/core/pom.xml | 4 +
.../org/apache/beam/sdk/PipelineRunner.java | 2 +-
.../beam/sdk/annotations/Experimental.java | 7 +
.../java/org/apache/beam/sdk/io/AvroIO.java | 4 +
.../org/apache/beam/sdk/io/FileBasedSink.java | 21 ++-
.../java/org/apache/beam/sdk/io/FileSystem.java | 3 +
.../apache/beam/sdk/io/FileSystemRegistrar.java | 3 +
.../org/apache/beam/sdk/io/FileSystems.java | 21 ++-
.../beam/sdk/io/LocalFileSystemRegistrar.java | 3 +
.../org/apache/beam/sdk/io/LocalResources.java | 3 +
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 4 +
.../java/org/apache/beam/sdk/io/TextIO.java | 4 +
.../org/apache/beam/sdk/io/fs/ResourceId.java | 3 +
.../apache/beam/sdk/testing/TestPipeline.java | 2 +-
.../apache/beam/sdk/io/LocalResourceIdTest.java | 6 +
.../apache/beam/sdk/io/fs/ResourceIdTester.java | 150 +++++++++++++++++++
.../google-cloud-platform-core/pom.xml | 6 +
.../gcp/storage/GcsFileSystemRegistrar.java | 5 +-
.../gcp/storage/GcsResourceIdTest.java | 9 ++
sdks/java/io/hadoop-file-system/pom.xml | 13 ++
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 32 ++--
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 3 +
.../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 3 +
.../beam/sdk/io/hdfs/HadoopResourceId.java | 16 +-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 5 +-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 71 +++++++++
35 files changed, 395 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3d02096..8f96acc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1446,6 +1446,10 @@
<relocations>
<relocation>
<pattern>com.google.common</pattern>
+ <excludes>
+ <!-- com.google.common is too generic, need to exclude guava-testlib -->
+ <exclude>com.google.common.**.testing.*</exclude>
+ </excludes>
<!--suppress MavenModelInspection -->
<shadedPattern>
org.apache.${renderedArtifactId}.repackaged.com.google.common
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
index 02afa7a..46b04fc 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
@@ -62,7 +62,7 @@ public class SerializablePipelineOptions implements Externalizable {
.as(ApexPipelineOptions.class);
if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index c971ce3..95d560c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -131,6 +131,10 @@
</relocation>
<relocation>
<pattern>com.google.common</pattern>
+ <excludes>
+ <!-- com.google.common is too generic, need to exclude guava-testlib -->
+ <exclude>com.google.common.**.testing.*</exclude>
+ </excludes>
<shadedPattern>
org.apache.beam.runners.direct.repackaged.com.google.common
</shadedPattern>
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 84f3bf4..40b6dd6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -56,7 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
try {
pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 93c1e5b..87744f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -144,7 +144,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline p = Pipeline.create(options);
// Enable the FileSystems API to know about gs:// URIs in this test.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ce01aa1..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -177,7 +177,7 @@ public class DataflowRunnerTest {
.apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return p;
}
@@ -246,7 +246,7 @@ public class DataflowRunnerTest {
options.setGcpCredential(new TestCredential());
// Configure the FileSystem registrar to use these options.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return options;
}
@@ -771,7 +771,7 @@ public class DataflowRunnerTest {
@Test
public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setRunner(DataflowRunner.class);
options.setProject("foo-12345");
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 613604a..cea44f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location/");
assertEquals("gs://temp_location/", options.getGcpTempLocation());
@@ -147,7 +147,7 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultToGcpTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location/");
options.setGcpTempLocation("gs://gcp_temp_location/");
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index c7a660e..5d0c0f2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -120,7 +120,7 @@ public class PackageUtilTest {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGcsUtil(mockGcsUtil);
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 82e8b44..f3fe99c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -90,7 +90,7 @@ public class SparkRuntimeContext implements Serializable {
}
}
// Register standard FileSystems.
- FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+ FileSystems.setDefaultPipelineOptions(pipelineOptions);
}
return pipelineOptions;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 35d160d..712553d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -161,6 +161,10 @@
<relocations>
<relocation>
<pattern>com.google.common</pattern>
+ <excludes>
+ <!-- com.google.common is too generic, need to exclude guava-testlib -->
+ <exclude>com.google.common.**.testing.*</exclude>
+ </excludes>
<!--suppress MavenModelInspection -->
<shadedPattern>
org.apache.beam.sdk.repackaged.com.google.common
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
index 87705af..c114501 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineRunner.java
@@ -41,7 +41,7 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
PipelineOptionsValidator.validate(PipelineOptions.class, options);
// (Re-)register standard FileSystems. Clobbers any prior credentials.
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
@SuppressWarnings("unchecked")
PipelineRunner<? extends PipelineResult> result =
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index ac02465..8224ebb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -23,6 +23,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+
/**
* Signifies that a public API (public class, method or field) is subject to incompatible changes,
* or even removal, in a future release.
@@ -79,6 +80,12 @@ public @interface Experimental {
/** Metrics-related experimental APIs. */
METRICS,
+ /**
+ * {@link org.apache.beam.sdk.io.FileSystem} and {@link org.apache.beam.sdk.io.fs.ResourceId}
+ * related APIs.
+ */
+ FILESYSTEM,
+
/** Experimental feature related to alternative, unnested encodings for coders. */
CODER_CONTEXT,
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d13c6ff..6af0e79 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -30,6 +30,8 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -313,6 +315,7 @@ public class AvroIO {
* a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
* overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write<T> to(ResourceId outputPrefix) {
return toResource(StaticValueProvider.of(outputPrefix));
}
@@ -333,6 +336,7 @@ public class AvroIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
return toBuilder().setFilenamePrefix(outputPrefix).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 7f729a7..8102316 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -47,6 +47,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -115,6 +117,7 @@ import org.slf4j.LoggerFactory;
*
* @param <T> the type of values written to the sink.
*/
+@Experimental(Kind.FILESYSTEM)
public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
@@ -193,6 +196,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a
* file will fail and this function will return a directory {@link ResourceId} instead.
*/
+ @Experimental(Kind.FILESYSTEM)
public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
try {
return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */);
@@ -290,6 +294,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* as well as sharding information. The policy must return unique and consistent filenames
* for different windows and panes.
*/
+ @Experimental(Kind.FILESYSTEM)
public abstract ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext c, String extension);
@@ -302,6 +307,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* <p>The {@link Context} object only provides sharding information, which is used by the policy
* to generate unique and consistent filenames.
*/
+ @Experimental(Kind.FILESYSTEM)
@Nullable public abstract ResourceId unwindowedFilename(
ResourceId outputDirectory, Context c, String extension);
@@ -320,6 +326,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
*/
+ @Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
@@ -335,6 +342,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* Construct a {@link FileBasedSink} with the given filename policy and output channel type.
*/
+ @Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> baseOutputDirectoryProvider,
FilenamePolicy filenamePolicy,
@@ -349,6 +357,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* Returns the base directory inside which files will be written according to the configured
* {@link FilenamePolicy}.
*/
+ @Experimental(Kind.FILESYSTEM)
public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
return baseOutputDirectoryProvider;
}
@@ -358,6 +367,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* the {@link FilenamePolicy} may itself specify one or more inner directories before each output
* file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
*/
+ @Experimental(Kind.FILESYSTEM)
public final FilenamePolicy getFilenamePolicy() {
return filenamePolicy;
}
@@ -424,9 +434,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
protected final ValueProvider<ResourceId> tempDirectory;
/** Whether windowed writes are being used. */
- protected boolean windowedWrites;
+ @Experimental(Kind.FILESYSTEM)
+ protected boolean windowedWrites;
/** Constructs a temporary file resource given the temporary directory and a filename. */
+ @Experimental(Kind.FILESYSTEM)
protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename)
throws IOException {
return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -472,6 +484,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @param sink the FileBasedSink that will be used to configure this write operation.
* @param tempDirectory the base directory to be used for temporary output files.
*/
+ @Experimental(Kind.FILESYSTEM)
public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}
@@ -527,6 +540,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
}
+ @Experimental(Kind.FILESYSTEM)
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
Iterable<FileResult> writerResults) {
int numShards = Iterables.size(writerResults);
@@ -610,6 +624,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @param filenames the filenames of temporary files.
*/
@VisibleForTesting
+ @Experimental(Kind.FILESYSTEM)
final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
throws IOException {
int numFiles = filenames.size();
@@ -637,6 +652,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* temporary files, this method will remove them.
*/
@VisibleForTesting
+ @Experimental(Kind.FILESYSTEM)
final void removeTemporaryFiles(
Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
ResourceId tempDir = tempDirectory.get();
@@ -945,6 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private final BoundedWindow window;
private final PaneInfo paneInfo;
+ @Experimental(Kind.FILESYSTEM)
public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
this.tempFilename = tempFilename;
this.shard = shard;
@@ -952,6 +969,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
this.paneInfo = paneInfo;
}
+ @Experimental(Kind.FILESYSTEM)
public ResourceId getTempFilename() {
return tempFilename;
}
@@ -972,6 +990,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
return paneInfo;
}
+ @Experimental(Kind.FILESYSTEM)
public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
int numShards, String extension) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 375264a..601feca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
* <p>All methods are protected, and they are for file system providers to implement.
* Clients should use {@link FileSystems} utility.
*/
+@Experimental(Kind.FILESYSTEM)
public abstract class FileSystem<ResourceIdT extends ResourceId> {
/**
* This is the entry point to convert user-provided specs to {@link ResourceIdT ResourceIds}.
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 78a91f6..50ee6eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
import com.google.auto.service.AutoService;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
/**
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
* <p>It is optional but recommended to use one of the many build time tools such as
* {@link AutoService} to generate the necessary META-INF files automatically.
*/
+@Experimental(Kind.FILESYSTEM)
public interface FileSystemRegistrar {
/**
* Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}.
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 2e11177..1aacc90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -49,6 +49,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -63,11 +66,12 @@ import org.apache.beam.sdk.values.KV;
/**
* Clients facing {@link FileSystem} utility.
*/
+@Experimental(Kind.FILESYSTEM)
public class FileSystems {
public static final String DEFAULT_SCHEME = "default";
- private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
- "(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");
+ private static final Pattern FILE_SCHEME_PATTERN =
+ Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<Map<String, FileSystem>>(
@@ -416,7 +420,7 @@ public class FileSystems {
// from their use in the URI spec. ('*' is not reserved).
// Here, we just need the scheme, which is so circumscribed as to be
// very easy to extract with a regex.
- Matcher matcher = URI_SCHEME_PATTERN.matcher(spec);
+ Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec);
if (!matcher.matches()) {
return "file";
@@ -445,12 +449,21 @@ public class FileSystems {
/********************************** METHODS FOR REGISTRATION **********************************/
+ /** @deprecated to be removed. */
+ @Deprecated // for DataflowRunner backwards compatibility.
+ public static void setDefaultConfigInWorkers(PipelineOptions options) {
+ setDefaultPipelineOptions(options);
+ }
+
/**
* Sets the default configuration in workers.
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
+ *
+ * <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
*/
- public static void setDefaultConfigInWorkers(PipelineOptions options) {
+ @Internal
+ public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options");
Set<FileSystemRegistrar> registrars =
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index f182360..7896e20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -20,12 +20,15 @@ package org.apache.beam.sdk.io;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* {@link AutoService} registrar for the {@link LocalFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class LocalFileSystemRegistrar implements FileSystemRegistrar {
@Override
public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
index 817829b..d234bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
/**
* Helper functions for producing a {@link ResourceId} that references a local file or directory.
*/
+@Experimental(Kind.FILESYSTEM)
public final class LocalResources {
public static ResourceId fromFile(File file, boolean isDirectory) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index f73d6f3..c274595 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -31,6 +31,8 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -277,6 +279,7 @@ public class TFRecordIO {
*
* <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write to(ResourceId outputResource) {
return toResource(StaticValueProvider.of(outputResource));
}
@@ -284,6 +287,7 @@ public class TFRecordIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> outputResource) {
return toBuilder().setOutputPrefix(outputResource).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index af6a069..5c068ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -306,6 +308,7 @@ public class TextIO {
* in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
* not be set.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
return toResource(StaticValueProvider.of(filenamePrefix));
}
@@ -326,6 +329,7 @@ public class TextIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index dfe771f..9196034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.fs;
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -45,6 +47,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
* to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
* </ul>
*/
+@Experimental(Kind.FILESYSTEM)
public interface ResourceId extends Serializable {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index e04c2f8..9206e04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -413,7 +413,7 @@ public class TestPipeline extends Pipeline implements TestRule {
}
options.setStableUniqueNames(CheckEnabled.ERROR);
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
index 7ea85cf..e1ca303 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalResourceIdTest.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.nio.file.Paths;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Rule;
import org.junit.Test;
@@ -259,6 +260,11 @@ public class LocalResourceIdTest {
"xyz.txt");
}
+ @Test
+ public void testResourceIdTester() throws Exception {
+ ResourceIdTester.runResourceIdBattery(toResourceIdentifier("/tmp/foo/"));
+ }
+
private LocalResourceId toResourceIdentifier(String str) throws Exception {
boolean isDirectory;
if (SystemUtils.IS_OS_WINDOWS) {
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
new file mode 100644
index 0000000..8ceaeed
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/fs/ResourceIdTester.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.testing.EqualsTester;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * A utility to test {@link ResourceId} implementations.
+ */
+@Experimental(Kind.FILESYSTEM)
+public final class ResourceIdTester {
+ /**
+ * Enforces that the {@link ResourceId} implementation of {@code baseDirectory} meets the
+ * {@link ResourceId} spec.
+ */
+ public static void runResourceIdBattery(ResourceId baseDirectory) {
+ checkArgument(
+ baseDirectory.isDirectory(), "baseDirectory %s is not a directory", baseDirectory);
+
+ List<ResourceId> allResourceIds = new ArrayList<>();
+ allResourceIds.add(baseDirectory);
+
+ // Validate that individual resources meet the fairly restrictive spec we have.
+ validateResourceIds(allResourceIds);
+
+ // Validate operations with resolving child resources.
+ validateResolvingIds(baseDirectory, allResourceIds);
+
+ // Validate safeguards against resolving bad paths.
+ validateFailureResolvingIds(baseDirectory);
+ }
+
+ private static void validateResolvingIds(
+ ResourceId baseDirectory, List<ResourceId> allResourceIds) {
+ ResourceId file1 = baseDirectory.resolve("child1", RESOLVE_FILE);
+ ResourceId file2 = baseDirectory.resolve("child2", RESOLVE_FILE);
+ ResourceId file2a = baseDirectory.resolve("child2", RESOLVE_FILE);
+ allResourceIds.add(file1);
+ allResourceIds.add(file2);
+ assertThat("Resolved file isDirectory()", file1.isDirectory(), is(false));
+ assertThat("Resolved file isDirectory()", file2.isDirectory(), is(false));
+ assertThat("Resolved file isDirectory()", file2a.isDirectory(), is(false));
+
+ ResourceId dir1 = baseDirectory.resolve("child1", RESOLVE_DIRECTORY);
+ ResourceId dir2 = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
+ ResourceId dir2a = baseDirectory.resolve("child2", RESOLVE_DIRECTORY);
+ assertThat("Resolved directory isDirectory()", dir1.isDirectory(), is(true));
+ assertThat("Resolved directory isDirectory()", dir2.isDirectory(), is(true));
+ assertThat("Resolved directory isDirectory()", dir2a.isDirectory(), is(true));
+ allResourceIds.add(dir1);
+ allResourceIds.add(dir2);
+
+ // ResourceIds in equality groups.
+ new EqualsTester()
+ .addEqualityGroup(file1)
+ .addEqualityGroup(file2, file2a)
+ .addEqualityGroup(dir1, dir1.getCurrentDirectory())
+ .addEqualityGroup(dir2, dir2a, dir2.getCurrentDirectory())
+ .addEqualityGroup(baseDirectory, file1.getCurrentDirectory(), file2.getCurrentDirectory())
+ .testEquals();
+
+ // ResourceId toString() in equality groups.
+ new EqualsTester()
+ .addEqualityGroup(file1.toString())
+ .addEqualityGroup(file2.toString(), file2a.toString())
+ .addEqualityGroup(dir1.toString(), dir1.getCurrentDirectory().toString())
+ .addEqualityGroup(dir2.toString(), dir2a.toString(), dir2.getCurrentDirectory().toString())
+ .addEqualityGroup(
+ baseDirectory.toString(),
+ file1.getCurrentDirectory().toString(),
+ file2.getCurrentDirectory().toString())
+ .testEquals();
+
+ // TODO: test resolving strings that need to be escaped.
+ // Possible spec: https://tools.ietf.org/html/rfc3986#section-2
+ // May need options to be filesystem-independent, e.g., if filesystems ban certain chars.
+ }
+
+ private static void validateFailureResolvingIds(ResourceId baseDirectory) {
+ try {
+ ResourceId badFile = baseDirectory.resolve("file/", RESOLVE_FILE);
+ fail(String.format("Resolving badFile %s should have failed", badFile));
+ } catch (Throwable t) {
+ // expected
+ }
+
+ ResourceId file = baseDirectory.resolve("file", RESOLVE_FILE);
+ try {
+ baseDirectory.resolve("file2", RESOLVE_FILE);
+ fail(String.format("Should not be able to resolve against file resource %s", file));
+ } catch (Throwable t) {
+ // expected
+ }
+ }
+
+ private static void validateResourceIds(List<ResourceId> resourceIds) {
+ for (ResourceId resourceId : resourceIds) {
+ // ResourceIds should equal themselves.
+ assertThat("ResourceId equal to itself", resourceId, equalTo(resourceId));
+
+ // ResourceIds should be clonable via FileSystems#matchNewResource.
+ ResourceId cloned;
+ if (resourceId.isDirectory()) {
+ cloned = FileSystems.matchNewResource(resourceId.toString(), true /* isDirectory */);
+ } else {
+ cloned = FileSystems.matchNewResource(resourceId.toString(), false /* isDirectory */);
+ }
+ assertThat(
+ "ResourceId equals clone of itself", cloned, equalTo(resourceId));
+ // .. and clones have consistent toString.
+ assertThat(
+ "ResourceId toString consistency", cloned.toString(), equalTo(resourceId.toString()));
+ // .. and have consistent isDirectory.
+ assertThat(
+ "ResourceId isDirectory consistency",
+ cloned.isDirectory(),
+ equalTo(resourceId.isDirectory()));
+ }
+ }
+
+ private ResourceIdTester() {} // prevent instantiation
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml
index 5f187c4..891c476 100644
--- a/sdks/java/extensions/google-cloud-platform-core/pom.xml
+++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml
@@ -152,6 +152,12 @@
<!-- test dependencies -->
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<classifier>tests</classifier>
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index 9f5980a..f3a67b2 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystemRegistrar;
@@ -31,13 +33,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
* {@link AutoService} registrar for the {@link GcsFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class GcsFileSystemRegistrar implements FileSystemRegistrar {
@Override
public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
checkNotNull(
options,
- "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+ "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
index b245610..c1e214e 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -22,8 +22,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.junit.Rule;
import org.junit.Test;
@@ -163,6 +166,12 @@ public class GcsResourceIdTest {
"xyz.txt");
}
+ @Test
+ public void testResourceIdTester() throws Exception {
+ FileSystems.setDefaultPipelineOptions(TestPipeline.testingPipelineOptions());
+ ResourceIdTester.runResourceIdBattery(toResourceIdentifier("gs://bucket/foo/"));
+ }
+
private GcsResourceId toResourceIdentifier(String str) throws Exception {
return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 46f3e32..b90ccf0 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -157,6 +157,19 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-testlib</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 154a818..d519a8c 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
@@ -82,8 +81,9 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
List<Metadata> metadata = new ArrayList<>();
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isFile()) {
+ URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
metadata.add(Metadata.builder()
- .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+ .setResourceId(new HadoopResourceId(uri))
.setIsReadSeekEfficient(true)
.setSizeBytes(fileStatus.getLen())
.build());
@@ -151,19 +151,13 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
@Override
protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
- try {
- if (singleResourceSpec.endsWith("/") && !isDirectory) {
- throw new IllegalArgumentException(String.format(
- "Expected file path but received directory path %s", singleResourceSpec));
- }
- return !singleResourceSpec.endsWith("/") && isDirectory
- ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
- : new HadoopResourceId(new URI(singleResourceSpec));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(
- String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
- e);
+ if (singleResourceSpec.endsWith("/") && !isDirectory) {
+ throw new IllegalArgumentException(String.format(
+ "Expected file path but received directory path %s", singleResourceSpec));
}
+ return !singleResourceSpec.endsWith("/") && isDirectory
+ ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
+ : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
}
@Override
@@ -237,4 +231,14 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
inputStream.close();
}
}
+
+ private static URI dropEmptyAuthority(String uriStr) {
+ URI uri = URI.create(uriStr);
+ String prefix = uri.getScheme() + ":///";
+ if (uriStr.startsWith(prefix)) {
+ return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
+ } else {
+ return uri;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
index 45f43e2..f5183d5 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -25,6 +25,8 @@ import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
* {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
* for the {@link HadoopFileSystem}.
*/
+@Experimental(Kind.FILESYSTEM)
public interface HadoopFileSystemOptions extends PipelineOptions {
@Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+ "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' "
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 9159df3..8c57089 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystemRegistrar;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
* {@link AutoService} registrar for the {@link HadoopFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index e570864..88fa32a 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.io.hdfs;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.net.URI;
import java.util.Objects;
import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.hadoop.fs.Path;
@@ -35,7 +38,18 @@ class HadoopResourceId implements ResourceId {
@Override
public ResourceId resolve(String other, ResolveOptions resolveOptions) {
- return new HadoopResourceId(uri.resolve(other));
+ if (resolveOptions == StandardResolveOptions.RESOLVE_DIRECTORY) {
+ if (!other.endsWith("/")) {
+ other += '/';
+ }
+ return new HadoopResourceId(uri.resolve(other));
+ } else if (resolveOptions == StandardResolveOptions.RESOLVE_FILE) {
+ checkArgument(!other.endsWith("/"), "Resolving a file with a directory path: %s", other);
+ return new HadoopResourceId(uri.resolve(other));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unexpected StandardResolveOptions %s", resolveOptions));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index cf86c36..88275f4 100644
--- a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -63,14 +63,13 @@ public class HadoopFileSystemTest {
@Rule public TestPipeline p = TestPipeline.create();
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public ExpectedException thrown = ExpectedException.none();
- private Configuration configuration;
private MiniDFSCluster hdfsCluster;
private URI hdfsClusterBaseUri;
private HadoopFileSystem fileSystem;
@Before
public void setUp() throws Exception {
- configuration = new Configuration();
+ Configuration configuration = new Configuration();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
hdfsCluster = builder.build();
@@ -220,7 +219,7 @@ public class HadoopFileSystemTest {
HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
.as(HadoopFileSystemOptions.class);
options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
- FileSystems.setDefaultConfigInWorkers(options);
+ FileSystems.setDefaultPipelineOptions(options);
PCollection<String> pc = p.apply(
TextIO.read().from(testPath("testFile*").toString()));
PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
http://git-wip-us.apache.org/repos/asf/beam/blob/e6ee1d8b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
new file mode 100644
index 0000000..c4a8577
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopResourceIdTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.net.URI;
+import java.util.Collections;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdTester;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link HadoopResourceId}. Spins up an in-process HDFS mini-cluster per test.
+ */
+public class HadoopResourceIdTest {
+
+ private MiniDFSCluster hdfsCluster; // in-process HDFS, started in setUp, stopped in tearDown
+ private URI hdfsClusterBaseUri; // the cluster's fs.defaultFS URI with a trailing slash
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder(); // backing storage for the mini-cluster
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); // keep cluster data under the JUnit temp dir
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+ hdfsCluster = builder.build(); // starts the mini-cluster and populates fs.defaultFS in configuration
+ hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); // append "/" so this is a directory URI
+
+ // Register HadoopFileSystem for this test.
+ HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+ options.setHdfsConfiguration(Collections.singletonList(configuration));
+ FileSystems.setDefaultPipelineOptions(options); // makes the hdfs scheme resolvable via FileSystems below
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ hdfsCluster.shutdown(); // always stop the cluster so ports/storage are released between tests
+ }
+
+ @Test
+ public void testResourceIdTester() throws Exception {
+ ResourceId baseDirectory =
+ FileSystems.matchNewResource(
+ "hdfs://" + hdfsClusterBaseUri.getPath(), true /* isDirectory */);
+ ResourceIdTester.runResourceIdBattery(baseDirectory); // standard ResourceId contract checks against this base dir
+ }
+}
[2/2] beam git commit: This closes #3129
Posted by dh...@apache.org.
This closes #3129
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b598f87
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b598f87
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b598f87
Branch: refs/heads/release-2.0.0
Commit: 7b598f8781211c165bf183139d673a407993c32f
Parents: c807fec e6ee1d8
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 15:03:54 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 15:03:54 2017 -0700
----------------------------------------------------------------------
pom.xml | 4 +
.../utils/SerializablePipelineOptions.java | 2 +-
runners/direct-java/pom.xml | 4 +
.../utils/SerializedPipelineOptions.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 6 +-
.../options/DataflowPipelineOptionsTest.java | 4 +-
.../runners/dataflow/util/PackageUtilTest.java | 2 +-
.../spark/translation/SparkRuntimeContext.java | 2 +-
sdks/java/core/pom.xml | 4 +
.../org/apache/beam/sdk/PipelineRunner.java | 2 +-
.../beam/sdk/annotations/Experimental.java | 7 +
.../java/org/apache/beam/sdk/io/AvroIO.java | 4 +
.../org/apache/beam/sdk/io/FileBasedSink.java | 21 ++-
.../java/org/apache/beam/sdk/io/FileSystem.java | 3 +
.../apache/beam/sdk/io/FileSystemRegistrar.java | 3 +
.../org/apache/beam/sdk/io/FileSystems.java | 21 ++-
.../beam/sdk/io/LocalFileSystemRegistrar.java | 3 +
.../org/apache/beam/sdk/io/LocalResources.java | 3 +
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 4 +
.../java/org/apache/beam/sdk/io/TextIO.java | 4 +
.../org/apache/beam/sdk/io/fs/ResourceId.java | 3 +
.../apache/beam/sdk/testing/TestPipeline.java | 2 +-
.../apache/beam/sdk/io/LocalResourceIdTest.java | 6 +
.../apache/beam/sdk/io/fs/ResourceIdTester.java | 150 +++++++++++++++++++
.../google-cloud-platform-core/pom.xml | 6 +
.../gcp/storage/GcsFileSystemRegistrar.java | 5 +-
.../gcp/storage/GcsResourceIdTest.java | 9 ++
sdks/java/io/hadoop-file-system/pom.xml | 13 ++
.../beam/sdk/io/hdfs/HadoopFileSystem.java | 32 ++--
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 3 +
.../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 3 +
.../beam/sdk/io/hdfs/HadoopResourceId.java | 16 +-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 5 +-
.../beam/sdk/io/hdfs/HadoopResourceIdTest.java | 71 +++++++++
35 files changed, 395 insertions(+), 36 deletions(-)
----------------------------------------------------------------------