You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/12 21:59:24 UTC
[5/7] beam git commit: Mark FileSystem and related as Experimental
Mark FileSystem and related as Experimental
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec956c85
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec956c85
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec956c85
Branch: refs/heads/master
Commit: ec956c85efa16d00c5e218ee1257b8ee62a2013d
Parents: a6a5ff7
Author: Dan Halperin <dh...@google.com>
Authored: Fri May 12 11:38:54 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 12 14:59:10 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/annotations/Experimental.java | 7 +++++++
.../java/org/apache/beam/sdk/io/AvroIO.java | 4 ++++
.../org/apache/beam/sdk/io/FileBasedSink.java | 21 +++++++++++++++++++-
.../java/org/apache/beam/sdk/io/FileSystem.java | 3 +++
.../apache/beam/sdk/io/FileSystemRegistrar.java | 3 +++
.../org/apache/beam/sdk/io/FileSystems.java | 3 +++
.../beam/sdk/io/LocalFileSystemRegistrar.java | 3 +++
.../org/apache/beam/sdk/io/LocalResources.java | 3 +++
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 4 ++++
.../java/org/apache/beam/sdk/io/TextIO.java | 4 ++++
.../org/apache/beam/sdk/io/fs/ResourceId.java | 3 +++
.../gcp/storage/GcsFileSystemRegistrar.java | 3 +++
.../sdk/io/hdfs/HadoopFileSystemOptions.java | 3 +++
.../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 3 +++
14 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index ac02465..8224ebb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -23,6 +23,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+
/**
* Signifies that a public API (public class, method or field) is subject to incompatible changes,
* or even removal, in a future release.
@@ -79,6 +80,12 @@ public @interface Experimental {
/** Metrics-related experimental APIs. */
METRICS,
+ /**
+ * {@link org.apache.beam.sdk.io.FileSystem} and {@link org.apache.beam.sdk.io.fs.ResourceId}
+ * related APIs.
+ */
+ FILESYSTEM,
+
/** Experimental feature related to alternative, unnested encodings for coders. */
CODER_CONTEXT,
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d13c6ff..6af0e79 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -30,6 +30,8 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -313,6 +315,7 @@ public class AvroIO {
* a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
* overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write<T> to(ResourceId outputPrefix) {
return toResource(StaticValueProvider.of(outputPrefix));
}
@@ -333,6 +336,7 @@ public class AvroIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
return toBuilder().setFilenamePrefix(outputPrefix).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 7f729a7..8102316 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -47,6 +47,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -115,6 +117,7 @@ import org.slf4j.LoggerFactory;
*
* @param <T> the type of values written to the sink.
*/
+@Experimental(Kind.FILESYSTEM)
public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
@@ -193,6 +196,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a
* file will fail and this function will return a directory {@link ResourceId} instead.
*/
+ @Experimental(Kind.FILESYSTEM)
public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
try {
return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */);
@@ -290,6 +294,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* as well as sharding information. The policy must return unique and consistent filenames
* for different windows and panes.
*/
+ @Experimental(Kind.FILESYSTEM)
public abstract ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext c, String extension);
@@ -302,6 +307,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* <p>The {@link Context} object only provides sharding information, which is used by the policy
* to generate unique and consistent filenames.
*/
+ @Experimental(Kind.FILESYSTEM)
@Nullable public abstract ResourceId unwindowedFilename(
ResourceId outputDirectory, Context c, String extension);
@@ -320,6 +326,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
*/
+ @Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
@@ -335,6 +342,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* Construct a {@link FileBasedSink} with the given filename policy and output channel type.
*/
+ @Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> baseOutputDirectoryProvider,
FilenamePolicy filenamePolicy,
@@ -349,6 +357,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* Returns the base directory inside which files will be written according to the configured
* {@link FilenamePolicy}.
*/
+ @Experimental(Kind.FILESYSTEM)
public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
return baseOutputDirectoryProvider;
}
@@ -358,6 +367,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* the {@link FilenamePolicy} may itself specify one or more inner directories before each output
* file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
*/
+ @Experimental(Kind.FILESYSTEM)
public final FilenamePolicy getFilenamePolicy() {
return filenamePolicy;
}
@@ -424,9 +434,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
protected final ValueProvider<ResourceId> tempDirectory;
/** Whether windowed writes are being used. */
- protected boolean windowedWrites;
+ @Experimental(Kind.FILESYSTEM)
+ protected boolean windowedWrites;
/** Constructs a temporary file resource given the temporary directory and a filename. */
+ @Experimental(Kind.FILESYSTEM)
protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename)
throws IOException {
return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -472,6 +484,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @param sink the FileBasedSink that will be used to configure this write operation.
* @param tempDirectory the base directory to be used for temporary output files.
*/
+ @Experimental(Kind.FILESYSTEM)
public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}
@@ -527,6 +540,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
}
+ @Experimental(Kind.FILESYSTEM)
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
Iterable<FileResult> writerResults) {
int numShards = Iterables.size(writerResults);
@@ -610,6 +624,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* @param filenames the filenames of temporary files.
*/
@VisibleForTesting
+ @Experimental(Kind.FILESYSTEM)
final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
throws IOException {
int numFiles = filenames.size();
@@ -637,6 +652,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
* temporary files, this method will remove them.
*/
@VisibleForTesting
+ @Experimental(Kind.FILESYSTEM)
final void removeTemporaryFiles(
Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
ResourceId tempDir = tempDirectory.get();
@@ -945,6 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private final BoundedWindow window;
private final PaneInfo paneInfo;
+ @Experimental(Kind.FILESYSTEM)
public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
this.tempFilename = tempFilename;
this.shard = shard;
@@ -952,6 +969,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
this.paneInfo = paneInfo;
}
+ @Experimental(Kind.FILESYSTEM)
public ResourceId getTempFilename() {
return tempFilename;
}
@@ -972,6 +990,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
return paneInfo;
}
+ @Experimental(Kind.FILESYSTEM)
public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
int numShards, String extension) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 375264a..601feca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -23,6 +23,8 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.io.fs.ResourceId;
* <p>All methods are protected, and they are for file system providers to implement.
* Clients should use {@link FileSystems} utility.
*/
+@Experimental(Kind.FILESYSTEM)
public abstract class FileSystem<ResourceIdT extends ResourceId> {
/**
* This is the entry point to convert user-provided specs to {@link ResourceIdT ResourceIds}.
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
index 78a91f6..50ee6eb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
import com.google.auto.service.AutoService;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
/**
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
* <p>It is optional but recommended to use one of the many build time tools such as
* {@link AutoService} to generate the necessary META-INF files automatically.
*/
+@Experimental(Kind.FILESYSTEM)
public interface FileSystemRegistrar {
/**
* Create zero or more {@link FileSystem filesystems} from the given {@link PipelineOptions}.
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 4341fab..cfb63c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -49,6 +49,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -63,6 +65,7 @@ import org.apache.beam.sdk.values.KV;
/**
* Clients facing {@link FileSystem} utility.
*/
+@Experimental(Kind.FILESYSTEM)
public class FileSystems {
public static final String DEFAULT_SCHEME = "default";
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
index f182360..7896e20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystemRegistrar.java
@@ -20,12 +20,15 @@ package org.apache.beam.sdk.io;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* {@link AutoService} registrar for the {@link LocalFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class LocalFileSystemRegistrar implements FileSystemRegistrar {
@Override
public Iterable<FileSystem> fromOptions(@Nullable PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
index 817829b..d234bae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalResources.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -28,6 +30,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
/**
* Helper functions for producing a {@link ResourceId} that references a local file or directory.
*/
+@Experimental(Kind.FILESYSTEM)
public final class LocalResources {
public static ResourceId fromFile(File file, boolean isDirectory) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index f73d6f3..c274595 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -31,6 +31,8 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -277,6 +279,7 @@ public class TFRecordIO {
*
* <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write to(ResourceId outputResource) {
return toResource(StaticValueProvider.of(outputResource));
}
@@ -284,6 +287,7 @@ public class TFRecordIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> outputResource) {
return toBuilder().setOutputPrefix(outputResource).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index af6a069..5c068ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -306,6 +308,7 @@ public class TextIO {
* in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
* not be set.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
return toResource(StaticValueProvider.of(filenamePrefix));
}
@@ -326,6 +329,7 @@ public class TextIO {
/**
* Like {@link #to(ResourceId)}.
*/
+ @Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
index dfe771f..9196034 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceId.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.fs;
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -45,6 +47,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
* to generate {@link ResourceId ResourceIds} for resources that may not yet exist.
* </ul>
*/
+@Experimental(Kind.FILESYSTEM)
public interface ResourceId extends Serializable {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
index 9f5980a..f954b33 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystemRegistrar;
@@ -31,6 +33,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
* {@link AutoService} registrar for the {@link GcsFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class GcsFileSystemRegistrar implements FileSystemRegistrar {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
index 45f43e2..f5183d5 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -25,6 +25,8 @@ import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
@@ -38,6 +40,7 @@ import org.slf4j.LoggerFactory;
* {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
* for the {@link HadoopFileSystem}.
*/
+@Experimental(Kind.FILESYSTEM)
public interface HadoopFileSystemOptions extends PipelineOptions {
@Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+ "By default, Hadoop configuration is loaded from 'core-site.xml' and 'hdfs-site.xml' "
http://git-wip-us.apache.org/repos/asf/beam/blob/ec956c85/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index 9159df3..8c57089 100644
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -25,6 +25,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.FileSystemRegistrar;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -34,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
* {@link AutoService} registrar for the {@link HadoopFileSystem}.
*/
@AutoService(FileSystemRegistrar.class)
+@Experimental(Kind.FILESYSTEM)
public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
@Override