Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:28 UTC
[03/50] [abbrv] beam git commit: [BEAM-2135] Move hdfs to hadoop-file-system
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
new file mode 100644
index 0000000..fe2db5f
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+
+/**
+ * This class is deprecated, and only exists for HDFSFileSink.
+ */
+@Deprecated
+public abstract class Sink<T> implements Serializable, HasDisplayData {
+ /**
+ * Ensures that the sink is valid and can be written to before the write operation begins.
+ * Implementations should typically use {@link com.google.common.base.Preconditions}.
+ */
+ public abstract void validate(PipelineOptions options);
+
+ /**
+ * Returns an instance of a {@link WriteOperation} that can write to this Sink.
+ */
+ public abstract WriteOperation<T, ?> createWriteOperation();
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>By default, does not register any display data. Implementors may override this method
+ * to provide their own display data.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {}
+
+ /**
+ * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
+ *
+ * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
+ * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
+ * a bundle to the sink.
+ *
+ * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
+ * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
+ *
+ * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
+ * call to the {@code initialize} method and deserialized before calls to
+ * {@code createWriter} and {@code finalize}. However, it is not
+ * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
+ * state of the {@code WriteOperation}.
+ *
+ * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
+ *
+ * @param <T> The type of objects to write
+ * @param <WriteT> The result of a per-bundle write
+ */
+ public abstract static class WriteOperation<T, WriteT> implements Serializable {
+ /**
+ * Performs initialization before writing to the sink. Called before writing begins.
+ */
+ public abstract void initialize(PipelineOptions options) throws Exception;
+
+ /**
+ * Indicates whether the operation will be performing windowed writes.
+ */
+ public abstract void setWindowedWrites(boolean windowedWrites);
+
+ /**
+ * Given an Iterable of results from bundle writes, performs finalization after writing and
+ * closes the sink. Called after all bundle writes are complete.
+ *
+ * <p>The results that are passed to finalize are those returned by bundles that completed
+ * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
+ * one writer result will be passed to finalize for each bundle. An implementation of finalize
+ * should clean up the output of any failed and successfully retried bundles. Note that these
+ * failed bundles will not have their writer result passed to finalize, so finalize should be
+ * capable of locating any temporary/partial output written by failed bundles.
+ *
+ * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
+ * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
+ * failure/retry or for redundancy.
+ *
+ * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
+ * finalize is called multiple times.
+ *
+ * @param writerResults an Iterable of results from successful bundle writes.
+ */
+ public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
+ throws Exception;
+
+ /**
+ * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
+ *
+ * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
+ * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
+ *
+ * <p>Must not mutate the state of the WriteOperation.
+ */
+ public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
+
+ /**
+ * Returns the Sink that this write operation writes to.
+ */
+ public abstract Sink<T> getSink();
+
+ /**
+ * Returns a coder for the writer result type.
+ */
+ public abstract Coder<WriteT> getWriterResultCoder();
+ }
+
+ /**
+ * A Writer writes a bundle of elements from a PCollection to a sink.
+ * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
+ * and {@link Writer#close} is called after all elements in the bundle have been written.
+ * {@link Writer#write} writes an element to the sink.
+ *
+ * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
+ * multiple instances of a Writer may be instantiated in different threads on the same worker.
+ *
+ * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
+ *
+ * @param <T> The type of object to write
+ * @param <WriteT> The writer result type (e.g., the bundle's output filename, as a String)
+ */
+ public abstract static class Writer<T, WriteT> {
+ /**
+ * Performs bundle initialization. For example, creates a temporary file for writing or
+ * initializes any state that will be used across calls to {@link Writer#write}.
+ *
+ * <p>The unique id that is given to open should be used to ensure that the writer's output does
+ * not interfere with the output of other Writers, as a bundle may be executed many times for
+ * fault tolerance. See {@link Sink} for more information about bundle ids.
+ *
+ * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+ * The shard and numShards arguments are populated for the case of static sharding. In cases
+ * where the runner is dynamically picking the sharding, shard and numShards might both be
+ * set to -1.
+ */
+ public abstract void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception;
+
+ /**
+ * Performs bundle initialization for the case where the output is written unwindowed.
+ */
+ public abstract void openUnwindowed(String uId,
+ int shard,
+ int numShards) throws Exception;
+
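+ /**
+ * Cleans up any temporary or partial output written by this writer. Called when a bundle
+ * fails, so partial output from failed bundles does not leak into the final result.
+ */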
+ public abstract void cleanup() throws Exception;
+
+ /**
+ * Called for each value in the bundle.
+ */
+ public abstract void write(T value) throws Exception;
+
+ /**
+ * Finishes writing the bundle. Closes any resources used for writing the bundle.
+ *
+ * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
+ * finalization. The result should contain some way to identify the output of this bundle (using
+ * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
+ * successful writes. See {@link Sink} for more information about bundle ids.
+ *
+ * @return the writer result
+ */
+ public abstract WriteT close() throws Exception;
+
+ /**
+ * Returns the write operation this writer belongs to.
+ */
+ public abstract WriteOperation<T, WriteT> getWriteOperation();
+
+ }
+}
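The lifecycle this deprecated API expects (validate, createWriteOperation, initialize,
createWriter, open/write/close per bundle, finalize) can be sketched with a hypothetical
in-memory sink. This is an illustration only, with invented names; real callers should go
through HDFSFileSink, which this class exists to support:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;

    // Hypothetical minimal Sink: each bundle buffers lines in memory and reports
    // "<bundleId>:<count>" as its writer result; finalize would commit those results.
    class InMemorySink extends Sink<String> {
      @Override
      public void validate(PipelineOptions options) {}  // nothing to check

      @Override
      public WriteOperation<String, String> createWriteOperation() {
        return new Op(this);
      }

      static class Op extends WriteOperation<String, String> {
        private final InMemorySink sink;

        Op(InMemorySink sink) { this.sink = sink; }

        @Override
        public void initialize(PipelineOptions options) {}  // must be idempotent

        @Override
        public void setWindowedWrites(boolean windowedWrites) {}

        @Override
        public void finalize(Iterable<String> writerResults, PipelineOptions options) {
          // A real sink would commit/rename the per-bundle outputs identified here,
          // and also locate and remove output left behind by failed bundles.
          for (String result : writerResults) {
            System.out.println("committing " + result);
          }
        }

        @Override
        public Writer<String, String> createWriter(PipelineOptions options) {
          return new BundleWriter(this);  // must not mutate this WriteOperation
        }

        @Override
        public Sink<String> getSink() { return sink; }

        @Override
        public Coder<String> getWriterResultCoder() { return StringUtf8Coder.of(); }
      }

      static class BundleWriter extends Writer<String, String> {
        private final Op op;
        private String uId;
        private final List<String> lines = new ArrayList<>();

        BundleWriter(Op op) { this.op = op; }

        @Override
        public void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo,
            int shard, int numShards) {
          this.uId = uId;  // the unique bundle id keeps retried bundles from colliding
        }

        @Override
        public void openUnwindowed(String uId, int shard, int numShards) {
          this.uId = uId;
        }

        @Override
        public void write(String value) { lines.add(value); }

        @Override
        public void cleanup() { lines.clear(); }  // drop partial output

        @Override
        public String close() { return uId + ":" + lines.size(); }  // the writer result

        @Override
        public WriteOperation<String, String> getWriteOperation() { return op; }
      }
    }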
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
new file mode 100644
index 0000000..fd05a19
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * {@link UserGroupInformation} helper methods.
+ */
+public class UGIHelper {
+
+ /**
+ * Finds the most appropriate UserGroupInformation to use.
+ * @param username the user name, or null if none is specified.
+ * @return the most appropriate UserGroupInformation
+ */
+ public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException {
+ return UserGroupInformation.getBestUGI(null, username);
+ }
+
+}
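A usage sketch (the helper method and username handling are hypothetical): the returned UGI
is typically combined with UserGroupInformation#doAs so that file-system calls run as the
resolved user. Passing a null username simply resolves the current user.

    import java.security.PrivilegedExceptionAction;
    import javax.annotation.Nullable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    static FileSystem fileSystemAs(@Nullable String username) throws Exception {
      UserGroupInformation ugi = UGIHelper.getBestUGI(username);
      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          // Executed with the resolved user's credentials.
          return FileSystem.get(new Configuration());
        }
      });
    }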
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
new file mode 100644
index 0000000..86a9246
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
+import org.apache.beam.sdk.io.hdfs.Sink.Writer;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is deprecated, and currently exists only to support HDFSFileSink.
+ */
+@Deprecated
+public class Write<T> extends PTransform<PCollection<T>, PDone> {
+ private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+ private static final int UNKNOWN_SHARDNUM = -1;
+ private static final int UNKNOWN_NUMSHARDS = -1;
+
+ private final Sink<T> sink;
+ // This allows the number of shards to be dynamically computed based on the input
+ // PCollection.
+ @Nullable
+ private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+ // We don't use a side input for static sharding, as we want this value to be updatable
+ // when a pipeline is updated.
+ @Nullable
+ private final ValueProvider<Integer> numShardsProvider;
+ private boolean windowedWrites;
+
+ /**
+ * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
+ * control how many different shards are produced.
+ */
+ public static <T> Write<T> to(Sink<T> sink) {
+ checkNotNull(sink, "sink");
+ return new Write<>(sink, null /* runner-determined sharding */, null, false);
+ }
+
+ private Write(
+ Sink<T> sink,
+ @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+ @Nullable ValueProvider<Integer> numShardsProvider,
+ boolean windowedWrites) {
+ this.sink = sink;
+ this.computeNumShards = computeNumShards;
+ this.numShardsProvider = numShardsProvider;
+ this.windowedWrites = windowedWrites;
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+ "%s can only be applied to an unbounded PCollection if doing windowed writes",
+ Write.class.getSimpleName());
+ return createWrite(input, sink.createWriteOperation());
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ sink.validate(options);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+ .include("sink", sink);
+ if (getSharding() != null) {
+ builder.include("sharding", getSharding());
+ } else if (getNumShards() != null) {
+ String numShards = getNumShards().isAccessible()
+ ? getNumShards().get().toString() : getNumShards().toString();
+ builder.add(DisplayData.item("numShards", numShards)
+ .withLabel("Fixed Number of Shards"));
+ }
+ }
+
+ /**
+ * Returns the {@link Sink} associated with this PTransform.
+ */
+ public Sink<T> getSink() {
+ return sink;
+ }
+
+ /**
+ * Gets the {@link PTransform} that will be used to determine sharding. This can be a
+ * static number of shards (set via {@link #withNumShards(int)}), dynamic (set via
+ * {@link #withSharding(PTransform)}), or runner-determined (set via {@link
+ * #withRunnerDeterminedSharding()}).
+ */
+ @Nullable
+ public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+ return computeNumShards;
+ }
+
+ public ValueProvider<Integer> getNumShards() {
+ return numShardsProvider;
+ }
+
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the
+ * specified number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ *
+ * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+ * runner-determined sharding.
+ */
+ public Write<T> withNumShards(int numShards) {
+ if (numShards > 0) {
+ return withNumShards(StaticValueProvider.of(numShards));
+ }
+ return withRunnerDeterminedSharding();
+ }
+
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the number
+ * of shards specified by the given {@link ValueProvider}.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ */
+ public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+ return new Write<>(sink, null, numShardsProvider, windowedWrites);
+ }
+
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} using the
+ * specified {@link PTransform} to compute the number of shards.
+ *
+ * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+ * more information.
+ */
+ public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+ checkNotNull(
+ sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+ return new Write<>(sink, sharding, null, windowedWrites);
+ }
+
+ /**
+ * Returns a new {@link Write} that will write to the current {@link Sink} with
+ * runner-determined sharding.
+ */
+ public Write<T> withRunnerDeterminedSharding() {
+ return new Write<>(sink, null, null, windowedWrites);
+ }
+
+ /**
+ * Returns a new {@link Write} that preserves windowing on its input.
+ *
+ * <p>If this option is not specified, windowing and triggering are replaced by
+ * {@link GlobalWindows} and {@link DefaultTrigger}.
+ *
+ * <p>If there is no data for a window, no output shards will be generated for that window.
+ * If a window triggers multiple times, an output shard might be generated more than once;
+ * it is up to the sink implementation to keep these output shards unique.
+ *
+ * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+ * positive value.
+ */
+ public Write<T> withWindowedWrites() {
+ return new Write<>(sink, computeNumShards, numShardsProvider, true);
+ }
+
+ /**
+ * Writes all the elements in a bundle using a {@link Writer} produced by the
+ * {@link WriteOperation} associated with the {@link Sink}.
+ */
+ private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+ // Writer that will write the records in this bundle. Lazily
+ // initialized in processElement.
+ private Writer<T, WriteT> writer = null;
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+ WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ this.writeOperationView = writeOperationView;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ // Lazily initialize the Writer
+ if (writer == null) {
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ writer = writeOperation.createWriter(c.getPipelineOptions());
+
+ if (windowedWrites) {
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
+ UNKNOWN_NUMSHARDS);
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ }
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+ }
+ try {
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard write result and close the write.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
+ }
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ WriteT result = writer.close();
+ c.output(result);
+ // Reset state in case of reuse.
+ writer = null;
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Write.this);
+ }
+ }
+
+ /**
+ * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+ * a single iterable.
+ *
+ * @see WriteBundles
+ */
+ private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+ private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+ private final PCollectionView<Integer> numShardsView;
+
+ WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
+ PCollectionView<Integer> numShardsView) {
+ this.writeOperationView = writeOperationView;
+ this.numShardsView = numShardsView;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
+ // In a sharded write, a single input element represents one shard. We can open and close
+ // the writer in each call to processElement.
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ if (windowedWrites) {
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
+ numShards);
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ }
+ LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+ try {
+ try {
+ for (T t : c.element().getValue()) {
+ writer.write(t);
+ }
+ } catch (Exception e) {
+ try {
+ writer.close();
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
+ }
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
+ }
+
+ // Close the writer; if this throws let the error propagate.
+ WriteT result = writer.close();
+ c.output(result);
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
+ throw e;
+ }
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(Write.this);
+ }
+ }
+
+ private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+ private final PCollectionView<Integer> numShardsView;
+ private final ValueProvider<Integer> numShardsProvider;
+ private int shardNumber;
+
+ ApplyShardingKey(PCollectionView<Integer> numShardsView,
+ ValueProvider<Integer> numShardsProvider) {
+ this.numShardsView = numShardsView;
+ this.numShardsProvider = numShardsProvider;
+ shardNumber = UNKNOWN_SHARDNUM;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ int shardCount = 0;
+ if (numShardsView != null) {
+ shardCount = context.sideInput(numShardsView);
+ } else {
+ checkNotNull(numShardsProvider);
+ shardCount = numShardsProvider.get();
+ }
+ checkArgument(
+ shardCount > 0,
+ "Must have a positive number of shards specified for non-runner-determined sharding."
+ + " Got %s",
+ shardCount);
+ if (shardNumber == UNKNOWN_SHARDNUM) {
+ // We want to desynchronize the first record sharding key for each instance of
+ // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+ shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
+ } else {
+ shardNumber = (shardNumber + 1) % shardCount;
+ }
+ context.output(KV.of(shardNumber, context.element()));
+ }
+ }
+
+ /**
+ * A write is performed as sequence of three {@link ParDo}'s.
+ *
+ * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
+ * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
+ * called. The output of this ParDo is a singleton PCollection
+ * containing the WriteOperation.
+ *
+ * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+ * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+ * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. The writer is
+ * opened via {@link Writer#openWindowed} or {@link Writer#openUnwindowed} before any
+ * elements are written, {@link Writer#write} is called for every element in the bundle, and
+ * {@link Writer#close} is called in {@link DoFn#finishBundle}. The output of this ParDo is
+ * a PCollection of <i>writer result</i> objects (see {@link Sink} for a description of
+ * writer results), one for each bundle.
+ *
+ * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
+ * the collection of writer results as a side-input. In this ParDo,
+ * {@link WriteOperation#finalize} is called to finalize the write.
+ *
+ * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+ * before the exception that caused the write to fail is propagated and the write result will be
+ * discarded.
+ *
+ * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
+ * deserialized in the bundle-writing and finalization phases, any state change to the
+ * WriteOperation object that occurs during initialization is visible in the latter phases.
+ * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
+ * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+ * the WriteOperation.
+ */
+ private <WriteT> PDone createWrite(
+ PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
+ Pipeline p = input.getPipeline();
+ writeOperation.setWindowedWrites(windowedWrites);
+
+ // A coder to use for the WriteOperation.
+ @SuppressWarnings("unchecked")
+ Coder<WriteOperation<T, WriteT>> operationCoder =
+ (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
+
+ // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
+ // the sink.
+ PCollection<WriteOperation<T, WriteT>> operationCollection =
+ p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
+
+ // Initialize the resource in a do-once ParDo on the WriteOperation.
+ operationCollection = operationCollection
+ .apply("Initialize", ParDo.of(
+ new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.element();
+ LOG.info("Initializing write operation {}", writeOperation);
+ writeOperation.initialize(c.getPipelineOptions());
+ writeOperation.setWindowedWrites(windowedWrites);
+ LOG.debug("Done initializing write operation {}", writeOperation);
+ // The WriteOperation is also the output of this ParDo, so it can have mutable
+ // state.
+ c.output(writeOperation);
+ }
+ }))
+ .setCoder(operationCoder);
+
+ // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
+ final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
+ operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+
+ if (!windowedWrites) {
+ // Re-window the data into the global window and remove any existing triggers.
+ input =
+ input.apply(
+ Window.<T>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ }
+
+ // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
+ // as a side input) and collect the results of the writes in a PCollection.
+ // There is a dependency between this ParDo and the first (the WriteOperation PCollection
+ // as a side input), so this will happen after the initial ParDo.
+ PCollection<WriteT> results;
+ final PCollectionView<Integer> numShardsView;
+ if (computeNumShards == null && numShardsProvider == null) {
+ if (windowedWrites) {
+ throw new IllegalStateException("When doing windowed writes, numShards must be set"
+ + "explicitly to a positive value");
+ }
+ numShardsView = null;
+ results = input
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles<>(writeOperationView))
+ .withSideInputs(writeOperationView));
+ } else {
+ if (computeNumShards != null) {
+ numShardsView = input.apply(computeNumShards);
+ results = input
+ .apply("ApplyShardLabel", ParDo.of(
+ new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
+ .withSideInputs(numShardsView, writeOperationView));
+ } else {
+ numShardsView = null;
+ results = input
+ .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
+ .withSideInputs(writeOperationView));
+ }
+ }
+ results.setCoder(writeOperation.getWriterResultCoder());
+
+ if (windowedWrites) {
+ // When processing streaming windowed writes, results will arrive multiple times. This
+ // means we can't share the below implementation that turns the results into a side input,
+ // as new data arriving into a side input does not trigger the listening DoFn. Instead
+ // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
+ // whenever new data arrives.
+ PCollection<KV<Void, WriteT>> keyedResults =
+ results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
+ keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
+ .getWriterResultCoder()));
+
+ // Is the continuation trigger sufficient?
+ keyedResults
+ .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
+ .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.element().getValue());
+ writeOperation.finalize(results, c.getPipelineOptions());
+ LOG.debug("Done finalizing write operation {}", writeOperation);
+ }
+ }).withSideInputs(writeOperationView));
+ } else {
+ final PCollectionView<Iterable<WriteT>> resultsView =
+ results.apply(View.<WriteT>asIterable());
+ ImmutableList.Builder<PCollectionView<?>> sideInputs =
+ ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+ if (numShardsView != null) {
+ sideInputs.add(numShardsView);
+ }
+
+ // Finalize the write in another do-once ParDo on the singleton collection containing the
+ // Writer. The results from the per-bundle writes are given as an Iterable side input.
+ // The WriteOperation's state is the same as after its initialization in the first do-once
+ // ParDo. There is a dependency between this ParDo and the parallel write (the writer
+ // results collection as a side input), so it will happen after the parallel write.
+ // For the non-windowed case, we guarantee that if no data is written but the user has
+ // set numShards, then all shards will be written out as empty files. For this reason we
+ // use a side input here.
+ operationCollection
+ .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.element();
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard, and honor user-specified numShards if
+ // set.
+ int minShardsNeeded;
+ if (numShardsView != null) {
+ minShardsNeeded = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ minShardsNeeded = numShardsProvider.get();
+ } else {
+ minShardsNeeded = 1;
+ }
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for a total of "
+ + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
+ UNKNOWN_NUMSHARDS);
+ WriteT emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
+ }
+ writeOperation.finalize(results, c.getPipelineOptions());
+ LOG.debug("Done finalizing write operation {}", writeOperation);
+ }
+ }).withSideInputs(sideInputs.build()));
+ }
+ return PDone.in(input.getPipeline());
+ }
+}
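As a usage sketch, the transform is applied like any other PTransform. The output path below
is hypothetical, and HDFSFileSink.toText is assumed to have the shape exercised by the tests
later in this commit:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    public class WriteExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Hypothetical output directory; toText writes the elements as lines of text.
        HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText("/tmp/beam-out");
        p.apply(Create.of("a", "b", "c"))
            .apply("WriteToSink", Write.to(sink).withNumShards(3));
        // Write.to(sink) alone would leave the shard count to the runner.
        p.run().waitUntilFinish();
      }
    }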
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
new file mode 100644
index 0000000..763b30a
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms used to read from and write to the Hadoop file system (HDFS).
+ */
+package org.apache.beam.sdk.io.hdfs;
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
new file mode 100644
index 0000000..9fa6606
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.MoreObjects;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for HDFSFileSink.
+ */
+public class HDFSFileSinkTest {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private final String part0 = "part-r-00000";
+ private final String foobar = "foobar";
+
+ private <T> void doWrite(Sink<T> sink,
+ PipelineOptions options,
+ Iterable<T> toWrite) throws Exception {
+ Sink.WriteOperation<T, String> writeOperation =
+ (Sink.WriteOperation<T, String>) sink.createWriteOperation();
+ Sink.Writer<T, String> writer = writeOperation.createWriter(options);
+ writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
+ for (T t: toWrite) {
+ writer.write(t);
+ }
+ String writeResult = writer.close();
+ writeOperation.finalize(Collections.singletonList(writeResult), options);
+ }
+
+ @Test
+ public void testWriteSingleRecord() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ File file = tmpFolder.newFolder();
+
+ HDFSFileSink<String, NullWritable, Text> sink =
+ HDFSFileSink.to(
+ file.toString(),
+ SequenceFileOutputFormat.class,
+ NullWritable.class,
+ Text.class,
+ new SerializableFunction<String, KV<NullWritable, Text>>() {
+ @Override
+ public KV<NullWritable, Text> apply(String input) {
+ return KV.of(NullWritable.get(), new Text(input));
+ }
+ });
+
+ doWrite(sink, options, Collections.singletonList(foobar));
+
+ SequenceFile.Reader.Option opts =
+ SequenceFile.Reader.file(new Path(file.toString(), part0));
+ SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
+ assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
+ assertEquals(Text.class.getName(), reader.getValueClassName());
+ NullWritable k = NullWritable.get();
+ Text v = new Text();
+ assertTrue(reader.next(k, v));
+ assertEquals(NullWritable.get(), k);
+ assertEquals(new Text(foobar), v);
+ }
+
+ @Test
+ public void testToText() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ File file = tmpFolder.newFolder();
+
+ HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
+
+ doWrite(sink, options, Collections.singletonList(foobar));
+
+ List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
+ Charset.forName("UTF-8"));
+ assertEquals(Collections.singletonList(foobar), strings);
+ }
+
+ @DefaultCoder(AvroCoder.class)
+ static class GenericClass {
+ int intField;
+ String stringField;
+ public GenericClass() {}
+ public GenericClass(int intValue, String stringValue) {
+ this.intField = intValue;
+ this.stringField = stringValue;
+ }
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("intField", intField)
+ .add("stringField", stringField)
+ .toString();
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(intField, stringField);
+ }
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof GenericClass)) {
+ return false;
+ }
+ GenericClass o = (GenericClass) other;
+ return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
+ }
+ }
+
+ @Test
+ public void testToAvro() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ File file = tmpFolder.newFolder();
+
+ HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
+ file.toString(),
+ AvroCoder.of(GenericClass.class),
+ new Configuration(false));
+
+ doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
+
+ GenericDatumReader<GenericData.Record> datumReader = new GenericDatumReader<>();
+ FileReader<GenericData.Record> reader =
+ DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
+ GenericData.Record next = reader.next(null);
+ assertEquals("foobar", next.get("stringField").toString());
+ assertEquals(3, next.get("intField"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
new file mode 100644
index 0000000..a964239
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for HDFSFileSource.
+ */
+public class HDFSFileSourceTest {
+
+ private Random random = new Random(0L);
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testFullyReadSingleFile() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+ File file = createFileWithData("tmp.seq", expectedResults);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(
+ file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testFullyReadSingleFileWithSpaces() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+ File file = createFileWithData("tmp data.seq", expectedResults);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(
+ file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testFullyReadFilePattern() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+ expectedResults.addAll(data1);
+ expectedResults.addAll(data2);
+ expectedResults.addAll(data3);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(
+ new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testCloseUnstartedFilePatternReader() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(
+ new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+
+ // Closing an unstarted FilePatternReader should not throw an exception.
+ try {
+ reader.close();
+ } catch (Exception e) {
+ fail("Closing an unstarted FilePatternReader should not throw an exception");
+ }
+ }
+
+ @Test
+ public void testSplits() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.seq", expectedResults);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(
+ file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+ // Assert that the source produces the expected records
+ assertEquals(expectedResults, readFromSource(source, options));
+
+ // Split with a small bundle size (has to be at least size of sync interval)
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+ .split(SequenceFile.SYNC_INTERVAL, options);
+ assertTrue(splits.size() > 2);
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ int nonEmptySplits = 0;
+ for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+ if (readFromSource(subSource, options).size() > 0) {
+ nonEmptySplits += 1;
+ }
+ }
+ assertTrue(nonEmptySplits > 2);
+ }
+
+ @Test
+ public void testSplitEstimatedSize() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.avro", expectedResults);
+
+ HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ long originalSize = source.getEstimatedSizeBytes(options);
+ long splitTotalSize = 0;
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
+ SequenceFile.SYNC_INTERVAL, options
+ );
+ for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
+ splitTotalSize += splitSource.getEstimatedSizeBytes(options);
+ }
+ // Assert that the estimated size of the whole is the sum of its parts
+ assertEquals(originalSize, splitTotalSize);
+ }
+
+ private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+ throws IOException {
+ File tmpFile = tmpFolder.newFile(filename);
+ try (Writer writer = SequenceFile.createWriter(new Configuration(),
+ Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+ Writer.file(new Path(tmpFile.toURI())))) {
+
+ for (KV<IntWritable, Text> record : records) {
+ writer.append(record.getKey(), record.getValue());
+ }
+ }
+ return tmpFile;
+ }
+
+ private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+ int numItems, int offset) {
+ List<KV<IntWritable, Text>> records = new ArrayList<>();
+ for (int i = 0; i < numItems; i++) {
+ IntWritable key = new IntWritable(i + offset);
+ Text value = new Text(createRandomString(dataItemLength));
+ records.add(KV.of(key, value));
+ }
+ return records;
+ }
+
+ private String createRandomString(int length) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append(chars[random.nextInt(chars.length)]);
+ }
+ return builder.toString();
+ }
+
+}
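For completeness, a source like the ones exercised above is consumed in a pipeline via
Read.from. A sketch, with a hypothetical file glob:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

    public class ReadExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
            HDFSFileSource.from("/tmp/data/file*",  // hypothetical glob
                SequenceFileInputFormat.class, IntWritable.class, Text.class);
        // Each record is the key/value pair produced by the InputFormat.
        PCollection<KV<IntWritable, Text>> records = p.apply(Read.from(source));
        p.run().waitUntilFinish();
      }
    }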
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
new file mode 100644
index 0000000..6963116
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for {@link HadoopFileSystemModule}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemModuleTest {
+ @Test
+ public void testObjectMapperIsAbleToFindModule() throws Exception {
+ List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
+ assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
+ }
+
+ @Test
+ public void testConfigurationSerializationDeserialization() throws Exception {
+ Configuration baseConfiguration = new Configuration(false);
+ baseConfiguration.set("testPropertyA", "baseA");
+ baseConfiguration.set("testPropertyC", "baseC");
+ Configuration configuration = new Configuration(false);
+ configuration.addResource(baseConfiguration);
+ configuration.set("testPropertyA", "A");
+ configuration.set("testPropertyB", "B");
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new HadoopFileSystemModule());
+ String serializedConfiguration = objectMapper.writeValueAsString(configuration);
+ Configuration deserializedConfiguration =
+ objectMapper.readValue(serializedConfiguration, Configuration.class);
+ assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains(
+ new AbstractMap.SimpleEntry("testPropertyA", "A"),
+ new AbstractMap.SimpleEntry("testPropertyB", "B"),
+ new AbstractMap.SimpleEntry("testPropertyC", "baseC")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
new file mode 100644
index 0000000..2be3d93
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemOptionsRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsRegistrarTest {
+
+ @Test
+ public void testServiceLoader() {
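+    // Walk every ServiceLoader-registered PipelineOptionsRegistrar and verify
+    // that the Hadoop registrar exposes HadoopFileSystemOptions.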
+ for (PipelineOptionsRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+ if (registrar instanceof HadoopFileSystemOptionsRegistrar) {
+ assertThat(registrar.getPipelineOptions(),
+ Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
+ return;
+ }
+ }
+ fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
new file mode 100644
index 0000000..634528b
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemOptions}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsTest {
+ @Test
+ public void testParsingHdfsConfiguration() {
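+    // --hdfsConfiguration takes a JSON list of maps; each map becomes one
+    // entry in the parsed configuration list.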
+ HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
+ "--hdfsConfiguration=["
+ + "{\"propertyA\": \"A\"},"
+ + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
+ assertEquals(2, options.getHdfsConfiguration().size());
+ assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains(
+ new AbstractMap.SimpleEntry("propertyA", "A")));
+ assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
+ new AbstractMap.SimpleEntry("propertyB", "B")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
new file mode 100644
index 0000000..96f7102
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.net.URI;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemRegistrarTest {
+
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ private Configuration configuration;
+ private MiniDFSCluster hdfsCluster;
+ private URI hdfsClusterBaseUri;
+
+ @Before
+ public void setUp() throws Exception {
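+    // Start an in-process HDFS cluster rooted in the temporary folder; the
+    // builder sets fs.defaultFS on the configuration.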
+ configuration = new Configuration();
+ configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+ hdfsCluster = builder.build();
+ hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ hdfsCluster.shutdown();
+ }
+
+ @Test
+ public void testServiceLoader() {
+ HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+ options.setHdfsConfiguration(ImmutableList.of(configuration));
+ for (FileSystemRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
+ if (registrar instanceof HadoopFileSystemRegistrar) {
+ Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
+ assertEquals(hdfsClusterBaseUri.getScheme(),
+ ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
+ return;
+ }
+ }
+ fail("Expected to find " + HadoopFileSystemRegistrar.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
new file mode 100644
index 0000000..cf86c36
--- /dev/null
+++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.ByteStreams;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystem}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemTest {
+
+ @Rule public TestPipeline p = TestPipeline.create();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private Configuration configuration;
+ private MiniDFSCluster hdfsCluster;
+ private URI hdfsClusterBaseUri;
+ private HadoopFileSystem fileSystem;
+
+ @Before
+ public void setUp() throws Exception {
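+    // Start an in-process HDFS cluster and construct a HadoopFileSystem
+    // directly from its configuration.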
+ configuration = new Configuration();
+ configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+ hdfsCluster = builder.build();
+ hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+ fileSystem = new HadoopFileSystem(configuration);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ hdfsCluster.shutdown();
+ }
+
+ @Test
+ public void testCreateAndReadFile() throws Exception {
+ create("testFile", "testData".getBytes());
+ assertArrayEquals("testData".getBytes(), read("testFile"));
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ create("testFileA", "testDataA".getBytes());
+ create("testFileB", "testDataB".getBytes());
+ fileSystem.copy(
+ ImmutableList.of(
+ testPath("testFileA"),
+ testPath("testFileB")),
+ ImmutableList.of(
+ testPath("copyTestFileA"),
+ testPath("copyTestFileB")));
+ assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+ assertArrayEquals("testDataA".getBytes(), read("copyTestFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("copyTestFileB"));
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ create("testFileA", "testDataA".getBytes());
+ create("testFileB", "testDataB".getBytes());
+ create("testFileC", "testDataC".getBytes());
+
+ // ensure files exist
+ assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+ assertArrayEquals("testDataC".getBytes(), read("testFileC"));
+
+ fileSystem.delete(ImmutableList.of(
+ testPath("testFileA"),
+ testPath("testFileC")));
+
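+    // After the delete, only testFileB should match the glob.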
+ List<MatchResult> results =
+ fileSystem.match(ImmutableList.of(testPath("testFile*").toString()));
+ assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of(
+ Metadata.builder()
+ .setResourceId(testPath("testFileB"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testDataB".getBytes().length)
+ .build()))));
+ }
+
+ @Test
+ public void testMatch() throws Exception {
+ create("testFileAA", "testDataAA".getBytes());
+ create("testFileA", "testDataA".getBytes());
+ create("testFileB", "testDataB".getBytes());
+
+ // ensure files exist
+ assertArrayEquals("testDataAA".getBytes(), read("testFileAA"));
+ assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+ List<MatchResult> results =
+ fileSystem.match(ImmutableList.of(testPath("testFileA*").toString()));
+ assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+ assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+ Metadata.builder()
+ .setResourceId(testPath("testFileAA"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testDataAA".getBytes().length)
+ .build(),
+ Metadata.builder()
+ .setResourceId(testPath("testFileA"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testDataA".getBytes().length)
+ .build()));
+ }
+
+ @Test
+ public void testRename() throws Exception {
+ create("testFileA", "testDataA".getBytes());
+ create("testFileB", "testDataB".getBytes());
+
+ // ensure files exist
+ assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+ fileSystem.rename(
+ ImmutableList.of(
+ testPath("testFileA"), testPath("testFileB")),
+ ImmutableList.of(
+ testPath("renameFileA"), testPath("renameFileB")));
+
+ List<MatchResult> results =
+ fileSystem.match(ImmutableList.of(testPath("*").toString()));
+ assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+ assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+ Metadata.builder()
+ .setResourceId(testPath("renameFileA"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testDataA".getBytes().length)
+ .build(),
+ Metadata.builder()
+ .setResourceId(testPath("renameFileB"))
+ .setIsReadSeekEfficient(true)
+ .setSizeBytes("testDataB".getBytes().length)
+ .build()));
+
+ // ensure files exist
+ assertArrayEquals("testDataA".getBytes(), read("renameFileA"));
+ assertArrayEquals("testDataB".getBytes(), read("renameFileB"));
+ }
+
+ @Test
+ public void testMatchNewResource() throws Exception {
+ // match file spec
+ assertEquals(testPath("file"),
+ fileSystem.matchNewResource(testPath("file").toString(), false));
+ // match dir spec missing '/'
+ assertEquals(testPath("dir/"),
+ fileSystem.matchNewResource(testPath("dir").toString(), true));
+ // match dir spec with '/'
+ assertEquals(testPath("dir/"),
+ fileSystem.matchNewResource(testPath("dir/").toString(), true));
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Expected file path but received directory path");
+ fileSystem.matchNewResource(testPath("dir/").toString(), false);
+ }
+
+ @Test
+ @Ignore("TestPipeline needs a way to take in HadoopFileSystemOptions")
+ public void testReadPipeline() throws Exception {
+ create("testFileA", "testDataA".getBytes());
+ create("testFileB", "testDataB".getBytes());
+ create("testFileC", "testDataC".getBytes());
+
+ HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
+ .as(HadoopFileSystemOptions.class);
+ options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
+ FileSystems.setDefaultConfigInWorkers(options);
+ PCollection<String> pc = p.apply(
+ TextIO.read().from(testPath("testFile*").toString()));
+ PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
+ p.run();
+ }
+
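+  // Test helpers: write and read file contents through the Beam FileSystem
+  // channel API.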
+ private void create(String relativePath, byte[] contents) throws Exception {
+ try (WritableByteChannel channel = fileSystem.create(
+ testPath(relativePath),
+ StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) {
+ channel.write(ByteBuffer.wrap(contents));
+ }
+ }
+
+ private byte[] read(String relativePath) throws Exception {
+ try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) {
+ return ByteStreams.toByteArray(Channels.newInputStream(channel));
+ }
+ }
+
+ private HadoopResourceId testPath(String relativePath) {
+ return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md
deleted file mode 100644
index 3a734f2..0000000
--- a/sdks/java/io/hdfs/README.md
+++ /dev/null
@@ -1,43 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# HDFS IO
-
-This library provides HDFS sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Apache Beam pipelines.
-
-Currently, only the read path is implemented. An `HDFSFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-An `HDFSFileSource` can be read using the
-`org.apache.beam.sdk.io.Read` transform. For example:
-
-```java
-HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
- MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
- MyInputFormat.class, MyKey.class, MyValue.class));
-```
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
deleted file mode 100644
index daa3b26..0000000
--- a/sdks/java/io/hdfs/pom.xml
+++ /dev/null
@@ -1,195 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-sdks-java-io-hdfs</artifactId>
- <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name>
- <description>Library to read and write Hadoop/HDFS file formats from Beam.</description>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemPropertyVariables>
- <beamUseDummyRunner>false</beamUseDummyRunner>
- </systemPropertyVariables>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <properties>
- <!--
- This is the version of Hadoop used to compile the hadoop-common module.
- This dependency is defined with a provided scope.
- Users must supply their own Hadoop version at runtime.
- -->
- <hadoop.version>2.7.3</hadoop.version>
- </properties>
-
- <dependencyManagement>
- <!--
- We define dependencies here instead of sdks/java/io because
- of a version mismatch between this Hadoop version and other
- Hadoop versions declared in other io submodules.
- -->
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <classifier>tests</classifier>
- <version>${hadoop.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.auto.value</groupId>
- <artifactId>auto-value</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <version>${avro.version}</version>
- <classifier>hadoop2</classifier>
- <exclusions>
- <!-- exclude old Jetty version of servlet API -->
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <!-- test dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>