Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:27 UTC
[02/50] [abbrv] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
deleted file mode 100644
index aee73c4..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
- * format.
- *
- * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to a Hadoop
- * filesystem, use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop-supported
- * filesystem: HDFS, S3, GCS, etc.), the Hadoop {@link FileOutputFormat}, the key class K and the
- * value class V, and finally the {@link SerializableFunction} to map from T to a {@link KV} of K
- * and V.
- *
- * <p>{@code HDFSFileSink} can be used by {@link Write} to create a write
- * transform. See the example below.
- *
- * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
- * HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class), conf);
- * avroRecordsPCollection.apply(Write.to(sink));
- * }
- * </pre>
- *
- * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
- * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
-
- private static final JobID jobId = new JobID(
- Long.toString(System.currentTimeMillis()),
- new Random().nextInt(Integer.MAX_VALUE));
-
- public abstract String path();
- public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
- public abstract Class<K> keyClass();
- public abstract Class<V> valueClass();
- public abstract SerializableFunction<T, KV<K, V>> outputConverter();
- public abstract SerializableConfiguration serializableConfiguration();
- public @Nullable abstract String username();
- public abstract boolean validate();
-
- // =======================================================================
- // Factory methods
- // =======================================================================
-
- public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
- to(String path,
- Class<W> formatClass,
- Class<K> keyClass,
- Class<V> valueClass,
- SerializableFunction<T, KV<K, V>> outputConverter) {
- return HDFSFileSink.<T, K, V>builder()
- .setPath(path)
- .setFormatClass(formatClass)
- .setKeyClass(keyClass)
- .setValueClass(valueClass)
- .setOutputConverter(outputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidate(true)
- .build();
- }
-
- public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
- SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
- new SerializableFunction<T, KV<NullWritable, Text>>() {
- @Override
- public KV<NullWritable, Text> apply(T input) {
- return KV.of(NullWritable.get(), new Text(input.toString()));
- }
- };
- return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
- }
-
- /**
- * Helper to create an Avro sink given an {@link AvroCoder}. Keep in mind that the configuration
- * object is altered to enable Avro output.
- */
- public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
- final AvroCoder<T> coder,
- Configuration conf) {
- SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
- new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
- @Override
- public KV<AvroKey<T>, NullWritable> apply(T input) {
- return KV.of(new AvroKey<>(input), NullWritable.get());
- }
- };
- conf.set("avro.schema.output.key", coder.getSchema().toString());
- return to(
- path,
- AvroKeyOutputFormat.class,
- (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
- NullWritable.class,
- outputConverter).withConfiguration(conf);
- }
-
- /**
- * Helper to create an Avro sink given a {@link Schema}. Keep in mind that the configuration
- * object is altered to enable Avro output.
- */
- public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
- toAvro(String path, Schema schema, Configuration conf) {
- return toAvro(path, AvroCoder.of(schema), conf);
- }
-
- /**
- * Helper to create an Avro sink given a {@link Class}. Keep in mind that the configuration
- * object is altered to enable Avro output.
- */
- public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
- Class<T> cls,
- Configuration conf) {
- return toAvro(path, AvroCoder.of(cls), conf);
- }
-
- // =======================================================================
- // Builder methods
- // =======================================================================
-
- public abstract Builder<T, K, V> toBuilder();
- public static <T, K, V> Builder builder() {
- return new AutoValue_HDFSFileSink.Builder<>();
- }
-
- /**
- * AutoValue builder for {@link HDFSFileSink}.
- */
- @AutoValue.Builder
- public abstract static class Builder<T, K, V> {
- public abstract Builder<T, K, V> setPath(String path);
- public abstract Builder<T, K, V> setFormatClass(
- Class<? extends FileOutputFormat<K, V>> formatClass);
- public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
- public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
- public abstract Builder<T, K, V> setOutputConverter(
- SerializableFunction<T, KV<K, V>> outputConverter);
- public abstract Builder<T, K, V> setSerializableConfiguration(
- SerializableConfiguration serializableConfiguration);
- public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
- if (configuration == null) {
- configuration = new Configuration(false);
- }
- return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
- }
- public abstract Builder<T, K, V> setUsername(String username);
- public abstract Builder<T, K, V> setValidate(boolean validate);
- public abstract HDFSFileSink<T, K, V> build();
- }
-
- public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
- return this.toBuilder().setConfiguration(configuration).build();
- }
-
- public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
- return this.toBuilder().setUsername(username).build();
- }
-
- // =======================================================================
- // Sink
- // =======================================================================
-
- @Override
- public void validate(PipelineOptions options) {
- if (validate()) {
- try {
- UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- FileSystem fs = FileSystem.get(new URI(path()),
- SerializableConfiguration.newConfiguration(serializableConfiguration()));
- checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public Sink.WriteOperation<T, String> createWriteOperation() {
- return new HDFSWriteOperation<>(this, path(), formatClass());
- }
-
- private Job newJob() throws IOException {
- Job job = SerializableConfiguration.newJob(serializableConfiguration());
- job.setJobID(jobId);
- job.setOutputKeyClass(keyClass());
- job.setOutputValueClass(valueClass());
- return job;
- }
-
- // =======================================================================
- // WriteOperation
- // =======================================================================
-
- /** {@link WriteOperation} for HDFS. */
- private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
-
- private final HDFSFileSink<T, K, V> sink;
- private final String path;
- private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
- HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
- String path,
- Class<? extends FileOutputFormat<K, V>> formatClass) {
- this.sink = sink;
- this.path = path;
- this.formatClass = formatClass;
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {
- Job job = sink.newJob();
- FileOutputFormat.setOutputPath(job, new Path(path));
- }
-
- @Override
- public void setWindowedWrites(boolean windowedWrites) {
- }
-
- @Override
- public void finalize(final Iterable<String> writerResults, PipelineOptions options)
- throws Exception {
- UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- doFinalize(writerResults);
- return null;
- }
- });
- }
-
- private void doFinalize(Iterable<String> writerResults) throws Exception {
- Job job = sink.newJob();
- FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
- // If there are 0 output shards, just create output folder.
- if (!writerResults.iterator().hasNext()) {
- fs.mkdirs(new Path(path));
- return;
- }
-
- // job successful
- JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
- FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
- outputCommitter.commitJob(context);
-
- // get actual output shards
- Set<String> actual = Sets.newHashSet();
- FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
- @Override
- public boolean accept(Path path) {
- String name = path.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- });
-
- // get expected output shards
- Set<String> expected = Sets.newHashSet(writerResults);
- checkState(
- expected.size() == Lists.newArrayList(writerResults).size(),
- "Data loss due to writer results hash collision");
- for (FileStatus s : statuses) {
- String name = s.getPath().getName();
- int pos = name.indexOf('.');
- actual.add(pos > 0 ? name.substring(0, pos) : name);
- }
-
- checkState(actual.equals(expected), "Writer results and output files do not match");
-
- // rename output shards to Hadoop style, i.e. part-r-00000.txt
- int i = 0;
- for (FileStatus s : statuses) {
- String name = s.getPath().getName();
- int pos = name.indexOf('.');
- String ext = pos > 0 ? name.substring(pos) : "";
- fs.rename(
- s.getPath(),
- new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
- i++;
- }
- }
-
- @Override
- public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
- return new HDFSWriter<>(this, path, formatClass);
- }
-
- @Override
- public Sink<T> getSink() {
- return sink;
- }
-
- @Override
- public Coder<String> getWriterResultCoder() {
- return StringUtf8Coder.of();
- }
-
- }
-
- // =======================================================================
- // Writer
- // =======================================================================
-
- private static class HDFSWriter<T, K, V> extends Writer<T, String> {
-
- private final HDFSWriteOperation<T, K, V> writeOperation;
- private final String path;
- private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
- // unique hash for each task
- private int hash;
-
- private TaskAttemptContext context;
- private RecordWriter<K, V> recordWriter;
- private FileOutputCommitter outputCommitter;
-
- HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
- String path,
- Class<? extends FileOutputFormat<K, V>> formatClass) {
- this.writeOperation = writeOperation;
- this.path = path;
- this.formatClass = formatClass;
- }
-
- @Override
- public void openWindowed(final String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
- throw new UnsupportedOperationException("Windowing support not implemented yet for "
- + "HDFS. Window " + window);
- }
-
- @Override
- public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
- UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
- new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- doOpen(uId);
- return null;
- }
- }
- );
- }
-
- private void doOpen(String uId) throws Exception {
- this.hash = uId.hashCode();
-
- Job job = writeOperation.sink.newJob();
- FileOutputFormat.setOutputPath(job, new Path(path));
-
- // Each Writer is responsible for writing one bundle of elements and is represented by one
- // unique Hadoop task based on uId/hash. All tasks share the same job ID.
- JobID jobId = job.getJobID();
- TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
- context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
-
- FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
- recordWriter = outputFormat.getRecordWriter(context);
- outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
- }
-
- @Override
- public void write(T value) throws Exception {
- checkNotNull(recordWriter,
- "Record writer can't be null. Make sure to open Writer first!");
- KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
- recordWriter.write(kv.getKey(), kv.getValue());
- }
-
- @Override
- public void cleanup() throws Exception {
-
- }
-
- @Override
- public String close() throws Exception {
- return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
- new PrivilegedExceptionAction<String>() {
- @Override
- public String run() throws Exception {
- return doClose();
- }
- });
- }
-
- private String doClose() throws Exception {
- // task/attempt successful
- recordWriter.close(context);
- outputCommitter.commitTask(context);
-
- // result is prefix of the output file name
- return String.format("part-r-%d", hash);
- }
-
- @Override
- public WriteOperation<T, String> getWriteOperation() {
- return writeOperation;
- }
-
- }
-
-}
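
The class removed above was driven through the old Sink/Write API, as its Javadoc describes. A minimal usage sketch, assuming a pipeline whose PCollection<String> is named lines, a PCollection<MyRecord> of Avro-generated records named records, and placeholder output paths (all of these names are hypothetical):

    import org.apache.avro.mapred.AvroKey;
    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    // Plain text output: each element's toString() becomes one line (toText helper).
    HDFSFileSink<String, NullWritable, Text> textSink =
        HDFSFileSink.toText("hdfs://namenode/output/text");
    lines.apply(Write.to(textSink));

    // Avro output: toAvro sets "avro.schema.output.key" on the supplied Configuration.
    Configuration conf = new Configuration(false);
    HDFSFileSink<MyRecord, AvroKey<MyRecord>, NullWritable> avroSink =
        HDFSFileSink.toAvro("hdfs://namenode/output/avro", MyRecord.class, conf);
    records.apply(Write.to(avroSink));

Write.to(sink) then drives the validate/createWriteOperation/createWriter lifecycle implemented above, with each bundle written by a Hadoop reduce task attempt keyed off the bundle id hash.
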
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
deleted file mode 100644
index 5cc2097..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>A {@code HDFSFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSource<KV<MyKey, MyValue>, MyKey, MyValue> source = HDFSFileSource.from(path,
- * MyInputFormat.class, MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Implementation note: Since Hadoop's
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
- * dictates input splits.
- * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
- * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
- private static final long serialVersionUID = 0L;
-
- private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
-
- public abstract String filepattern();
- public abstract Class<? extends FileInputFormat<K, V>> formatClass();
- public abstract Coder<T> coder();
- public abstract SerializableFunction<KV<K, V>, T> inputConverter();
- public abstract SerializableConfiguration serializableConfiguration();
- public @Nullable abstract SerializableSplit serializableSplit();
- public @Nullable abstract String username();
- public abstract boolean validateSource();
-
- // =======================================================================
- // Factory methods
- // =======================================================================
-
- public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
- from(String filepattern,
- Class<W> formatClass,
- Coder<T> coder,
- SerializableFunction<KV<K, V>, T> inputConverter) {
- return HDFSFileSource.<T, K, V>builder()
- .setFilepattern(filepattern)
- .setFormatClass(formatClass)
- .setCoder(coder)
- .setInputConverter(inputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidateSource(true)
- .setSerializableSplit(null)
- .build();
- }
-
- public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
- from(String filepattern,
- Class<W> formatClass,
- Class<K> keyClass,
- Class<V> valueClass) {
- KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
- SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
- new SerializableFunction<KV<K, V>, KV<K, V>>() {
- @Override
- public KV<K, V> apply(KV<K, V> input) {
- return input;
- }
- };
- return HDFSFileSource.<KV<K, V>, K, V>builder()
- .setFilepattern(filepattern)
- .setFormatClass(formatClass)
- .setCoder(coder)
- .setInputConverter(inputConverter)
- .setConfiguration(null)
- .setUsername(null)
- .setValidateSource(true)
- .setSerializableSplit(null)
- .build();
- }
-
- public static HDFSFileSource<String, LongWritable, Text>
- fromText(String filepattern) {
- SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
- new SerializableFunction<KV<LongWritable, Text>, String>() {
- @Override
- public String apply(KV<LongWritable, Text> input) {
- return input.getValue().toString();
- }
- };
- return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
- }
-
- /**
- * Helper to read from an Avro source given an {@link AvroCoder}. Keep in mind that the configuration
- * object is altered to enable Avro input.
- */
- public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
- fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
- Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
- SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
- new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
- @Override
- public T apply(KV<AvroKey<T>, NullWritable> input) {
- try {
- return CoderUtils.clone(coder, input.getKey().datum());
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- }
- };
- conf.set("avro.schema.input.key", coder.getSchema().toString());
- return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
- }
-
- /**
- * Helper to read from an Avro source given a {@link Schema}. Keep in mind that the configuration
- * object is altered to enable Avro input.
- */
- public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
- fromAvro(String filepattern, Schema schema, Configuration conf) {
- return fromAvro(filepattern, AvroCoder.of(schema), conf);
- }
-
- /**
- * Helper to read from an Avro source given a {@link Class}. Keep in mind that the configuration
- * object is altered to enable Avro input.
- */
- public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
- fromAvro(String filepattern, Class<T> cls, Configuration conf) {
- return fromAvro(filepattern, AvroCoder.of(cls), conf);
- }
-
- // =======================================================================
- // Builder methods
- // =======================================================================
-
- public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
- public static <T, K, V> HDFSFileSource.Builder builder() {
- return new AutoValue_HDFSFileSource.Builder<>();
- }
-
- /**
- * AutoValue builder for {@link HDFSFileSource}.
- */
- @AutoValue.Builder
- public abstract static class Builder<T, K, V> {
- public abstract Builder<T, K, V> setFilepattern(String filepattern);
- public abstract Builder<T, K, V> setFormatClass(
- Class<? extends FileInputFormat<K, V>> formatClass);
- public abstract Builder<T, K, V> setCoder(Coder<T> coder);
- public abstract Builder<T, K, V> setInputConverter(
- SerializableFunction<KV<K, V>, T> inputConverter);
- public abstract Builder<T, K, V> setSerializableConfiguration(
- SerializableConfiguration serializableConfiguration);
- public Builder<T, K, V> setConfiguration(Configuration configuration) {
- if (configuration == null) {
- configuration = new Configuration(false);
- }
- return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
- }
- public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
- public abstract Builder<T, K, V> setUsername(@Nullable String username);
- public abstract Builder<T, K, V> setValidateSource(boolean validate);
- public abstract HDFSFileSource<T, K, V> build();
- }
-
- public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
- return this.toBuilder().setConfiguration(configuration).build();
- }
-
- public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
- return this.toBuilder().setUsername(username).build();
- }
-
- // =======================================================================
- // BoundedSource
- // =======================================================================
-
- @Override
- public List<? extends BoundedSource<T>> split(
- final long desiredBundleSizeBytes,
- PipelineOptions options) throws Exception {
- if (serializableSplit() == null) {
- List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
- new PrivilegedExceptionAction<List<InputSplit>>() {
- @Override
- public List<InputSplit> run() throws Exception {
- return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
- }
- });
- return Lists.transform(inputSplits,
- new Function<InputSplit, BoundedSource<T>>() {
- @Override
- public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
- SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
- return HDFSFileSource.this.toBuilder()
- .setSerializableSplit(serializableSplit)
- .build();
- }
- });
- } else {
- return ImmutableList.of(this);
- }
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) {
- long size = 0;
-
- try {
- // If this source represents a single split produced by split(),
- // return the size of that split rather than the size of the entire input.
- if (serializableSplit() != null) {
- return serializableSplit().getSplit().getLength();
- }
-
- size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
- @Override
- public Long run() throws Exception {
- long size = 0;
- Job job = SerializableConfiguration.newJob(serializableConfiguration());
- for (FileStatus st : listStatus(createFormat(job), job)) {
- size += st.getLen();
- }
- return size;
- }
- });
- } catch (IOException e) {
- LOG.warn(
- "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
- // ignore, and return 0
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn(
- "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
- // ignore, and return 0
- }
- return size;
- }
-
- @Override
- public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
- this.validate();
- return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
- }
-
- @Override
- public void validate() {
- if (validateSource()) {
- try {
- UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- final Path pathPattern = new Path(filepattern());
- FileSystem fs = FileSystem.get(pathPattern.toUri(),
- SerializableConfiguration.newConfiguration(serializableConfiguration()));
- FileStatus[] fileStatuses = fs.globStatus(pathPattern);
- checkState(
- fileStatuses != null && fileStatuses.length > 0,
- "Unable to find any files matching %s", filepattern());
- return null;
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- return coder();
- }
-
- // =======================================================================
- // Helpers
- // =======================================================================
-
- private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
- SerializableConfiguration serializableConfiguration)
- throws IOException, IllegalAccessException, InstantiationException {
- Job job = SerializableConfiguration.newJob(serializableConfiguration);
- FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
- FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
- return createFormat(job).getSplits(job);
- }
-
- private FileInputFormat<K, V> createFormat(Job job)
- throws IOException, IllegalAccessException, InstantiationException {
- Path path = new Path(filepattern());
- FileInputFormat.addInputPath(job, path);
- return formatClass().newInstance();
- }
-
- private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
- throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- // FileInputFormat#listStatus is protected, so call using reflection
- Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
- listStatus.setAccessible(true);
- @SuppressWarnings("unchecked")
- List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
- return stat;
- }
-
- @SuppressWarnings("unchecked")
- private static <T> Coder<T> getDefaultCoder(Class<T> c) {
- if (Writable.class.isAssignableFrom(c)) {
- Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
- return (Coder<T>) WritableCoder.of(writableClass);
- } else if (Void.class.equals(c)) {
- return (Coder<T>) VoidCoder.of();
- }
- // TODO: how to use registered coders here?
- throw new IllegalStateException("Cannot find coder for " + c);
- }
-
- @SuppressWarnings("unchecked")
- private static <T> Class<T> castClass(Class<?> aClass) {
- return (Class<T>) aClass;
- }
-
- // =======================================================================
- // BoundedReader
- // =======================================================================
-
- private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
-
- private final HDFSFileSource<T, K, V> source;
- private final String filepattern;
- private final Class<? extends FileInputFormat<K, V>> formatClass;
- private final Job job;
-
- private List<InputSplit> splits;
- private ListIterator<InputSplit> splitsIterator;
-
- private Configuration conf;
- private FileInputFormat<?, ?> format;
- private TaskAttemptContext attemptContext;
- private RecordReader<K, V> currentReader;
- private KV<K, V> currentPair;
-
- HDFSFileReader(
- HDFSFileSource<T, K, V> source,
- String filepattern,
- Class<? extends FileInputFormat<K, V>> formatClass,
- SerializableSplit serializableSplit)
- throws IOException {
- this.source = source;
- this.filepattern = filepattern;
- this.formatClass = formatClass;
- this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
-
- if (serializableSplit != null) {
- this.splits = ImmutableList.of(serializableSplit.getSplit());
- this.splitsIterator = splits.listIterator();
- }
- }
-
- @Override
- public boolean start() throws IOException {
- Path path = new Path(filepattern);
- FileInputFormat.addInputPath(job, path);
-
- conf = job.getConfiguration();
- try {
- format = formatClass.newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException("Cannot instantiate file input format " + formatClass, e);
- }
- attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-
- if (splitsIterator == null) {
- splits = format.getSplits(job);
- splitsIterator = splits.listIterator();
- }
-
- return advance();
- }
-
- @Override
- public boolean advance() throws IOException {
- try {
- if (currentReader != null && currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- } else {
- while (splitsIterator.hasNext()) {
- // advance the reader and see if it has records
- final InputSplit nextSplit = splitsIterator.next();
- @SuppressWarnings("unchecked")
- RecordReader<K, V> reader =
- (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
- if (currentReader != null) {
- currentReader.close();
- }
- currentReader = reader;
- UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- currentReader.initialize(nextSplit, attemptContext);
- return null;
- }
- });
- if (currentReader.nextKeyValue()) {
- currentPair = nextPair();
- return true;
- }
- currentReader.close();
- currentReader = null;
- }
- // either no next split or all readers were empty
- currentPair = null;
- return false;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- }
-
- @Override
- public T getCurrent() throws NoSuchElementException {
- if (currentPair == null) {
- throw new NoSuchElementException();
- }
- return source.inputConverter().apply(currentPair);
- }
-
- @Override
- public void close() throws IOException {
- if (currentReader != null) {
- currentReader.close();
- currentReader = null;
- }
- currentPair = null;
- }
-
- @Override
- public BoundedSource<T> getCurrentSource() {
- return source;
- }
-
- @SuppressWarnings("unchecked")
- private KV<K, V> nextPair() throws IOException, InterruptedException {
- K key = currentReader.getCurrentKey();
- V value = currentReader.getCurrentValue();
- // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
- if (key instanceof Writable) {
- key = (K) WritableUtils.clone((Writable) key, conf);
- }
- if (value instanceof Writable) {
- value = (V) WritableUtils.clone((Writable) value, conf);
- }
- return KV.of(key, value);
- }
-
- // =======================================================================
- // Optional overrides
- // =======================================================================
-
- @Override
- public Double getFractionConsumed() {
- if (currentReader == null) {
- return 0.0;
- }
- if (splits.isEmpty()) {
- return 1.0;
- }
- int index = splitsIterator.previousIndex();
- int numReaders = splits.size();
- if (index == numReaders) {
- return 1.0;
- }
- double before = 1.0 * index / numReaders;
- double after = 1.0 * (index + 1) / numReaders;
- Double fractionOfCurrentReader = getProgress();
- if (fractionOfCurrentReader == null) {
- return before;
- }
- return before + fractionOfCurrentReader * (after - before);
- }
-
- private Double getProgress() {
- try {
- return (double) currentReader.getProgress();
- } catch (IOException | InterruptedException e) {
- return null;
- }
- }
-
- }
-
- // =======================================================================
- // SerializableSplit
- // =======================================================================
-
- /**
- * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
- * serialized using Java's standard serialization mechanisms. Note that the InputSplit
- * has to be Writable (which most are).
- */
- protected static class SerializableSplit implements Externalizable {
- private static final long serialVersionUID = 0L;
-
- private InputSplit split;
-
- public SerializableSplit() {
- }
-
- public SerializableSplit(InputSplit split) {
- checkArgument(split instanceof Writable, "Split is not writable: %s", split);
- this.split = split;
- }
-
- public InputSplit getSplit() {
- return split;
- }
-
- @Override
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(split.getClass().getCanonicalName());
- ((Writable) split).write(out);
- }
-
- @Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String className = in.readUTF();
- try {
- split = (InputSplit) Class.forName(className).newInstance();
- ((Writable) split).readFields(in);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IOException(e);
- }
- }
- }
-
-}
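
The source removed above is the read-side counterpart and is consumed through Read.from, as its Javadoc shows. A minimal sketch, assuming a Pipeline named p and placeholder input globs:

    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Line-by-line text reading via the fromText helper.
    PCollection<String> lines =
        p.apply(Read.from(HDFSFileSource.fromText("hdfs://namenode/input/*.txt")));

    // Key/value overload: raw (byte offset, line) pairs from TextInputFormat; coders for
    // the Writable key and value classes are derived via WritableCoder.
    PCollection<KV<LongWritable, Text>> records = p.apply(Read.from(
        HDFSFileSource.from("hdfs://namenode/input/part-*",
            TextInputFormat.class, LongWritable.class, Text.class)));

Splitting into bundles delegates to the Hadoop FileInputFormat#getSplits, which is why the class extends BoundedSource directly rather than OffsetBasedSource.
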
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
deleted file mode 100644
index 154a818..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.fs.CreateOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
- * Apache Beam {@link FileSystem FileSystems}.
- *
- * <p>The following HDFS FileSystem(s) are known to be unsupported:
- * <ul>
- * <li>FTPFileSystem: Missing seek support within FTPInputStream</li>
- * </ul>
- *
- * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek
- * efficient when reading. The source code for the following {@link FSInputStream} implementations
- * (as of Hadoop 2.7.1) does provide seek implementations:
- * <ul>
- * <li>HarFsInputStream</li>
- * <li>S3InputStream</li>
- * <li>DFSInputStream</li>
- * <li>SwiftNativeInputStream</li>
- * <li>NativeS3FsInputStream</li>
- * <li>LocalFSFileInputStream</li>
- * <li>NativeAzureFsInputStream</li>
- * <li>S3AInputStream</li>
- * </ul>
- */
-class HadoopFileSystem extends FileSystem<HadoopResourceId> {
- @VisibleForTesting
- final org.apache.hadoop.fs.FileSystem fileSystem;
-
- HadoopFileSystem(Configuration configuration) throws IOException {
- this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
- }
-
- @Override
- protected List<MatchResult> match(List<String> specs) {
- ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
- for (String spec : specs) {
- try {
- FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
- List<Metadata> metadata = new ArrayList<>();
- for (FileStatus fileStatus : fileStatuses) {
- if (fileStatus.isFile()) {
- metadata.add(Metadata.builder()
- .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
- .setIsReadSeekEfficient(true)
- .setSizeBytes(fileStatus.getLen())
- .build());
- }
- }
- resultsBuilder.add(MatchResult.create(Status.OK, metadata));
- } catch (IOException e) {
- resultsBuilder.add(MatchResult.create(Status.ERROR, e));
- }
- }
- return resultsBuilder.build();
- }
-
- @Override
- protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
- throws IOException {
- return Channels.newChannel(fileSystem.create(resourceId.toPath()));
- }
-
- @Override
- protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
- FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
- return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
- }
-
- @Override
- protected void copy(
- List<HadoopResourceId> srcResourceIds,
- List<HadoopResourceId> destResourceIds) throws IOException {
- for (int i = 0; i < srcResourceIds.size(); ++i) {
- // Unfortunately, Hadoop FileSystems don't support a native copy operation, so we are forced
- // to use the inefficient implementation found in FileUtil, which copies all the bytes through
- // the local machine.
- //
- // The Hadoop FileSystem API does define a concat method, but only DFSFileSystem was found to
- // implement it, and it implements concat by deleting the sources afterwards, which is not
- // what we want. All the other FileSystem implementations examined throw
- // UnsupportedOperationException from concat.
- FileUtil.copy(
- fileSystem, srcResourceIds.get(i).toPath(),
- fileSystem, destResourceIds.get(i).toPath(),
- false,
- true,
- fileSystem.getConf());
- }
- }
-
- @Override
- protected void rename(
- List<HadoopResourceId> srcResourceIds,
- List<HadoopResourceId> destResourceIds) throws IOException {
- for (int i = 0; i < srcResourceIds.size(); ++i) {
- fileSystem.rename(
- srcResourceIds.get(i).toPath(),
- destResourceIds.get(i).toPath());
- }
- }
-
- @Override
- protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
- for (HadoopResourceId resourceId : resourceIds) {
- fileSystem.delete(resourceId.toPath(), false);
- }
- }
-
- @Override
- protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
- try {
- if (singleResourceSpec.endsWith("/") && !isDirectory) {
- throw new IllegalArgumentException(String.format(
- "Expected file path but received directory path %s", singleResourceSpec));
- }
- return !singleResourceSpec.endsWith("/") && isDirectory
- ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
- : new HadoopResourceId(new URI(singleResourceSpec));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(
- String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
- e);
- }
- }
-
- @Override
- protected String getScheme() {
- return fileSystem.getScheme();
- }
-
- /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
- private static class HadoopSeekableByteChannel implements SeekableByteChannel {
- private final FileStatus fileStatus;
- private final FSDataInputStream inputStream;
- private boolean closed;
-
- private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
- this.fileStatus = fileStatus;
- this.inputStream = inputStream;
- this.closed = false;
- }
-
- @Override
- public int read(ByteBuffer dst) throws IOException {
- if (closed) {
- throw new IOException("Channel is closed");
- }
- return inputStream.read(dst);
- }
-
- @Override
- public int write(ByteBuffer src) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long position() throws IOException {
- if (closed) {
- throw new IOException("Channel is closed");
- }
- return inputStream.getPos();
- }
-
- @Override
- public SeekableByteChannel position(long newPosition) throws IOException {
- if (closed) {
- throw new IOException("Channel is closed");
- }
- inputStream.seek(newPosition);
- return this;
- }
-
- @Override
- public long size() throws IOException {
- if (closed) {
- throw new IOException("Channel is closed");
- }
- return fileStatus.getLen();
- }
-
- @Override
- public SeekableByteChannel truncate(long size) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isOpen() {
- return !closed;
- }
-
- @Override
- public void close() throws IOException {
- closed = true;
- inputStream.close();
- }
- }
-}
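
Once a HadoopFileSystem has been registered from pipeline options (see HadoopFileSystemOptions and HadoopFileSystemRegistrar below), hdfs:// specs resolve through the generic FileSystems facade, and open() hands back the seekable channel implemented above. A rough sketch, inside a method declared to throw IOException, using the placeholder namenode address from the option description further below:

    import java.io.InputStream;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;
    import java.util.Collections;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.io.fs.MatchResult.Metadata;

    // Glob for files; match() marks every regular file as read-seek efficient.
    MatchResult result =
        FileSystems.match(Collections.singletonList("hdfs://localhost:9998/data/*")).get(0);
    for (Metadata metadata : result.metadata()) {
      // open() returns the HadoopSeekableByteChannel wrapping an FSDataInputStream.
      try (ReadableByteChannel channel = FileSystems.open(metadata.resourceId())) {
        InputStream in = Channels.newInputStream(channel);
        // ... consume the stream ...
      }
    }
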
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
deleted file mode 100644
index 2cb9d8a..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer}
- * for a Hadoop {@link Configuration}. The serialized representation is that of a JSON map.
- *
- * <p>Note that the serialization of the Hadoop {@link Configuration} only keeps the keys and their
- * values, dropping any configuration hierarchy and source information.
- */
-@AutoService(Module.class)
-public class HadoopFileSystemModule extends SimpleModule {
- public HadoopFileSystemModule() {
- super("HadoopFileSystemModule");
- setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
- }
-
- /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */
- @JsonDeserialize(using = ConfigurationDeserializer.class)
- @JsonSerialize(using = ConfigurationSerializer.class)
- private static class ConfigurationMixin {}
-
- /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */
- static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
- @Override
- public Configuration deserialize(JsonParser jsonParser,
- DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
- Map<String, String> rawConfiguration =
- jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
- return configuration;
- }
- }
-
- /** A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects. */
- static class ConfigurationSerializer extends JsonSerializer<Configuration> {
- @Override
- public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
- SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
- Map<String, String> map = new TreeMap<>();
- for (Map.Entry<String, String> entry : configuration) {
- map.put(entry.getKey(), entry.getValue());
- }
- jsonGenerator.writeObject(map);
- }
- }
-}
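
The module's effect is easiest to see with a bare ObjectMapper. A small round-trip sketch, inside a method declared to throw IOException, with a placeholder key and value; in normal use the module is discovered automatically through the @AutoService registration and Jackson module discovery rather than registered by hand:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hadoop.conf.Configuration;

    ObjectMapper mapper = new ObjectMapper().registerModule(new HadoopFileSystemModule());

    Configuration conf = new Configuration(false);
    conf.set("fs.default.name", "hdfs://localhost:9998");

    // Serializes to a flat JSON map: {"fs.default.name":"hdfs://localhost:9998"}
    String json = mapper.writeValueAsString(conf);

    // Deserialization rebuilds a Configuration with the same key/value pairs;
    // hierarchy and source information are not preserved, as noted above.
    Configuration roundTripped = mapper.readValue(json, Configuration.class);
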
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
deleted file mode 100644
index 31250bc..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.util.List;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
- * for the {@link HadoopFileSystem}.
- */
-public interface HadoopFileSystemOptions extends PipelineOptions {
- @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
- + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
- + "each map represents the entire configuration for a single Hadoop filesystem. For example "
- + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
- + "{\"fs.default.name\": \"s3a://\", ...},...]'")
- @Default.InstanceFactory(ConfigurationLocator.class)
- List<Configuration> getHdfsConfiguration();
- void setHdfsConfiguration(List<Configuration> value);
-
- /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. */
- class ConfigurationLocator implements DefaultValueFactory<Configuration> {
- @Override
- public Configuration create(PipelineOptions options) {
- // TODO: Find default configuration to use
- return null;
- }
- }
-}
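
Besides the --hdfsConfiguration flag shown in the description above, the option can be populated programmatically when constructing a pipeline. A minimal sketch with a placeholder namenode address:

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration(false);
    conf.set("fs.default.name", "hdfs://localhost:9998");

    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
    options.setHdfsConfiguration(Collections.singletonList(conf));

    // HadoopFileSystemRegistrar (further below) reads this option and constructs a
    // HadoopFileSystem for the configured cluster when the pipeline's filesystems are set up.
    Pipeline p = Pipeline.create(options);
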
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
deleted file mode 100644
index 344623b..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-
-/**
- * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
- */
-@AutoService(PipelineOptionsRegistrar.class)
-public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
-
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
deleted file mode 100644
index 9159df3..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link AutoService} registrar for the {@link HadoopFileSystem}.
- */
-@AutoService(FileSystemRegistrar.class)
-public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
-
- @Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
- List<Configuration> configurations =
- options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
- if (configurations == null) {
- configurations = Collections.emptyList();
- }
- checkArgument(configurations.size() <= 1,
- String.format(
- "The %s currently only supports at most a single Hadoop configuration.",
- HadoopFileSystemRegistrar.class.getSimpleName()));
-
- ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
- for (Configuration configuration : configurations) {
- try {
- builder.add(new HadoopFileSystem(configuration));
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format(
- "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
- }
- }
- return builder.build();
- }
-}
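A brief wiring sketch for the registrar above: it is discovered through ServiceLoader via @AutoService, so the remaining step is handing populated options to the filesystem layer. FileSystems.setDefaultPipelineOptions is assumed from the Beam Java SDK and is not part of this diff.

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class HadoopFileSystemWiringSketch {
  public static void main(String[] args) {
    // Once this module is on the classpath, the registrar above is picked up
    // automatically; passing the options here makes hdfs:// paths resolvable
    // in the current JVM (assumed SDK entry point).
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);
    FileSystems.setDefaultPipelineOptions(options);
  }
}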
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
deleted file mode 100644
index e570864..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.net.URI;
-import java.util.Objects;
-import org.apache.beam.sdk.io.fs.ResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.hadoop.fs.Path;
-
-/**
- * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
- */
-class HadoopResourceId implements ResourceId {
- private final URI uri;
-
- HadoopResourceId(URI uri) {
- this.uri = uri;
- }
-
- @Override
- public ResourceId resolve(String other, ResolveOptions resolveOptions) {
- return new HadoopResourceId(uri.resolve(other));
- }
-
- @Override
- public ResourceId getCurrentDirectory() {
- return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
- }
-
- public boolean isDirectory() {
- return uri.getPath().endsWith("/");
- }
-
- @Override
- public String getFilename() {
- return new Path(uri).getName();
- }
-
- @Override
- public String getScheme() {
- return uri.getScheme();
- }
-
- @Override
- public String toString() {
- return uri.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof HadoopResourceId)) {
- return false;
- }
- return Objects.equals(uri, ((HadoopResourceId) obj).uri);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(uri);
- }
-
- Path toPath() {
- return new Path(uri);
- }
-}
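The resolve and getCurrentDirectory implementations above lean on java.net.URI semantics; a small standalone illustration with placeholder paths:

import java.net.URI;

public class HadoopResourceIdUriSketch {
  public static void main(String[] args) {
    // Placeholder paths; only the java.net.URI behavior is being illustrated.
    URI dir = URI.create("hdfs://localhost:9998/tmp/beam/");
    URI file = URI.create("hdfs://localhost:9998/tmp/beam/output.txt");

    // Resolving against a directory-style URI (trailing slash) appends the segment.
    System.out.println(dir.resolve("part-00000"));  // .../tmp/beam/part-00000
    // Resolving "." against a file-style URI drops the last segment, which is why
    // getCurrentDirectory() above resolves "." when the path lacks a trailing slash.
    System.out.println(file.resolve("."));          // .../tmp/beam/
  }
}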
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
deleted file mode 100644
index fe2db5f..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-
-/**
- * This class is deprecated, and only exists for HDFSFileSink.
- */
-@Deprecated
-public abstract class Sink<T> implements Serializable, HasDisplayData {
- /**
- * Ensures that the sink is valid and can be written to before the write operation begins. One
- * should use {@link com.google.common.base.Preconditions} to implement this method.
- */
- public abstract void validate(PipelineOptions options);
-
- /**
- * Returns an instance of a {@link WriteOperation} that can write to this Sink.
- */
- public abstract WriteOperation<T, ?> createWriteOperation();
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
-
- /**
- * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
- *
- * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
- * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
- * a bundle to the sink.
- *
- * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
- * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
- *
- * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
- * call to the {@code initialize} method and deserialized before calls to
- * {@code createWriter} and {@code finalize}. However, it is not
- * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
- * state of the {@code WriteOperation}.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of objects to write
- * @param <WriteT> The result of a per-bundle write
- */
- public abstract static class WriteOperation<T, WriteT> implements Serializable {
- /**
- * Performs initialization before writing to the sink. Called before writing begins.
- */
- public abstract void initialize(PipelineOptions options) throws Exception;
-
- /**
- * Indicates that the operation will be performing windowed writes.
- */
- public abstract void setWindowedWrites(boolean windowedWrites);
-
- /**
- * Given an Iterable of results from bundle writes, performs finalization after writing and
- * closes the sink. Called after all bundle writes are complete.
- *
- * <p>The results that are passed to finalize are those returned by bundles that completed
- * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
- * one writer result will be passed to finalize for each bundle. An implementation of finalize
- * should perform clean up of any failed and successfully retried bundles. Note that these
- * failed bundles will not have their writer result passed to finalize, so finalize should be
- * capable of locating any temporary/partial output written by failed bundles.
- *
- * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
- * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
- * failure/retry or for redundancy.
- *
- * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
- * finalize is called multiple times.
- *
- * @param writerResults an Iterable of results from successful bundle writes.
- */
- public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
- throws Exception;
-
- /**
- * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
- *
- * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
- *
- * <p>Must not mutate the state of the WriteOperation.
- */
- public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
- /**
- * Returns the Sink that this write operation writes to.
- */
- public abstract Sink<T> getSink();
-
- /**
- * Returns a coder for the writer result type.
- */
- public abstract Coder<WriteT> getWriterResultCoder();
- }
-
- /**
- * A Writer writes a bundle of elements from a PCollection to a sink.
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
- * and {@link Writer#close} is called after all elements in the bundle have been written.
- * {@link Writer#write} writes an element to the sink.
- *
- * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
- * multiple instances of a Writer may be instantiated in different threads on the same worker.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of object to write
- * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
- */
- public abstract static class Writer<T, WriteT> {
- /**
- * Performs bundle initialization. For example, creates a temporary file for writing or
- * initializes any state that will be used across calls to {@link Writer#write}.
- *
- * <p>The unique id that is given to open should be used to ensure that the writer's output does
- * not interfere with the output of other Writers, as a bundle may be executed many times for
- * fault tolerance. See {@link Sink} for more information about bundle ids.
- *
- * <p>The window and paneInfo arguments are populated when windowed writes are requested.
- * The shard and numShards arguments are populated for the case of static sharding. In cases
- * where the runner picks the sharding dynamically, shard and numShards might both be set to -1.
- */
- public abstract void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception;
-
- /**
- * Perform bundle initialization for the case where the file is written unwindowed.
- */
- public abstract void openUnwindowed(String uId,
- int shard,
- int numShards) throws Exception;
-
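- /**
- * Cleans up after a bundle that did not complete, for example by removing partial or
- * temporary output written by this writer.
- */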
- public abstract void cleanup() throws Exception;
-
- /**
- * Called for each value in the bundle.
- */
- public abstract void write(T value) throws Exception;
-
- /**
- * Finishes writing the bundle. Closes any resources used for writing the bundle.
- *
- * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
- * finalization. The result should contain some way to identify the output of this bundle (using
- * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
- * successful writes. See {@link Sink} for more information about bundle ids.
- *
- * @return the writer result
- */
- public abstract WriteT close() throws Exception;
-
- /**
- * Returns the write operation this writer belongs to.
- */
- public abstract WriteOperation<T, WriteT> getWriteOperation();
-
- }
-}
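To make the lifecycle described in the javadoc above concrete, here is an illustrative single-bundle walkthrough. Only the Sink, WriteOperation, and Writer method signatures come from this file; the driver code is a sketch, and a real runner would parallelize bundles, retry failures, and assign unique bundle ids.

import java.util.Collections;
import org.apache.beam.sdk.options.PipelineOptions;

public class SinkLifecycleSketch {
  // Sequential sketch of one successful bundle write.
  static <T, WriteT> void writeOneBundle(
      Sink.WriteOperation<T, WriteT> op, Iterable<T> bundle, PipelineOptions options)
      throws Exception {
    op.initialize(options);                     // must be idempotent
    Sink.Writer<T, WriteT> writer = op.createWriter(options);
    writer.openUnwindowed("bundle-0", -1, -1);  // -1: runner picks sharding dynamically
    for (T element : bundle) {
      writer.write(element);
    }
    WriteT result = writer.close();             // per-bundle result, e.g. a temp filename
    op.finalize(Collections.singletonList(result), options);  // commit successful bundles
  }
}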
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
deleted file mode 100644
index fd05a19..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * {@link UserGroupInformation} helper methods.
- */
-public class UGIHelper {
-
- /**
- * Find the most appropriate UserGroupInformation to use.
- * @param username the user name, or {@code null} if none is specified.
- * @return the most appropriate UserGroupInformation
- */
- public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
- return UserGroupInformation.getBestUGI(null, username);
- }
-
-}
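A short usage sketch for the helper above; UserGroupInformation.doAs is the standard Hadoop mechanism for executing code as the returned identity, and the user name here is made up.

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiHelperSketch {
  public static void main(String[] args) throws Exception {
    // "beam-user" is a made-up principal; passing null instead falls back to the
    // current login user, per the helper's contract above.
    UserGroupInformation ugi = UGIHelper.getBestUGI("beam-user");
    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
      FileSystem fs = FileSystem.get(new Configuration());  // runs as beam-user
      System.out.println(fs.getUri());
      return null;
    });
  }
}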