Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/02 02:54:55 UTC
[1/2] incubator-beam git commit: Several improvements to HDFS/Hadoop interoperability
Repository: incubator-beam
Updated Branches:
refs/heads/master 69b0a48e8 -> c834ecd3d
Several improvements to HDFS/Hadoop interoperability
* handle NullWritable in WritableCoder
* update Function handling in HDFSFileSource#splitIntoBundles
* add AvroHDFSFileSource
* add HDFSFileSink
* add SimpleAuth HDFS IO
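For reference, a minimal usage sketch of the new pieces follows. It is not part of the commit: the HdfsIoSketch wrapper class, the Example record schema, and the hdfs:// paths are made up for illustration, and it assumes the Read.from/Write.to transforms available in the SDK at this revision.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.io.hdfs.AvroHDFSFileSource;
import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
import org.apache.beam.sdk.io.hdfs.WritableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class HdfsIoSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read Avro records resident in HDFS with the new AvroHDFSFileSource.
    // AvroCoder.of(schema) yields an AvroCoder<GenericRecord> for the given schema.
    Schema schema = SchemaBuilder.record("Example").fields().requiredString("word").endRecord();
    AvroHDFSFileSource<GenericRecord> avroSource =
        new AvroHDFSFileSource<>("hdfs://namenode/input/*.avro", AvroCoder.of(schema));
    PCollection<KV<AvroKey<GenericRecord>, NullWritable>> records =
        pipeline.apply(Read.from(avroSource));
    // (records would normally feed further transforms)

    // Write Hadoop key/value pairs with the new HDFSFileSink. The unchecked cast
    // mirrors the raw-Class handling used inside the commit itself.
    @SuppressWarnings("unchecked")
    Class<? extends FileOutputFormat<Text, IntWritable>> outputFormat =
        (Class<? extends FileOutputFormat<Text, IntWritable>>)
            (Class<?>) SequenceFileOutputFormat.class;
    PCollection<KV<Text, IntWritable>> pairs = pipeline.apply(
        Create.of(KV.of(new Text("beam"), new IntWritable(1)))
            .withCoder(KvCoder.of(WritableCoder.of(Text.class),
                                  WritableCoder.of(IntWritable.class))));
    HDFSFileSink<Text, IntWritable> sink =
        new HDFSFileSink<>("hdfs://namenode/output", outputFormat);
    pairs.apply(Write.to(sink));

    pipeline.run();
  }
}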
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ed7dbb07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ed7dbb07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ed7dbb07
Branch: refs/heads/master
Commit: ed7dbb07f41b3969c28a6096a385005ee3a1ff7f
Parents: 69b0a48
Author: Neville Li <ne...@spotify.com>
Authored: Mon Jun 27 15:51:17 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 1 19:54:06 2016 -0700
----------------------------------------------------------------------
sdks/java/io/hdfs/pom.xml | 24 ++
.../beam/sdk/io/hdfs/AvroHDFSFileSource.java | 145 ++++++++++
.../beam/sdk/io/hdfs/AvroWrapperCoder.java | 116 ++++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 277 +++++++++++++++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 40 +--
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 9 +-
.../SimpleAuthAvroHDFSFileSource.java | 84 ++++++
.../hdfs/simpleauth/SimpleAuthHDFSFileSink.java | 132 +++++++++
.../simpleauth/SimpleAuthHDFSFileSource.java | 122 ++++++++
.../beam/sdk/io/hdfs/AvroWrapperCoderTest.java | 52 ++++
.../beam/sdk/io/hdfs/WritableCoderTest.java | 9 +
11 files changed, 989 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 2e427b1..42175d5 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -82,6 +82,30 @@
</dependency>
<dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>${avro.version}</version>
+ <classifier>hadoop2</classifier>
+ <exclusions>
+ <!-- exclude old Jetty version of servlet API -->
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.0</version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
new file mode 100644
index 0000000..9dc926b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroHDFSFileSource.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * A {@code BoundedSource} for reading Avro files resident in a Hadoop filesystem.
+ *
+ * @param <T> The type of the Avro records to be read from the source.
+ */
+public class AvroHDFSFileSource<T> extends HDFSFileSource<AvroKey<T>, NullWritable> {
+ private static final long serialVersionUID = 0L;
+
+ protected final AvroCoder<T> avroCoder;
+ private final String schemaStr;
+
+ public AvroHDFSFileSource(String filepattern, AvroCoder<T> avroCoder) {
+ this(filepattern, avroCoder, null);
+ }
+
+ public AvroHDFSFileSource(String filepattern,
+ AvroCoder<T> avroCoder,
+ HDFSFileSource.SerializableSplit serializableSplit) {
+ super(filepattern,
+ ClassUtil.<AvroKeyInputFormat<T>>castClass(AvroKeyInputFormat.class),
+ ClassUtil.<AvroKey<T>>castClass(AvroKey.class),
+ NullWritable.class, serializableSplit);
+ this.avroCoder = avroCoder;
+ this.schemaStr = avroCoder.getSchema().toString();
+ }
+
+ @Override
+ public Coder<KV<AvroKey<T>, NullWritable>> getDefaultOutputCoder() {
+ AvroWrapperCoder<AvroKey<T>, T> keyCoder = AvroWrapperCoder.of(this.getKeyClass(), avroCoder);
+ return KvCoder.of(keyCoder, WritableCoder.of(NullWritable.class));
+ }
+
+ @Override
+ public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ if (serializableSplit == null) {
+ return Lists.transform(computeSplits(desiredBundleSizeBytes),
+ new Function<InputSplit, AvroHDFSFileSource<T>>() {
+ @Override
+ public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
+ return new AvroHDFSFileSource<>(
+ filepattern, avroCoder, new SerializableSplit(inputSplit));
+ }
+ });
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+
+ @Override
+ public BoundedReader<KV<AvroKey<T>, NullWritable>> createReader(PipelineOptions options)
+ throws IOException {
+ this.validate();
+
+ Schema schema = new Schema.Parser().parse(schemaStr);
+ if (serializableSplit == null) {
+ return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema);
+ } else {
+ return new AvroHDFSFileReader<>(this, filepattern, formatClass, schema,
+ serializableSplit.getSplit());
+ }
+ }
+
+ static class AvroHDFSFileReader<T> extends HDFSFileReader<AvroKey<T>, NullWritable> {
+ public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
+ String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ Schema schema) throws IOException {
+ this(source, filepattern, formatClass, schema, null);
+ }
+
+ public AvroHDFSFileReader(BoundedSource<KV<AvroKey<T>, NullWritable>> source,
+ String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ Schema schema, InputSplit split) throws IOException {
+ super(source, filepattern, formatClass, split);
+ AvroJob.setInputKeySchema(job, schema);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected KV<AvroKey<T>, NullWritable> nextPair() throws IOException, InterruptedException {
+ AvroKey<T> key = currentReader.getCurrentKey();
+ NullWritable value = currentReader.getCurrentValue();
+
+ // clone the record to work around identical element issue due to object reuse
+ Coder<T> avroCoder = ((AvroHDFSFileSource<T>) this.getCurrentSource()).avroCoder;
+ key = new AvroKey(CoderUtils.clone(avroCoder, key.datum()));
+
+ return KV.of(key, value);
+ }
+
+ }
+
+ static class ClassUtil {
+ @SuppressWarnings("unchecked")
+ static <T> Class<T> castClass(Class<?> aClass) {
+ return (Class<T>) aClass;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
new file mode 100644
index 0000000..a831afe
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.PropertyNames;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.avro.mapred.AvroWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * An {@code AvroWrapperCoder} is a {@link Coder} for a Java class that implements {@link
+ * AvroWrapper}.
+ *
+ * @param <WrapperT> the type of the wrapper
+ * @param <DatumT> the type of the datum
+ */
+public class AvroWrapperCoder<WrapperT extends AvroWrapper<DatumT>, DatumT>
+ extends StandardCoder<WrapperT> {
+ private static final long serialVersionUID = 0L;
+
+ private final Class<WrapperT> wrapperType;
+ private final AvroCoder<DatumT> datumCoder;
+
+ private AvroWrapperCoder(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
+ this.wrapperType = wrapperType;
+ this.datumCoder = datumCoder;
+ }
+
+ /**
+ * Returns an {@code AvroWrapperCoder} instance for the provided element class.
+ * @param <WrapperT> the type of the wrapper
+ * @param <DatumT> the type of the datum
+ */
+ public static <WrapperT extends AvroWrapper<DatumT>, DatumT>
+ AvroWrapperCoder<WrapperT, DatumT> of(Class<WrapperT> wrapperType, AvroCoder<DatumT> datumCoder) {
+ return new AvroWrapperCoder<>(wrapperType, datumCoder);
+ }
+
+ @JsonCreator
+ @SuppressWarnings("unchecked")
+ public static AvroWrapperCoder<?, ?> of(
+ @JsonProperty("wrapperType") String wrapperType,
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components)
+ throws ClassNotFoundException {
+ Class<?> clazz = Class.forName(wrapperType);
+ if (!AvroWrapper.class.isAssignableFrom(clazz)) {
+ throw new ClassNotFoundException(
+ "Class " + wrapperType + " does not implement AvroWrapper");
+ }
+ checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
+ return of((Class<? extends AvroWrapper>) clazz, (AvroCoder<?>) components.get(0));
+ }
+
+ @Override
+ public void encode(WrapperT value, OutputStream outStream, Context context) throws IOException {
+ datumCoder.encode(value.datum(), outStream, context);
+ }
+
+ @Override
+ public WrapperT decode(InputStream inStream, Context context) throws IOException {
+ try {
+ WrapperT wrapper = wrapperType.newInstance();
+ wrapper.datum(datumCoder.decode(inStream, context));
+ return wrapper;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new CoderException("unable to deserialize record", e);
+ }
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.singletonList(datumCoder);
+ }
+
+ @Override
+ public CloudObject asCloudObject() {
+ CloudObject result = super.asCloudObject();
+ result.put("wrapperType", wrapperType.getName());
+ return result;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ datumCoder.verifyDeterministic();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
new file mode 100644
index 0000000..688447a
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.api.client.util.Maps;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
+ * format.
+ *
+ * @param <K> The type of keys to be written to the sink.
+ * @param <V> The type of values to be written to the sink.
+ */
+public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
+
+ private static final JobID jobId = new JobID(
+ Long.toString(System.currentTimeMillis()),
+ new Random().nextInt(Integer.MAX_VALUE));
+
+ protected final String path;
+ protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+ // workaround to make Configuration serializable
+ private final Map<String, String> map;
+
+ public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> formatClass) {
+ this.path = path;
+ this.formatClass = formatClass;
+ this.map = Maps.newHashMap();
+ }
+
+ public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> formatClass,
+ Configuration conf) {
+ this(path, formatClass);
+ // serialize conf to map
+ for (Map.Entry<String, String> entry : conf) {
+ map.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ try {
+ Job job = jobInstance();
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ checkState(!fs.exists(new Path(path)), "Output path " + path + " already exists");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Sink.WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
+ return new HDFSWriteOperation<>(this, path, formatClass);
+ }
+
+ private Job jobInstance() throws IOException {
+ Job job = Job.getInstance();
+ // deserialize map to conf
+ Configuration conf = job.getConfiguration();
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ job.setJobID(jobId);
+ return job;
+ }
+
+ // =======================================================================
+ // WriteOperation
+ // =======================================================================
+
+ /** {@link WriteOperation} for HDFS. */
+ public static class HDFSWriteOperation<K, V> extends WriteOperation<KV<K, V>, String> {
+
+ private final Sink<KV<K, V>> sink;
+ protected final String path;
+ protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+ public HDFSWriteOperation(Sink<KV<K, V>> sink,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass) {
+ this.sink = sink;
+ this.path = path;
+ this.formatClass = formatClass;
+ }
+
+ @Override
+ public void initialize(PipelineOptions options) throws Exception {
+ Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
+ FileOutputFormat.setOutputPath(job, new Path(path));
+ }
+
+ @Override
+ public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
+ Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+
+ // If there are 0 output shards, just create output folder.
+ if (!writerResults.iterator().hasNext()) {
+ fs.mkdirs(new Path(path));
+ return;
+ }
+
+ // job successful
+ JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+ FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
+ outputCommitter.commitJob(context);
+
+ // get actual output shards
+ Set<String> actual = Sets.newHashSet();
+ FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ String name = path.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ });
+
+ // get expected output shards
+ Set<String> expected = Sets.newHashSet(writerResults);
+ checkState(
+ expected.size() == Lists.newArrayList(writerResults).size(),
+ "Data loss due to writer results hash collision");
+ for (FileStatus s : statuses) {
+ String name = s.getPath().getName();
+ int pos = name.indexOf('.');
+ actual.add(pos > 0 ? name.substring(0, pos) : name);
+ }
+
+ checkState(actual.equals(expected), "Writer results and output files do not match");
+
+ // rename output shards to Hadoop-style names, e.g. part-r-00000.txt
+ int i = 0;
+ for (FileStatus s : statuses) {
+ String name = s.getPath().getName();
+ int pos = name.indexOf('.');
+ String ext = pos > 0 ? name.substring(pos) : "";
+ fs.rename(
+ s.getPath(),
+ new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
+ i++;
+ }
+ }
+
+ @Override
+ public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
+ return new HDFSWriter<>(this, path, formatClass);
+ }
+
+ @Override
+ public Sink<KV<K, V>> getSink() {
+ return sink;
+ }
+
+ @Override
+ public Coder<String> getWriterResultCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ }
+
+ // =======================================================================
+ // Writer
+ // =======================================================================
+
+ /** {@link Writer} for HDFS files. */
+ public static class HDFSWriter<K, V> extends Writer<KV<K, V>, String> {
+
+ private final HDFSWriteOperation<K, V> writeOperation;
+ private final String path;
+ private final Class<? extends FileOutputFormat<K, V>> formatClass;
+
+ // unique hash for each task
+ private int hash;
+
+ private TaskAttemptContext context;
+ private RecordWriter<K, V> recordWriter;
+ private FileOutputCommitter outputCommitter;
+
+ public HDFSWriter(HDFSWriteOperation<K, V> writeOperation,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass) {
+ this.writeOperation = writeOperation;
+ this.path = path;
+ this.formatClass = formatClass;
+ }
+
+ @Override
+ public void open(String uId) throws Exception {
+ this.hash = uId.hashCode();
+
+ Job job = ((HDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
+ FileOutputFormat.setOutputPath(job, new Path(path));
+
+ // Each Writer is responsible for writing one bundle of elements and is represented by one
+ // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
+ // handles retrying of failed bundles, each task has one attempt only.
+ JobID jobId = job.getJobID();
+ TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
+ context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
+
+ FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
+ recordWriter = outputFormat.getRecordWriter(context);
+ outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
+ }
+
+ @Override
+ public void write(KV<K, V> value) throws Exception {
+ recordWriter.write(value.getKey(), value.getValue());
+ }
+
+ @Override
+ public String close() throws Exception {
+ // task/attempt successful
+ recordWriter.close(context);
+ outputCommitter.commitTask(context);
+
+ // result is prefix of the output file name
+ return String.format("part-r-%d", hash);
+ }
+
+ @Override
+ public WriteOperation<KV<K, V>, String> getWriteOperation() {
+ return writeOperation;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 7a0545d..de68565 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -100,11 +100,11 @@ import javax.annotation.Nullable;
public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
private static final long serialVersionUID = 0L;
- private final String filepattern;
- private final Class<? extends FileInputFormat<?, ?>> formatClass;
- private final Class<K> keyClass;
- private final Class<V> valueClass;
- private final SerializableSplit serializableSplit;
+ protected final String filepattern;
+ protected final Class<? extends FileInputFormat<?, ?>> formatClass;
+ protected final Class<K> keyClass;
+ protected final Class<V> valueClass;
+ protected final SerializableSplit serializableSplit;
/**
* Creates a {@code Read} transform that will read from an {@code HDFSFileSource}
@@ -133,9 +133,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
/**
* Create a {@code HDFSFileSource} based on a file or a file pattern specification.
*/
- private HDFSFileSource(String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
- Class<V> valueClass) {
+ protected HDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+ Class<V> valueClass) {
this(filepattern, formatClass, keyClass, valueClass, null);
}
@@ -143,9 +143,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
* Create a {@code HDFSFileSource} based on a single Hadoop input split, which won't be
* split up further.
*/
- private HDFSFileSource(String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
- Class<V> valueClass, SerializableSplit serializableSplit) {
+ protected HDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass, Class<K> keyClass,
+ Class<V> valueClass, SerializableSplit serializableSplit) {
this.filepattern = filepattern;
this.formatClass = formatClass;
this.keyClass = keyClass;
@@ -183,9 +183,9 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
if (serializableSplit == null) {
return Lists.transform(computeSplits(desiredBundleSizeBytes),
new Function<InputSplit, BoundedSource<KV<K, V>>>() {
- @Nullable @Override
+ @Override
public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
- return new HDFSFileSource<K, V>(filepattern, formatClass, keyClass,
+ return new HDFSFileSource<>(filepattern, formatClass, keyClass,
valueClass, new SerializableSplit(inputSplit));
}
});
@@ -201,7 +201,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
return formatClass.newInstance();
}
- private List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
+ protected List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws IOException,
IllegalAccessException, InstantiationException {
Job job = Job.getInstance();
FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
@@ -276,13 +276,14 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
private final BoundedSource<KV<K, V>> source;
private final String filepattern;
private final Class formatClass;
+ protected Job job;
private FileInputFormat<?, ?> format;
private TaskAttemptContext attemptContext;
private List<InputSplit> splits;
private ListIterator<InputSplit> splitsIterator;
private Configuration conf;
- private RecordReader<K, V> currentReader;
+ protected RecordReader<K, V> currentReader;
private KV<K, V> currentPair;
private volatile boolean done = false;
@@ -290,7 +291,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
* Create a {@code HDFSFileReader} based on a file or a file pattern specification.
*/
public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass) {
+ Class<? extends FileInputFormat<?, ?>> formatClass) throws IOException {
this(source, filepattern, formatClass, null);
}
@@ -298,7 +299,8 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
* Create a {@code HDFSFileReader} based on a single Hadoop input split.
*/
public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
- Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split) {
+ Class<? extends FileInputFormat<?, ?>> formatClass, InputSplit split)
+ throws IOException {
this.source = source;
this.filepattern = filepattern;
this.formatClass = formatClass;
@@ -306,11 +308,11 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
this.splits = ImmutableList.of(split);
this.splitsIterator = splits.listIterator();
}
+ this.job = Job.getInstance(); // new instance
}
@Override
public boolean start() throws IOException {
- Job job = Job.getInstance(); // new instance
Path path = new Path(filepattern);
FileInputFormat.addInputPath(job, path);
@@ -369,7 +371,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
}
@SuppressWarnings("unchecked")
- private KV<K, V> nextPair() throws IOException, InterruptedException {
+ protected KV<K, V> nextPair() throws IOException, InterruptedException {
K key = currentReader.getCurrentKey();
V value = currentReader.getCurrentValue();
// clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 814a762..4e913ed 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.util.CloudObject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -32,8 +33,7 @@ import java.io.OutputStream;
import java.util.List;
/**
- * A {@code WritableCoder} is a {@link org.apache.beam.sdk.coders.Coder} for a
- * Java class that implements {@link org.apache.hadoop.io.Writable}.
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
*
* <p> To use, specify the coder type on a PCollection:
* <pre>
@@ -79,9 +79,14 @@ public class WritableCoder<T extends Writable> extends StandardCoder<T> {
value.write(new DataOutputStream(outStream));
}
+ @SuppressWarnings("unchecked")
@Override
public T decode(InputStream inStream, Context context) throws IOException {
try {
+ if (type == NullWritable.class) {
+ // NullWritable has no default constructor
+ return (T) NullWritable.get();
+ }
T t = type.newInstance();
t.readFields(new DataInputStream(inStream));
return t;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
new file mode 100644
index 0000000..5dd9673
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthAvroHDFSFileSource.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.io.hdfs.AvroHDFSFileSource;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Source for Avro files on Hadoop/HDFS with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user, which is used for reading Avro from HDFS.
+ */
+public class SimpleAuthAvroHDFSFileSource<T> extends AvroHDFSFileSource<T> {
+ // keep this field to pass Hadoop user between workers
+ private final String username;
+
+ /**
+ * Create a {@code SimpleAuthAvroHDFSFileSource} based on a file or a file pattern specification.
+ * @param username HDFS username.
+ */
+ public SimpleAuthAvroHDFSFileSource(String filepattern,
+ AvroCoder<T> avroCoder,
+ String username) {
+ super(filepattern, avroCoder);
+ this.username = username;
+ }
+
+ /**
+ * Create a {@code SimpleAuthAvroHDFSFileSource} based on a single Hadoop input split, which won't
+ * be split up further.
+ * @param username HDFS username.
+ */
+ public SimpleAuthAvroHDFSFileSource(String filepattern,
+ AvroCoder<T> avroCoder,
+ HDFSFileSource.SerializableSplit serializableSplit,
+ String username) {
+ super(filepattern, avroCoder, serializableSplit);
+ this.username = username;
+ }
+
+ @Override
+ public List<? extends AvroHDFSFileSource<T>> splitIntoBundles(long desiredBundleSizeBytes,
+ PipelineOptions options)
+ throws Exception {
+ if (serializableSplit == null) {
+ return Lists.transform(computeSplits(desiredBundleSizeBytes),
+ new Function<InputSplit, AvroHDFSFileSource<T>>() {
+ @Override
+ public AvroHDFSFileSource<T> apply(@Nullable InputSplit inputSplit) {
+ return new SimpleAuthAvroHDFSFileSource<>(
+ filepattern, avroCoder, new HDFSFileSource.SerializableSplit(inputSplit),
+ username);
+ }
+ });
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
new file mode 100644
index 0000000..d0fd8b6
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSink.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop file-based output
+ * format with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user, which is used for writing to HDFS.
+ *
+ * @param <K> The type of keys to be written to the sink.
+ * @param <V> The type of values to be written to the sink.
+ */
+public class SimpleAuthHDFSFileSink<K, V> extends HDFSFileSink<K, V> {
+ private final String username;
+
+ public SimpleAuthHDFSFileSink(String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass,
+ Configuration conf,
+ String username) {
+ super(path, formatClass, conf);
+ this.username = username;
+ }
+
+ @Override
+ public WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions options) {
+ return new SimpleAuthHDFSWriteOperation<>(this, path, formatClass, username);
+ }
+
+ /** {@link WriteOperation} for HDFS with Simple Authentication. */
+ public static class SimpleAuthHDFSWriteOperation<K, V> extends HDFSWriteOperation<K, V> {
+ private final String username;
+
+ SimpleAuthHDFSWriteOperation(Sink<KV<K, V>> sink,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass,
+ String username) {
+ super(sink, path, formatClass);
+ this.username = username;
+ }
+
+ @Override
+ public void finalize(final Iterable<String> writerResults, final PipelineOptions options)
+ throws Exception {
+ UserGroupInformation.createRemoteUser(username).doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ superFinalize(writerResults, options);
+ return null;
+ }
+ });
+ }
+
+ private void superFinalize(Iterable<String> writerResults, PipelineOptions options)
+ throws Exception {
+ super.finalize(writerResults, options);
+ }
+
+ @Override
+ public Writer<KV<K, V>, String> createWriter(PipelineOptions options) throws Exception {
+ return new SimpleAuthHDFSWriter<>(this, path, formatClass, username);
+ }
+ }
+
+ /** {@link Writer} for HDFS files with Simple Authentication. */
+ public static class SimpleAuthHDFSWriter<K, V> extends HDFSWriter<K, V> {
+ private final UserGroupInformation ugi;
+
+ public SimpleAuthHDFSWriter(SimpleAuthHDFSWriteOperation<K, V> writeOperation,
+ String path,
+ Class<? extends FileOutputFormat<K, V>> formatClass,
+ String username) {
+ super(writeOperation, path, formatClass);
+ ugi = UserGroupInformation.createRemoteUser(username);
+ }
+
+ @Override
+ public void open(final String uId) throws Exception {
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ superOpen(uId);
+ return null;
+ }
+ });
+ }
+
+ private void superOpen(String uId) throws Exception {
+ super.open(uId);
+ }
+
+ @Override
+ public String close() throws Exception {
+ return ugi.doAs(new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ return superClose();
+ }
+ });
+ }
+
+ private String superClose() throws Exception {
+ return super.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
new file mode 100644
index 0000000..5b768fc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/simpleauth/SimpleAuthHDFSFileSource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs.simpleauth;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Source for Hadoop/HDFS with Simple Authentication.
+ *
+ * Allows setting an arbitrary username as the HDFS user, which is used for reading from HDFS.
+ */
+public class SimpleAuthHDFSFileSource<K, V> extends HDFSFileSource<K, V> {
+ private final String username;
+ /**
+ * Create a {@code SimpleAuthHDFSFileSource} based on a single Hadoop input split, which won't be
+ * split up further.
+ * @param username HDFS username.
+ */
+ protected SimpleAuthHDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ HDFSFileSource.SerializableSplit serializableSplit,
+ String username) {
+ super(filepattern, formatClass, keyClass, valueClass, serializableSplit);
+ this.username = username;
+ }
+
+ /**
+ * Create a {@code SimpleAuthHDFSFileSource} based on a file or a file pattern specification.
+ * @param username HDFS username.
+ */
+ protected SimpleAuthHDFSFileSource(String filepattern,
+ Class<? extends FileInputFormat<?, ?>> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ String username) {
+ super(filepattern, formatClass, keyClass, valueClass);
+ this.username = username;
+ }
+
+ /**
+ * Creates a {@code Read} transform that will read from a {@code SimpleAuthHDFSFileSource}
+ * with the given file name or pattern ("glob") using the given Hadoop {@link FileInputFormat},
+ * with key-value types specified
+ * by the given key class and value class.
+ * @param username HDFS username.
+ */
+ public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> readFrom(
+ String filepattern,
+ Class<T> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ String username) {
+ return Read.from(from(filepattern, formatClass, keyClass, valueClass, username));
+ }
+
+ /**
+ * Creates a {@code SimpleAuthHDFSFileSource} that reads from the given file name or pattern
+ * ("glob") using the given Hadoop {@link FileInputFormat}, with key-value types specified by the
+ * given key class and value class.
+ * @param username HDFS username.
+ */
+ public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> from(
+ String filepattern,
+ Class<T> formatClass,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ String username) {
+ @SuppressWarnings("unchecked")
+ HDFSFileSource<K, V> source = (HDFSFileSource<K, V>)
+ new SimpleAuthHDFSFileSource(filepattern, formatClass, keyClass, valueClass, username);
+ return source;
+ }
+
+ @Override
+ public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ if (serializableSplit == null) {
+ return Lists.transform(computeSplits(desiredBundleSizeBytes),
+ new Function<InputSplit, BoundedSource<KV<K, V>>>() {
+ @Nullable
+ @Override
+ public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
+ return new SimpleAuthHDFSFileSource<>(filepattern, formatClass, keyClass,
+ valueClass, new HDFSFileSource.SerializableSplit(inputSplit),
+ username);
+ }
+ });
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
new file mode 100644
index 0000000..85cbd46
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/AvroWrapperCoderTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.junit.Test;
+
+/**
+ * Tests for AvroWrapperCoder.
+ */
+public class AvroWrapperCoderTest {
+
+ @Test
+ public void testAvroKeyEncoding() throws Exception {
+ AvroKey<Integer> value = new AvroKey<>(42);
+ AvroWrapperCoder<AvroKey<Integer>, Integer> coder = AvroWrapperCoder.of(
+ AvroHDFSFileSource.ClassUtil.<AvroKey<Integer>>castClass(AvroKey.class),
+ AvroCoder.of(Integer.class));
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
+
+ @Test
+ public void testAvroValueEncoding() throws Exception {
+ AvroValue<Integer> value = new AvroValue<>(42);
+ AvroWrapperCoder<AvroValue<Integer>, Integer> coder = AvroWrapperCoder.of(
+ AvroHDFSFileSource.ClassUtil.<AvroValue<Integer>>castClass(AvroValue.class),
+ AvroCoder.of(Integer.class));
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ed7dbb07/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
index 715da8e..ac32c33 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.hdfs;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.junit.Test;
/**
@@ -34,4 +35,12 @@ public class WritableCoderTest {
CoderProperties.coderDecodeEncodeEqual(coder, value);
}
+
+ @Test
+ public void testNullWritableEncoding() throws Exception {
+ NullWritable value = NullWritable.get();
+ WritableCoder<NullWritable> coder = WritableCoder.of(NullWritable.class);
+
+ CoderProperties.coderDecodeEncodeEqual(coder, value);
+ }
}
[2/2] incubator-beam git commit: Closes #485
Posted by dh...@apache.org.
Closes #485
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c834ecd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c834ecd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c834ecd3
Branch: refs/heads/master
Commit: c834ecd3dc5245f661f840b390b2135a11b3bc7a
Parents: 69b0a48 ed7dbb0
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 1 19:54:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 1 19:54:39 2016 -0700
----------------------------------------------------------------------
sdks/java/io/hdfs/pom.xml | 24 ++
.../beam/sdk/io/hdfs/AvroHDFSFileSource.java | 145 ++++++++++
.../beam/sdk/io/hdfs/AvroWrapperCoder.java | 116 ++++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 277 +++++++++++++++++++
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 40 +--
.../apache/beam/sdk/io/hdfs/WritableCoder.java | 9 +-
.../SimpleAuthAvroHDFSFileSource.java | 84 ++++++
.../hdfs/simpleauth/SimpleAuthHDFSFileSink.java | 132 +++++++++
.../simpleauth/SimpleAuthHDFSFileSource.java | 122 ++++++++
.../beam/sdk/io/hdfs/AvroWrapperCoderTest.java | 52 ++++
.../beam/sdk/io/hdfs/WritableCoderTest.java | 9 +
11 files changed, 989 insertions(+), 21 deletions(-)
----------------------------------------------------------------------