You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/05 17:42:57 UTC
[1/3] beam git commit: Add windowing support to FileBasedSink
Repository: beam
Updated Branches:
refs/heads/master 8e5cfdea9 -> bc907c58b
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 948a65b..16f3eb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.List;
@@ -30,7 +29,9 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +43,9 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,9 +84,19 @@ import org.slf4j.LoggerFactory;
public class Write<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+ private static final int UNKNOWN_SHARDNUM = -1;
+ private static final int UNKNOWN_NUMSHARDS = -1;
+
private final Sink<T> sink;
+ // This allows the number of shards to be dynamically computed based on the input
+ // PCollection.
@Nullable
private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+ // We don't use a side input for static sharding, as we want this value to be updatable
+ // when a pipeline is updated.
+ @Nullable
+ private final ValueProvider<Integer> numShardsProvider;
+ private boolean windowedWrites;
/**
* Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
@@ -91,21 +104,24 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
*/
public static <T> Write<T> to(Sink<T> sink) {
checkNotNull(sink, "sink");
- return new Write<>(sink, null /* runner-determined sharding */);
+ return new Write<>(sink, null /* runner-determined sharding */, null, false);
}
private Write(
Sink<T> sink,
- @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+ @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+ @Nullable ValueProvider<Integer> numShardsProvider,
+ boolean windowedWrites) {
this.sink = sink;
this.computeNumShards = computeNumShards;
+ this.numShardsProvider = numShardsProvider;
+ this.windowedWrites = windowedWrites;
}
@Override
public PDone expand(PCollection<T> input) {
- checkArgument(
- IsBounded.BOUNDED == input.isBounded(),
- "%s can only be applied to a Bounded PCollection",
+ checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+ "%s can only be applied to an unbounded PCollection if doing windowed writes",
Write.class.getSimpleName());
PipelineOptions options = input.getPipeline().getOptions();
sink.validate(options);
@@ -120,6 +136,11 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
.include("sink", sink);
if (getSharding() != null) {
builder.include("sharding", getSharding());
+ } else if (getNumShards() != null) {
+ String numShards = getNumShards().isAccessible()
+ ? getNumShards().get().toString() : getNumShards().toString();
+ builder.add(DisplayData.item("numShards", numShards)
+ .withLabel("Fixed Number of Shards"));
}
}
@@ -141,6 +162,10 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
return computeNumShards;
}
+ public ValueProvider<Integer> getNumShards() {
+ return numShardsProvider;
+ }
+
/**
* Returns a new {@link Write} that will write to the current {@link Sink} using the
* specified number of shards.
@@ -165,8 +190,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
* <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
* more information.
*/
- public Write<T> withNumShards(ValueProvider<Integer> numShards) {
- return new Write<>(sink, new ConstantShards<T>(numShards));
+ public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+ return new Write<>(sink, null, numShardsProvider, windowedWrites);
}
/**
@@ -179,7 +204,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
checkNotNull(
sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new Write<>(sink, sharding);
+ return new Write<>(sink, sharding, null, windowedWrites);
}
/**
@@ -187,7 +212,25 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
* runner-determined sharding.
*/
public Write<T> withRunnerDeterminedSharding() {
- return new Write<>(sink, null);
+ return new Write<>(sink, null, null, windowedWrites);
+ }
+
+ /**
+ * Returns a new {@link Write} that preserves windowing on its input.
+ *
+ * <p>If this option is not specified, windowing and triggering are replaced by
+ * {@link GlobalWindows} and {@link DefaultTrigger}.
+ *
+ * <p>If there is no data for a window, no output shards will be generated for that window.
+ * If a window triggers multiple times, then output shards for that window might be
+ * generated more than once; it's up to the sink implementation to keep these output shards
+ * unique.
+ *
+ * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+ * positive value.
+ */
+ public Write<T> withWindowedWrites() {
+ return new Write<>(sink, computeNumShards, numShardsProvider, true);
}
/**
@@ -205,13 +248,19 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
}
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
// Lazily initialize the Writer
if (writer == null) {
WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
LOG.info("Opening writer for write operation {}", writeOperation);
writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
+
+ if (windowedWrites) {
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
+ UNKNOWN_NUMSHARDS);
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ }
LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
}
try {
@@ -257,42 +306,57 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
*/
private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+ private final PCollectionView<Integer> numShardsView;
- WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+ WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
+ PCollectionView<Integer> numShardsView) {
this.writeOperationView = writeOperationView;
+ this.numShardsView = numShardsView;
}
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
// In a sharded write, single input element represents one shard. We can open and close
// the writer in each call to processElement.
WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
LOG.info("Opening writer for write operation {}", writeOperation);
Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
+ if (windowedWrites) {
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
+ numShards);
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+ }
LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
try {
- for (T t : c.element().getValue()) {
- writer.write(t);
- }
- } catch (Exception e) {
try {
- writer.close();
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
+ for (T t : c.element().getValue()) {
+ writer.write(t);
}
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
+ } catch (Exception e) {
+ try {
+ writer.close();
+ } catch (Exception closeException) {
+ if (closeException instanceof InterruptedException) {
+ // Do not silently ignore interrupted state.
+ Thread.currentThread().interrupt();
+ }
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
+ }
+ throw e;
}
+
+ // Close the writer; if this throws let the error propagate.
+ WriteT result = writer.close();
+ c.output(result);
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
throw e;
}
-
- // Close the writer; if this throws let the error propagate.
- WriteT result = writer.close();
- c.output(result);
}
@Override
@@ -302,23 +366,32 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
}
private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
- private final PCollectionView<Integer> numShards;
+ private final PCollectionView<Integer> numShardsView;
+ private final ValueProvider<Integer> numShardsProvider;
private int shardNumber;
- ApplyShardingKey(PCollectionView<Integer> numShards) {
- this.numShards = numShards;
- shardNumber = -1;
+ ApplyShardingKey(PCollectionView<Integer> numShardsView,
+ ValueProvider<Integer> numShardsProvider) {
+ this.numShardsView = numShardsView;
+ this.numShardsProvider = numShardsProvider;
+ shardNumber = UNKNOWN_SHARDNUM;
}
@ProcessElement
public void processElement(ProcessContext context) {
- Integer shardCount = context.sideInput(numShards);
+ int shardCount = 0;
+ if (numShardsView != null) {
+ shardCount = context.sideInput(numShardsView);
+ } else {
+ checkNotNull(numShardsProvider);
+ shardCount = numShardsProvider.get();
+ }
checkArgument(
shardCount > 0,
"Must have a positive number of shards specified for non-runner-determined sharding."
+ " Got %s",
shardCount);
- if (shardNumber == -1) {
+ if (shardNumber == UNKNOWN_SHARDNUM) {
// We want to desynchronize the first record sharding key for each instance of
// ApplyShardingKey, so records in a small PCollection will be statistically balanced.
shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
@@ -340,8 +413,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
* <p>This singleton collection containing the WriteOperation is then used as a side input to a
* ParDo over the PCollection of elements to write. In this bundle-writing phase,
* {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn.StartBundle} and
- * {@link DoFn.FinishBundle}, respectively, and {@link Writer#write} method is called for
+ * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+ * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
* every element in the bundle. The output of this ParDo is a PCollection of
* <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
* each bundle.
@@ -364,6 +437,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
private <WriteT> PDone createWrite(
PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
Pipeline p = input.getPipeline();
+ writeOperation.setWindowedWrites(windowedWrites);
// A coder to use for the WriteOperation.
@SuppressWarnings("unchecked")
@@ -373,7 +447,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
// A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
// the sink.
PCollection<WriteOperation<T, WriteT>> operationCollection =
- p.apply(Create.of(writeOperation).withCoder(operationCoder));
+ p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
// Initialize the resource in a do-once ParDo on the WriteOperation.
operationCollection = operationCollection
@@ -384,6 +458,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
WriteOperation<T, WriteT> writeOperation = c.element();
LOG.info("Initializing write operation {}", writeOperation);
writeOperation.initialize(c.getPipelineOptions());
+ writeOperation.setWindowedWrites(windowedWrites);
LOG.debug("Done initializing write operation {}", writeOperation);
// The WriteOperation is also the output of this ParDo, so it can have mutable
// state.
@@ -396,133 +471,133 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
- // Re-window the data into the global window and remove any existing triggers.
- PCollection<T> inputInGlobalWindow =
- input.apply(
- Window.<T>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
+ if (!windowedWrites) {
+ // Re-window the data into the global window and remove any existing triggers.
+ input =
+ input.apply(
+ Window.<T>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ }
+
// Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
// as a side input) and collect the results of the writes in a PCollection.
// There is a dependency between this ParDo and the first (the WriteOperation PCollection
// as a side input), so this will happen after the initial ParDo.
PCollection<WriteT> results;
- final PCollectionView<Integer> numShards;
- if (computeNumShards == null) {
- numShards = null;
- results =
- inputInGlobalWindow.apply(
- "WriteBundles",
+ final PCollectionView<Integer> numShardsView;
+ if (computeNumShards == null && numShardsProvider == null) {
+ if (windowedWrites) {
+ throw new IllegalStateException("When doing windowed writes, numShards must be set"
+ + "explicitly to a positive value");
+ }
+ numShardsView = null;
+ results = input
+ .apply("WriteBundles",
ParDo.of(new WriteBundles<>(writeOperationView))
.withSideInputs(writeOperationView));
} else {
- numShards = inputInGlobalWindow.apply(computeNumShards);
- results =
- inputInGlobalWindow
- .apply(
- "ApplyShardLabel",
- ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
- .apply(
- "WriteShardedBundles",
- ParDo.of(new WriteShardedBundles<>(writeOperationView))
- .withSideInputs(writeOperationView));
+ if (computeNumShards != null) {
+ numShardsView = input.apply(computeNumShards);
+ results = input
+ .apply("ApplyShardLabel", ParDo.of(
+ new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
+ .withSideInputs(numShardsView, writeOperationView));
+ } else {
+ numShardsView = null;
+ results = input
+ .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+ .apply("WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
+ .withSideInputs(writeOperationView));
+ }
}
results.setCoder(writeOperation.getWriterResultCoder());
- final PCollectionView<Iterable<WriteT>> resultsView =
- results.apply(View.<WriteT>asIterable());
-
- // Finalize the write in another do-once ParDo on the singleton collection containing the
- // Writer. The results from the per-bundle writes are given as an Iterable side input.
- // The WriteOperation's state is the same as after its initialization in the first do-once
- // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
- // collection as a side input), so it will happen after the parallel write.
- ImmutableList.Builder<PCollectionView<?>> sideInputs =
- ImmutableList.<PCollectionView<?>>builder().add(resultsView);
- if (numShards != null) {
- sideInputs.add(numShards);
- }
- operationCollection
- .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- WriteOperation<T, WriteT> writeOperation = c.element();
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
- // We must always output at least 1 shard, and honor user-specified numShards if set.
- int minShardsNeeded;
- if (numShards == null) {
- minShardsNeeded = 1;
- } else {
- minShardsNeeded = c.sideInput(numShards);
- checkArgument(
- minShardsNeeded > 0,
- "Must have a positive number of shards for non-runner-determined sharding."
- + " Got %s",
- minShardsNeeded);
+ if (windowedWrites) {
+ // When processing streaming windowed writes, results will arrive multiple times. This
+ // means we can't share the below implementation that turns the results into a side input,
+ // as new data arriving into a side input does not trigger the listening DoFn. Instead
+ // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
+ // whenever new data arrives.
+ PCollection<KV<Void, WriteT>> keyedResults =
+ results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
+ keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
+ .getWriterResultCoder()));
+
+ // Is the continuation trigger sufficient?
+ keyedResults
+ .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
+ .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.element().getValue());
+ writeOperation.finalize(results, c.getPipelineOptions());
+ LOG.debug("Done finalizing write operation {}", writeOperation);
}
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written for a total of {}.",
- extraShardsNeeded, results.size(), minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
- writer.open(UUID.randomUUID().toString());
- WriteT emptyWrite = writer.close();
- results.add(emptyWrite);
+ }).withSideInputs(writeOperationView));
+ } else {
+ final PCollectionView<Iterable<WriteT>> resultsView =
+ results.apply(View.<WriteT>asIterable());
+ ImmutableList.Builder<PCollectionView<?>> sideInputs =
+ ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+ if (numShardsView != null) {
+ sideInputs.add(numShardsView);
+ }
+
+ // Finalize the write in another do-once ParDo on the singleton collection containing the
+ // Writer. The results from the per-bundle writes are given as an Iterable side input.
+ // The WriteOperation's state is the same as after its initialization in the first do-once
+ // ParDo. There is a dependency between this ParDo and the parallel write (the writer
+ // results collection as a side input), so it will happen after the parallel write.
+ // For the non-windowed case, we guarantee that if no data is written but the user has
+ // set numShards, then all shards will be written out as empty files. For this reason we
+ // use a side input here.
+ operationCollection
+ .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ WriteOperation<T, WriteT> writeOperation = c.element();
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard, and honor user-specified numShards if
+ // set.
+ int minShardsNeeded;
+ if (numShardsView != null) {
+ minShardsNeeded = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ minShardsNeeded = numShardsProvider.get();
+ } else {
+ minShardsNeeded = 1;
}
- LOG.debug("Done creating extra shards.");
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for a total of "
+ + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
+ UNKNOWN_NUMSHARDS);
+ WriteT emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
+ }
+ writeOperation.finalize(results, c.getPipelineOptions());
+ LOG.debug("Done finalizing write operation {}", writeOperation);
}
-
- writeOperation.finalize(results, c.getPipelineOptions());
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- }).withSideInputs(sideInputs.build()));
- return PDone.in(input.getPipeline());
- }
-
- @VisibleForTesting
- static class ConstantShards<T>
- extends PTransform<PCollection<T>, PCollectionView<Integer>> {
- private final ValueProvider<Integer> numShards;
-
- private ConstantShards(ValueProvider<Integer> numShards) {
- this.numShards = numShards;
- }
-
- @Override
- public PCollectionView<Integer> expand(PCollection<T> input) {
- return input
- .getPipeline()
- .apply(Create.of(0))
- .apply(
- "FixedNumShards",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void outputNumShards(ProcessContext ctxt) {
- checkArgument(
- numShards.isAccessible(),
- "NumShards must be accessible at runtime to use constant sharding");
- ctxt.output(numShards.get());
- }
- }))
- .apply(View.<Integer>asSingleton());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(
- DisplayData.item("Fixed Number of Shards", numShards).withLabel("ConstantShards"));
- }
-
- public ValueProvider<Integer> getNumShards() {
- return numShards;
+ }).withSideInputs(sideInputs.build()));
}
+ return PDone.in(input.getPipeline());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 6937e93..2159c8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -176,7 +176,7 @@ public class XmlSink {
* <p>The specified class must be able to be used to create a JAXB context.
*/
public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+ return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
}
/**
@@ -194,7 +194,7 @@ public class XmlSink {
* supplied name.
*/
public Bound<T> withRootElement(String rootElementName) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+ return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
}
/**
@@ -205,7 +205,7 @@ public class XmlSink {
public void validate(PipelineOptions options) {
checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
checkNotNull(rootElementName, "Missing a root element name.");
- checkNotNull(baseOutputFilename, "Missing a filename to write to.");
+ checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to.");
try {
JAXBContext.newInstance(classToBind);
} catch (JAXBException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 0739381..e1ad47b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.testing;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -35,10 +38,12 @@ public interface TestPipelineOptions extends PipelineOptions {
void setTempRoot(String value);
@Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+ @JsonIgnore
SerializableMatcher<PipelineResult> getOnCreateMatcher();
void setOnCreateMatcher(SerializableMatcher<PipelineResult> value);
@Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+ @JsonIgnore
SerializableMatcher<PipelineResult> getOnSuccessMatcher();
void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value);
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index dd81a34..6f6ba37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -45,6 +46,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,17 +177,20 @@ public class FileIOChannelFactory implements IOChannelFactory {
}
@Override
- public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws
+ IOException {
+ List<String> srcList = Lists.newArrayList(srcFilenames);
+ List<String> destList = Lists.newArrayList(destFilenames);
checkArgument(
- srcFilenames.size() == destFilenames.size(),
+ srcList.size() == destList.size(),
"Number of source files %s must equal number of destination files %s",
- srcFilenames.size(),
- destFilenames.size());
- int numFiles = srcFilenames.size();
+ srcList.size(),
+ destList.size());
+ int numFiles = srcList.size();
for (int i = 0; i < numFiles; i++) {
- String src = srcFilenames.get(i);
- String dst = destFilenames.get(i);
- LOG.debug("Copying {} to {}", src, dst);
+ String src = srcList.get(i);
+ String dst = destList.get(i);
+ LOG.info("Copying {} to {}", src, dst);
try {
// Copy the source file, replacing the existing destination.
// Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
@@ -194,7 +199,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
new File(dst).toPath(),
StandardCopyOption.REPLACE_EXISTING);
} catch (NoSuchFileException e) {
- LOG.debug("{} does not exist.", src);
+ LOG.info("{} does not exist.", src);
// Suppress exception if file does not exist.
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 9f99cd6..745dcb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -99,7 +99,8 @@ public class GcsIOChannelFactory implements IOChannelFactory {
}
@Override
- public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+ throws IOException {
options.getGcsUtil().copy(srcFilenames, destFilenames);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 14781c4..1c853bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -635,23 +636,27 @@ public class GcsUtil {
return batches;
}
- public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+ public void copy(Iterable<String> srcFilenames,
+ Iterable<String> destFilenames) throws
+ IOException {
executeBatches(makeCopyBatches(srcFilenames, destFilenames));
}
- List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames)
+ List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
throws IOException {
+ List<String> srcList = Lists.newArrayList(srcFilenames);
+ List<String> destList = Lists.newArrayList(destFilenames);
checkArgument(
- srcFilenames.size() == destFilenames.size(),
+ srcList.size() == destList.size(),
"Number of source files %s must equal number of destination files %s",
- srcFilenames.size(),
- destFilenames.size());
+ srcList.size(),
+ destList.size());
List<BatchRequest> batches = new LinkedList<>();
BatchRequest batch = createBatchRequest();
- for (int i = 0; i < srcFilenames.size(); i++) {
- final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
- final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
+ for (int i = 0; i < srcList.size(); i++) {
+ final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
+ final GcsPath destPath = GcsPath.fromUri(destList.get(i));
enqueueCopy(sourcePath, destPath, batch);
if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
batches.add(batch);
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index 9504f45..3a3af17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -23,7 +23,6 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.Collection;
-import java.util.List;
/**
* Defines a factory for working with read and write channels.
@@ -116,7 +115,7 @@ public interface IOChannelFactory {
* @param srcFilenames the source filenames.
* @param destFilenames the destination filenames.
*/
- void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+ void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException;
/**
* Removes a collection of files or directories.
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 19f5ffa..f3dbb05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -31,13 +31,18 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.Random;
import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
@@ -48,18 +53,29 @@ import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO.Write.Bound;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
@@ -94,7 +110,7 @@ public class AvroIOTest {
}
@Test
- public void testWriteWithoutValidationFlag() throws Exception {
+ public void testWriteWithoutValidationFlag() throws Exception {
AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write.to("gs://bucket/foo/baz");
assertTrue(write.needsValidation());
assertFalse(write.withoutValidation().needsValidation());
@@ -275,6 +291,132 @@ public class AvroIOTest {
p.run();
}
+ private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
+ return TimestampedValue.of(element, new Instant(0).plus(duration));
+ }
+
+ private static class WindowedFilenamePolicy extends FilenamePolicy {
+ String outputFilePrefix;
+
+ WindowedFilenamePolicy(String outputFilePrefix) {
+ this.outputFilePrefix = outputFilePrefix;
+ }
+
+ @Override
+ public ValueProvider<String> getBaseOutputFilenameProvider() {
+ return StaticValueProvider.of(outputFilePrefix);
+ }
+
+ @Override
+ public String windowedFilename(WindowedContext input) {
+ String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
+ + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
+ + input.getPaneInfo().getIndex();
+ if (input.getPaneInfo().isLast()) {
+ filename += "-final";
+ }
+ return filename;
+ }
+
+ @Override
+ public String unwindowedFilename(Context input) {
+ String filename = outputFilePrefix + input.getShardNumber() + "-of-"
+ + (input.getNumShards() - 1);
+ return filename;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
+ .withLabel("File Name Prefix"));
+ }
+ }
+
+ @Rule
+ public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTestStream.class })
+ public void testWindowedAvroIOWrite() throws Throwable {
+ File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+ final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+
+ Instant base = new Instant(0);
+ ArrayList<GenericClass> allElements = new ArrayList<>();
+ ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>();
+ ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(
+ base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
+ base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
+
+ Random random = new Random();
+ for (int i = 0; i < 100; ++i) {
+ GenericClass item = new GenericClass(i, String.valueOf(i));
+ allElements.add(item);
+ firstWindowElements.add(TimestampedValue.of(item,
+ firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+ }
+
+ ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>();
+ ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(
+ base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
+ base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
+ for (int i = 100; i < 200; ++i) {
+ GenericClass item = new GenericClass(i, String.valueOf(i));
+ allElements.add(new GenericClass(i, String.valueOf(i)));
+ secondWindowElements.add(TimestampedValue.of(item,
+ secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+ }
+
+
+ TimestampedValue<GenericClass>[] firstWindowArray =
+ firstWindowElements.toArray(new TimestampedValue[100]);
+ TimestampedValue<GenericClass>[] secondWindowArray =
+ secondWindowElements.toArray(new TimestampedValue[100]);
+
+ TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(firstWindowArray[0],
+ Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
+ .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
+ .addElements(secondWindowArray[0],
+ Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
+ .advanceWatermarkToInfinity();
+
+ windowedAvroWritePipeline
+ .apply(values)
+ .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
+ .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+ .withWindowedWrites()
+ .withNumShards(2)
+ .withSchema(GenericClass.class));
+ windowedAvroWritePipeline.run();
+
+ // Validate that the expected files exist and together contain the expected elements (any order)
+ List<File> expectedFiles = new ArrayList<>();
+ for (int shard = 0; shard < 2; shard++) {
+ for (int window = 0; window < 2; window++) {
+ Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
+ IntervalWindow intervalWindow = new IntervalWindow(
+ windowStart, Duration.standardMinutes(1));
+ expectedFiles.add(
+ new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+ + "-of-1" + "-pane-0-final"));
+ }
+ }
+
+ List<GenericClass> actualElements = new ArrayList<>();
+ for (File outputFile : expectedFiles) {
+ assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
+ try (DataFileReader<GenericClass> reader =
+ new DataFileReader<>(outputFile, AvroCoder.of(
+ GenericClass.class).createDatumReader())) {
+ Iterators.addAll(actualElements, reader);
+ }
+ outputFile.delete();
+ }
+ assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
+ }
+
@Test
public void testWriteWithDefaultCodec() throws Exception {
AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
@@ -347,8 +489,10 @@ public class AvroIOTest {
Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
if (numShards > 1) {
+ System.out.println("NumShards " + numShards);
write = write.withNumShards(numShards);
} else {
+ System.out.println("no sharding");
write = write.withoutSharding();
}
p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index d2c1968..5b81ba8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.sdk.io;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -41,13 +41,17 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -99,7 +103,7 @@ public class FileBasedSinkTest {
expected.addAll(values);
expected.add(SimpleSink.SimpleWriter.FOOTER);
- writer.open(testUid);
+ writer.openUnwindowed(testUid, -1, -1);
for (String value : values) {
writer.write(value);
}
@@ -215,20 +219,18 @@ public class FileBasedSinkTest {
int numFiles = temporaryFiles.size();
- List<File> outputFiles = new ArrayList<>();
List<FileResult> fileResults = new ArrayList<>();
- List<String> outputFilenames = writeOp.generateDestinationFilenames(numFiles);
-
- // Create temporary output bundles and output File objects
+ // Create temporary output bundles and output File objects.
for (int i = 0; i < numFiles; i++) {
- fileResults.add(new FileResult(temporaryFiles.get(i).toString()));
- outputFiles.add(new File(outputFilenames.get(i)));
+ fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
}
writeOp.finalize(fileResults, options);
for (int i = 0; i < numFiles; i++) {
- assertTrue(outputFiles.get(i).exists());
+ String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+ new Context(i, numFiles));
+ assertTrue(new File(outputFilename).exists());
assertFalse(temporaryFiles.get(i).exists());
}
@@ -258,7 +260,7 @@ public class FileBasedSinkTest {
outputFiles.add(outputFile);
}
- writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options);
+ writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
for (int i = 0; i < numFiles; i++) {
assertFalse(temporaryFiles.get(i).exists());
@@ -274,12 +276,12 @@ public class FileBasedSinkTest {
PipelineOptions options = PipelineOptionsFactory.create();
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
- List<String> inputFilenames = Arrays.asList("input-3", "input-2", "input-1");
- List<String> inputContents = Arrays.asList("3", "2", "1");
+ List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
+ List<String> inputContents = Arrays.asList("1", "2", "3");
List<String> expectedOutputFilenames = Arrays.asList(
- "output-00002-of-00003.test", "output-00001-of-00003.test", "output-00000-of-00003.test");
+ "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
- List<String> inputFilePaths = new ArrayList<>();
+ Map<String, String> inputFilePaths = new HashMap<>();
List<String> expectedOutputPaths = new ArrayList<>();
for (int i = 0; i < inputFilenames.size(); i++) {
@@ -291,14 +293,13 @@ public class FileBasedSinkTest {
File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
List<String> lines = Arrays.asList(inputContents.get(i));
writeFile(lines, inputTmpFile);
- inputFilePaths.add(inputTmpFile.toString());
+ inputFilePaths.put(inputTmpFile.toString(),
+ writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+ new Context(i, inputFilenames.size())));
}
// Copy input files to output files.
- List<String> actual = writeOp.copyToOutputFiles(inputFilePaths, options);
-
- // Assert that the expected paths are returned.
- assertThat(expectedOutputPaths, containsInAnyOrder(actual.toArray()));
+ writeOp.copyToOutputFiles(inputFilePaths, options);
// Assert that the contents were copied.
for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -306,6 +307,14 @@ public class FileBasedSinkTest {
}
}
+ public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
+ List<String> filenames = new ArrayList<>();
+ for (int i = 0; i < numFiles; i++) {
+ filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+ }
+ return filenames;
+ }
+
/**
* Output filenames use the supplied naming template.
*/
@@ -314,36 +323,35 @@ public class FileBasedSinkTest {
List<String> expected;
List<String> actual;
SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
- SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+ FilenamePolicy policy = sink.getFileNamePolicy();
expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
- actual = writeOp.generateDestinationFilenames(3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
- actual = writeOp.generateDestinationFilenames(1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = writeOp.generateDestinationFilenames(0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
// Also validate that when the user-specified extension already starts with "." we do
// not prefix an additional ".", which would produce "..test".
sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
- writeOp = new SimpleSink.SimpleWriteOperation(sink);
expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
- actual = writeOp.generateDestinationFilenames(3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
- actual = writeOp.generateDestinationFilenames(1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = writeOp.generateDestinationFilenames(0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
}
@@ -355,20 +363,21 @@ public class FileBasedSinkTest {
List<String> expected;
List<String> actual;
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
expected = Arrays.asList(
appendToTempFolder("output-00000-of-00003.test"),
appendToTempFolder("output-00001-of-00003.test"),
appendToTempFolder("output-00002-of-00003.test"));
- actual = writeOp.generateDestinationFilenames(3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
- actual = writeOp.generateDestinationFilenames(1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = writeOp.generateDestinationFilenames(0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
}
@@ -380,16 +389,17 @@ public class FileBasedSinkTest {
SimpleSink sink = new SimpleSink("output", "test", "-NN");
SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
- // A single shard doesn't need to include the shard number.
- assertEquals(Arrays.asList("output-01.test"),
- writeOp.generateDestinationFilenames(1));
-
// More than one shard does.
try {
- writeOp.generateDestinationFilenames(3);
+ Iterable<FileResult> results = Lists.newArrayList(
+ new FileResult("temp1", "file1"),
+ new FileResult("temp2", "file1"),
+ new FileResult("temp3", "file1"));
+
+ writeOp.buildOutputFilenames(results);
fail("Should have failed.");
} catch (IllegalStateException exn) {
- assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.",
+ assertEquals("Only generated 1 distinct file names for 3 files.",
exn.getMessage());
}
}
@@ -402,19 +412,19 @@ public class FileBasedSinkTest {
List<String> expected;
List<String> actual;
SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
- SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+ FilenamePolicy policy = sink.getFileNamePolicy();
expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
- actual = writeOp.generateDestinationFilenames(3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
- actual = writeOp.generateDestinationFilenames(1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = writeOp.generateDestinationFilenames(0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
}
@@ -513,7 +523,7 @@ public class FileBasedSinkTest {
expected.add("footer");
expected.add("footer");
- writer.open(testUid);
+ writer.openUnwindowed(testUid, -1, -1);
writer.write("a");
writer.write("b");
final FileResult result = writer.close();
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 3ecbed4..16d7f2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -49,10 +49,10 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.io.Write.ConstantShards;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -63,11 +63,12 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -311,8 +312,10 @@ public class WriteTest {
assertThat(write.getSink(), is(sink));
PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
write.getSharding();
- assertThat(write.getSharding(), instanceOf(ConstantShards.class));
- assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
+
+ assertThat(write.getSharding(), is(nullValue()));
+ assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
+ assertThat(write.getNumShards().get(), equalTo(3));
assertThat(write.getSharding(), equalTo(originalSharding));
Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
@@ -352,7 +355,7 @@ public class WriteTest {
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
assertThat(displayData, includesDisplayDataFor("sink", sink));
- assertThat(displayData, hasDisplayItem("Fixed Number of Shards", 1));
+ assertThat(displayData, hasDisplayItem("numShards", "1"));
}
@Test
@@ -383,17 +386,6 @@ public class WriteTest {
assertThat(displayData, hasDisplayItem("spam", "ham"));
}
- @Test
- public void testWriteUnbounded() {
- PCollection<String> unbounded = p.apply(CountingInput.unbounded())
- .apply(ToString.elements());
-
- TestSink sink = new TestSink();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Write can only be applied to a Bounded PCollection");
- unbounded.apply(Write.to(sink));
- }
-
/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
@@ -535,6 +527,10 @@ public class WriteTest {
}
@Override
+ public void setWindowedWrites(boolean windowedWrites) {
+ }
+
+ @Override
public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options)
throws Exception {
assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
@@ -633,7 +629,21 @@ public class WriteTest {
}
@Override
- public void open(String uId) throws Exception {
+ public final void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int nShards) throws Exception {
+ numShards.incrementAndGet();
+ this.uId = uId;
+ assertEquals(State.INITIAL, state);
+ state = State.OPENED;
+ }
+
+ @Override
+ public final void openUnwindowed(String uId,
+ int shard,
+ int nShards) throws Exception {
numShards.incrementAndGet();
this.uId = uId;
assertEquals(State.INITIAL, state);
@@ -653,8 +663,13 @@ public class WriteTest {
state = State.CLOSED;
return new TestWriterResult(uId, elementsWritten);
}
+
+ @Override
+ public void cleanup() throws Exception {
+ }
}
+
/**
* Options for test, exposed for PipelineOptionsFactory.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 96b8c57..63b5d11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -93,7 +93,7 @@ public class XmlSinkTest {
.withRootElement(testRootElement);
assertEquals(testClass, sink.classToBind);
assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+ assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
}
/**
@@ -105,7 +105,7 @@ public class XmlSinkTest {
XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
assertEquals(testClass, sink.classToBind);
assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+ assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
}
/**
@@ -142,9 +142,9 @@ public class XmlSinkTest {
XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
assertEquals(testClass, writeOp.getSink().classToBind);
- assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get());
+ assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get());
assertEquals(testRootElement, writeOp.getSink().rootElementName);
- assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
+ // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension);
Path outputPath = new File(testFilePrefix).toPath();
Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
assertEquals(outputPath.getParent(), tempPath.getParent());
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 084d303..2ddead7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -162,23 +162,6 @@ public class TestPipelineTest implements Serializable {
}
@Test
- public void testMatcherSerializationDeserialization() {
- TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class);
- SerializableMatcher<PipelineResult> m1 = new TestMatcher();
- SerializableMatcher<PipelineResult> m2 = new TestMatcher();
-
- opts.setOnCreateMatcher(m1);
- opts.setOnSuccessMatcher(m2);
-
- String[] arr = TestPipeline.convertToArgs(opts);
- TestPipelineOptions newOpts =
- PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
-
- assertEquals(m1, newOpts.getOnCreateMatcher());
- assertEquals(m2, newOpts.getOnSuccessMatcher());
- }
-
- @Test
public void testRunWithDummyEnvironmentVariableFails() {
System.getProperties()
.setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true));
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 9b085ca..10ff788 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -284,6 +286,10 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
}
@Override
+ public void setWindowedWrites(boolean windowedWrites) {
+ }
+
+ @Override
public void finalize(final Iterable<String> writerResults, PipelineOptions options)
throws Exception {
UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
@@ -298,7 +304,6 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
private void doFinalize(Iterable<String> writerResults) throws Exception {
Job job = sink.newJob();
FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-
// If there are 0 output shards, just create output folder.
if (!writerResults.iterator().hasNext()) {
fs.mkdirs(new Path(path));
@@ -389,7 +394,17 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
}
@Override
- public void open(final String uId) throws Exception {
+ public void openWindowed(final String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ throw new UnsupportedOperationException("Windowing support not implemented yet for "
+ + "HDFS. Window " + window);
+ }
+
+ @Override
+ public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
new PrivilegedExceptionAction<Void>() {
@Override
@@ -427,6 +442,11 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
}
@Override
+ public void cleanup() throws Exception {
+
+ }
+
+ @Override
public String close() throws Exception {
return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
new PrivilegedExceptionAction<String>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index 8b9a6d1..cedd812 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -66,7 +66,7 @@ public class HDFSFileSinkTest {
Sink.WriteOperation<T, String> writeOperation =
(Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
Sink.Writer<T, String> writer = writeOperation.createWriter(options);
- writer.open(UUID.randomUUID().toString());
+ writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
for (T t: toWrite) {
writer.write(t);
}
[2/3] beam git commit: Add windowing support to FileBasedSink
Posted by ke...@apache.org.
Add windowing support to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6addc95f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6addc95f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6addc95f
Branch: refs/heads/master
Commit: 6addc95f0300a2e03109d4ad7ee93727d0a3b7b2
Parents: 570d0e2
Author: Reuven Lax <re...@google.com>
Authored: Thu Mar 9 09:45:35 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 5 08:57:21 2017 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 1 -
.../apache/beam/examples/WindowedWordCount.java | 34 +-
.../examples/common/WriteOneFilePerWindow.java | 91 ++++
.../examples/common/WriteWindowedFilesDoFn.java | 77 ----
.../beam/examples/WindowedWordCountIT.java | 41 +-
.../core/construction/PTransformMatchers.java | 3 +-
.../direct/WriteWithShardingFactory.java | 6 +-
.../streaming/io/UnboundedFlinkSink.java | 20 +-
.../beam/runners/flink/WriteSinkITCase.java | 23 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 157 +++++--
.../org/apache/beam/sdk/io/FileBasedSink.java | 429 +++++++++++++++----
.../main/java/org/apache/beam/sdk/io/Sink.java | 55 ++-
.../java/org/apache/beam/sdk/io/TextIO.java | 98 ++++-
.../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++-------
.../java/org/apache/beam/sdk/io/XmlSink.java | 6 +-
.../beam/sdk/testing/TestPipelineOptions.java | 5 +
.../beam/sdk/util/FileIOChannelFactory.java | 23 +-
.../beam/sdk/util/GcsIOChannelFactory.java | 3 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 21 +-
.../apache/beam/sdk/util/IOChannelFactory.java | 3 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 94 ++--
.../java/org/apache/beam/sdk/io/WriteTest.java | 49 ++-
.../org/apache/beam/sdk/io/XmlSinkTest.java | 8 +-
.../beam/sdk/testing/TestPipelineTest.java | 17 -
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 24 +-
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 2 +-
27 files changed, 1295 insertions(+), 518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 2b18130..021a819 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -209,7 +209,6 @@
<configuration>
<includes>
<include>WordCountIT.java</include>
- <include>WindowedWordCountIT.java</include>
</includes>
<parallel>all</parallel>
<threadCount>4</threadCount>
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 5c19454..d88de54 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -21,7 +21,7 @@ import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
@@ -31,11 +31,9 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -203,33 +201,13 @@ public class WindowedWordCount {
PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
/**
- * Concept #5: Customize the output format using windowing information
- *
- * <p>At this point, the data is organized by window. We're writing text files and and have no
- * late data, so for simplicity we can use the window as the key and {@link GroupByKey} to get
- * one output file per window. (if we had late data this key would not be unique)
- *
- * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} parameter. This will
- * be automatically detected and populated with the window for the current element.
- */
- PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
- wordCounts.apply(
- ParDo.of(
- new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, Long>>>() {
- @ProcessElement
- public void processElement(ProcessContext context, IntervalWindow window) {
- context.output(KV.of(window, context.element()));
- }
- }));
-
- /**
- * Concept #6: Format the results and write to a sharded file partitioned by window, using a
+ * Concept #5: Format the results and write to a sharded file partitioned by window, using a
* simple ParDo operation. Because there may be failures followed by retries, the
* writes must be idempotent, but the details of writing to files is elided here.
*/
- keyedByWindow
- .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
- .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
+ wordCounts
+ .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+ .apply(new WriteOneFilePerWindow(output));
PipelineResult result = pipeline.run();
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
new file mode 100644
index 0000000..2ed8a74
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+/**
+ * A {@link PTransform}, built on {@link TextIO.Write} rather than a custom {@link DoFn}, that
+ * writes elements to files named after the bounds of their {@link IntervalWindow}.
+ *
+ * <p>This is test utility code, not for end-users, so examples can be focused on their primary
+ * lessons.
+ */
+public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
+
+ private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
+ private String filenamePrefix;
+
+ public WriteOneFilePerWindow(String filenamePrefix) {
+ this.filenamePrefix = filenamePrefix;
+ }
+
+ @Override
+ public PDone expand(PCollection<String> input) {
+ return input.apply(
+ TextIO.Write.to(new PerWindowFiles(filenamePrefix)).withWindowedWrites().withNumShards(3));
+ }
+
+ /**
+ * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
+ * being written. This always includes the shard number and the total number of shards. For
+ * windowed writes, it also includes the window and pane index (a sequence number assigned to each
+ * trigger firing).
+ */
+ public static class PerWindowFiles extends FilenamePolicy {
+
+ private final String output;
+
+ public PerWindowFiles(String output) {
+ this.output = output;
+ }
+
+ @Override
+ public ValueProvider<String> getBaseOutputFilenameProvider() {
+ return StaticValueProvider.of(output);
+ }
+
+ public String filenamePrefixForWindow(IntervalWindow window) {
+ return String.format(
+ "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
+ }
+
+ @Override
+ public String windowedFilename(WindowedContext context) {
+ IntervalWindow window = (IntervalWindow) context.getWindow();
+ return String.format(
+ "%s-%s-of-%s",
+ filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
+ }
+
+ @Override
+ public String unwindowedFilename(Context context) {
+ throw new UnsupportedOperationException("Unsupported.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
deleted file mode 100644
index cd6baad..0000000
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * A {@link DoFn} that writes elements to files with names deterministically derived from the lower
- * and upper bounds of their key (an {@link IntervalWindow}).
- *
- * <p>This is test utility code, not for end-users, so examples can be focused
- * on their primary lessons.
- */
-public class WriteWindowedFilesDoFn
- extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
-
- static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
- static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
- private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
-
- private final String output;
-
- public WriteWindowedFilesDoFn(String output) {
- this.output = output;
- }
-
- @VisibleForTesting
- public static String fileForWindow(String output, IntervalWindow window) {
- return String.format(
- "%s-%s-%s", output, formatter.print(window.start()), formatter.print(window.end()));
- }
-
- @ProcessElement
- public void processElement(ProcessContext context) throws Exception {
- // Build a file name from the window
- IntervalWindow window = context.element().getKey();
- String outputShard = fileForWindow(output, window);
-
- // Open the file and write all the values
- IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
- OutputStream out = Channels.newOutputStream(factory.create(outputShard, "text/plain"));
- for (KV<String, Long> wordCount : context.element().getValue()) {
- STRING_CODER.encode(
- wordCount.getKey() + ": " + wordCount.getValue(), out, Coder.Context.OUTER);
- out.write(NEWLINE);
- }
- out.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 703f836..857f1d3 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -23,13 +23,14 @@ import com.google.api.client.util.Sleeper;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.ExplicitShardedFile;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NumberedShardedFile;
import org.apache.beam.sdk.util.ShardedFile;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
@@ -64,7 +66,7 @@ public class WindowedWordCountIT {
@Rule public TestName testName = new TestName();
private static final String DEFAULT_INPUT =
- "gs://apache-beam-samples/shakespeare/winterstale-personae";
+ "gs://apache-beam-samples/shakespeare/sonnets.txt";
static final int MAX_READ_RETRIES = 4;
static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
static final FluentBackoff BACK_OFF_FACTORY =
@@ -130,14 +132,18 @@ public class WindowedWordCountIT {
String outputPrefix = options.getOutput();
- List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+ PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
+
+ List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+
for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
- Instant windowStart =
+ final Instant windowStart =
new Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
expectedOutputFiles.add(
- WriteWindowedFilesDoFn.fileForWindow(
- outputPrefix,
- new IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10)))));
+ new NumberedShardedFile(
+ filenamePolicy.filenamePrefixForWindow(
+ new IntervalWindow(
+ windowStart, windowStart.plus(Duration.standardMinutes(10)))) + "*"));
}
ShardedFile inputFile = new ExplicitShardedFile(Collections.singleton(options.getInputFile()));
@@ -157,7 +163,7 @@ public class WindowedWordCountIT {
}
options.setOnSuccessMatcher(
- new WordCountsMatcher(expectedWordCounts, new ExplicitShardedFile(expectedOutputFiles)));
+ new WordCountsMatcher(expectedWordCounts, expectedOutputFiles));
WindowedWordCount.main(TestPipeline.convertToArgs(options));
}
@@ -172,24 +178,28 @@ public class WindowedWordCountIT {
private static final Logger LOG = LoggerFactory.getLogger(FileChecksumMatcher.class);
private final SortedMap<String, Long> expectedWordCounts;
- private final ShardedFile outputFile;
+ private final List<ShardedFile> outputFiles;
private SortedMap<String, Long> actualCounts;
- public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, ShardedFile outputFile) {
+ public WordCountsMatcher(
+ SortedMap<String, Long> expectedWordCounts, List<ShardedFile> outputFiles) {
this.expectedWordCounts = expectedWordCounts;
- this.outputFile = outputFile;
+ this.outputFiles = outputFiles;
}
@Override
public boolean matchesSafely(PipelineResult pipelineResult) {
try {
// Load output data
- List<String> lines =
- outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+ List<String> outputLines = new ArrayList<>();
+ for (ShardedFile outputFile : outputFiles) {
+ outputLines.addAll(
+ outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()));
+ }
// Since the windowing is nondeterministic we only check the sums
actualCounts = new TreeMap<>();
- for (String line : lines) {
+ for (String line : outputLines) {
String[] splits = line.split(": ");
String word = splits[0];
long count = Long.parseLong(splits[1]);
@@ -205,7 +215,8 @@ public class WindowedWordCountIT {
return actualCounts.equals(expectedWordCounts);
} catch (Exception e) {
throw new RuntimeException(
- String.format("Failed to read from sharded output: %s", outputFile));
+ String.format("Failed to read from sharded output: %s due to exception",
+ outputFiles), e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index f4ae577..c4f1bd6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -254,7 +254,8 @@ public class PTransformMatchers {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (application.getTransform() instanceof Write) {
- return ((Write) application.getTransform()).getSharding() == null;
+ Write write = (Write) application.getTransform();
+ return write.getSharding() == null && write.getNumShards() == null;
}
return false;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 63122fe..1bf5839 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
@@ -54,8 +56,7 @@ class WriteWithShardingFactory<InputT>
@Override
public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
Write<InputT> transform) {
-
- return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+ return transform.withSharding(new LogElementShardsWithDrift<InputT>());
}
@Override
@@ -74,6 +75,7 @@ class WriteWithShardingFactory<InputT>
@Override
public PCollectionView<Integer> expand(PCollection<T> records) {
return records
+ .apply(Window.<T>into(new GlobalWindows()))
.apply("CountRecords", Count.<T>globally())
.apply("GenerateShardCount", ParDo.of(new CalculateShardsFn()))
.apply(View.<Integer>asSingleton());
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 301d841..af36b80 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,6 +65,10 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
}
@Override
+ public void setWindowedWrites(boolean windowedWrites) {
+ }
+
+ @Override
public void finalize(Iterable<Object> writerResults, PipelineOptions options)
throws Exception {
@@ -141,7 +147,19 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
return new Writer<T, Object>() {
@Override
- public void open(String uId) throws Exception {
+ public void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ }
+
+ @Override
+ public void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+ }
+
+ @Override
+ public void cleanup() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 572c291..38b790e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -119,6 +121,11 @@ public class WriteSinkITCase extends JavaProgramTestBase {
}
@Override
+ public void setWindowedWrites(boolean windowedWrites) {
+
+ }
+
+ @Override
public void finalize(Iterable<String> writerResults, PipelineOptions options)
throws Exception {
@@ -142,13 +149,27 @@ public class WriteSinkITCase extends JavaProgramTestBase {
private PrintWriter internalWriter;
@Override
- public void open(String uId) throws Exception {
+ public final void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ throw new UnsupportedOperationException("Windowed writes not supported.");
+ }
+
+ @Override
+ public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
Path path = new Path(resultPath + "/" + uId);
FileSystem.get(new URI("file:///")).create(path, false);
internalWriter = new PrintWriter(new File(path.toUri()));
}
@Override
+ public void cleanup() throws Exception {
+
+ }
+
+ @Override
public void write(String value) throws Exception {
internalWriter.println(value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 96f0a50..a41c9f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.BaseEncoding;
+
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
@@ -39,6 +40,7 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -85,11 +87,21 @@ import org.apache.beam.sdk.values.PDone;
* } </pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to} to specify
+ * {@link AvroIO.Write}, calling {@link AvroIO.Write#to(String)} to specify
* the path of the file to write to (e.g., a local filename or sharded
* filename pattern if running locally, or a Google Cloud Storage
* filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FilenamePolicy)} can also be used
+ * to specify a custom file naming policy.
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner -
+ * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * preserved. When producing windowed writes, the number of output shards must be set explicitly
+ * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * runner-chosen value, so you may not need to set it yourself. A
+ * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
+ * unique filenames.
*
* <p>It is required to specify {@link AvroIO.Write#withSchema}. To
* write specific records, such as Avro-generated classes, provide an
@@ -369,6 +381,14 @@ public class AvroIO {
}
/**
+ * Returns a {@link PTransform} that writes to the file(s) specified by the provided
+ * {@link FileBasedSink.FilenamePolicy}.
+ */
+ public static Bound<GenericRecord> to(FilenamePolicy filenamePolicy) {
+ return new Bound<>(GenericRecord.class).to(filenamePolicy);
+ }
+
+ /**
* Returns a {@link PTransform} that writes to the file(s) with the
* given filename suffix.
*/
@@ -496,6 +516,9 @@ public class AvroIO {
final Schema schema;
/** An option to indicate if output validation is desired. Default is true. */
final boolean validate;
+ final boolean windowedWrites;
+ FilenamePolicy filenamePolicy;
+
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -515,7 +538,9 @@ public class AvroIO {
null,
true,
DEFAULT_CODEC,
- ImmutableMap.<String, Object>of());
+ ImmutableMap.<String, Object>of(),
+ false,
+ null);
}
Bound(
@@ -528,7 +553,9 @@ public class AvroIO {
Schema schema,
boolean validate,
SerializableAvroCodecFactory codec,
- Map<String, Object> metadata) {
+ Map<String, Object> metadata,
+ boolean windowedWrites,
+ FilenamePolicy filenamePolicy) {
super(name);
this.filenamePrefix = filenamePrefix;
this.filenameSuffix = filenameSuffix;
@@ -538,6 +565,8 @@ public class AvroIO {
this.schema = schema;
this.validate = validate;
this.codec = codec;
+ this.windowedWrites = windowedWrites;
+ this.filenamePolicy = filenamePolicy;
Map<String, String> badKeys = Maps.newLinkedHashMap();
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
@@ -573,7 +602,25 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
+ }
+
+ public Bound<T> to(FilenamePolicy filenamePolicy) {
+ return new Bound<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec,
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -596,7 +643,9 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -625,7 +674,9 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -647,7 +698,9 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -670,7 +723,25 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
+ }
+
+ public Bound<T> withWindowedWrites() {
+ return new Bound<>(
+ name,
+ filenamePrefix,
+ filenameSuffix,
+ numShards,
+ shardTemplate,
+ type,
+ schema,
+ validate,
+ codec,
+ metadata,
+ true,
+ filenamePolicy);
}
/**
@@ -693,7 +764,9 @@ public class AvroIO {
ReflectData.get().getSchema(type),
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -714,7 +787,9 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -749,7 +824,9 @@ public class AvroIO {
schema,
false,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -769,7 +846,9 @@ public class AvroIO {
schema,
validate,
new SerializableAvroCodecFactory(codec),
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
/**
@@ -789,31 +868,49 @@ public class AvroIO {
schema,
validate,
codec,
- metadata);
+ metadata,
+ windowedWrites,
+ filenamePolicy);
}
@Override
public PDone expand(PCollection<T> input) {
- if (filenamePrefix == null) {
+ if (filenamePolicy == null && filenamePrefix == null) {
throw new IllegalStateException(
"need to set the filename prefix of an AvroIO.Write transform");
}
+ if (filenamePolicy != null && filenamePrefix != null) {
+ throw new IllegalStateException(
+ "cannot set both a filename policy and a filename prefix");
+ }
if (schema == null) {
throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
- org.apache.beam.sdk.io.Write<T> write =
- org.apache.beam.sdk.io.Write.to(
- new AvroSink<>(
- filenamePrefix,
- filenameSuffix,
- shardTemplate,
- AvroCoder.of(type, schema),
- codec,
- metadata));
+ org.apache.beam.sdk.io.Write<T> write = null;
+ if (filenamePolicy != null) {
+ write = org.apache.beam.sdk.io.Write.to(
+ new AvroSink<>(
+ filenamePolicy,
+ AvroCoder.of(type, schema),
+ codec,
+ metadata));
+ } else {
+ write = org.apache.beam.sdk.io.Write.to(
+ new AvroSink<>(
+ filenamePrefix,
+ filenameSuffix,
+ shardTemplate,
+ AvroCoder.of(type, schema),
+ codec,
+ metadata));
+ }
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
+ if (windowedWrites) {
+ write = write.withWindowedWrites();
+ }
return input.apply("Write", write);
}
@@ -940,6 +1037,18 @@ public class AvroIO {
@VisibleForTesting
AvroSink(
+ FilenamePolicy filenamePolicy,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
+ super(filenamePolicy);
+ this.coder = coder;
+ this.codec = codec;
+ this.metadata = metadata;
+ }
+
+ @VisibleForTesting
+ AvroSink(
String baseOutputFilename,
String extension,
String fileNameTemplate,
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ae28b62..9b5f130 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,34 +17,48 @@
*/
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
@@ -146,21 +160,165 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
protected final WritableByteChannelFactory writableByteChannelFactory;
+
/**
- * Base filename for final output files.
+ * A naming policy for output files.
*/
- protected final ValueProvider<String> baseOutputFilename;
+ public abstract static class FilenamePolicy implements Serializable {
+ /**
+ * Context used for generating a name based on shard number and num shards.
+ * The policy must produce unique filenames for unique {@link Context} objects.
+ *
+ * <p>Be careful about adding fields to this as existing strategies will not notice the new
+ * fields, and may not produce unique filenames.
+ */
+ public static class Context {
+ private int shardNumber;
+ private int numShards;
+
+
+ public Context(int shardNumber, int numShards) {
+ this.shardNumber = shardNumber;
+ this.numShards = numShards;
+ }
+
+ public int getShardNumber() {
+ return shardNumber;
+ }
+
+
+ public int getNumShards() {
+ return numShards;
+ }
+ }
+
+ /**
+ * Context used for generating a name based on window, pane, shard number, and num shards.
+ * The policy must produce unique filenames for unique {@link WindowedContext} objects.
+ *
+ * <p>Be careful about adding fields to this as existing strategies will not notice the new
+ * fields, and may not produce unique filenames.
+ */
+ public static class WindowedContext {
+ private int shardNumber;
+ private int numShards;
+ private BoundedWindow window;
+ private PaneInfo paneInfo;
+
+ public WindowedContext(
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shardNumber,
+ int numShards) {
+ this.window = window;
+ this.paneInfo = paneInfo;
+ this.shardNumber = shardNumber;
+ this.numShards = numShards;
+ }
+
+ public BoundedWindow getWindow() {
+ return window;
+ }
+
+ public PaneInfo getPaneInfo() {
+ return paneInfo;
+ }
+
+ public int getShardNumber() {
+ return shardNumber;
+ }
+
+ public int getNumShards() {
+ return numShards;
+ }
+ }
+
+ /**
+ * When a sink has requested windowed or triggered output, this method will be invoked to return
+ * the filename. The {@link WindowedContext} object gives access to the window and pane, as
+ * well as sharding information. The policy must return unique and consistent filenames
+ * for different windows and panes.
+ */
+ public abstract String windowedFilename(WindowedContext c);
+
+ /**
+ * When a sink has not requested windowed output, this method will be invoked to return the
+ * filename. The {@link Context} object only provides sharding information, which is used by
+ * the policy to generate unique and consistent filenames.
+ */
+ public abstract String unwindowedFilename(Context c);
+
+ /**
+ * @return The base filename for all output files.
+ */
+ public abstract ValueProvider<String> getBaseOutputFilenameProvider();
+
+ /**
+ * Populates the display data.
+ */
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
+ }
/**
- * The extension to be used for the final output files.
+ * A default filename policy.
*/
- protected final String extension;
+ protected class DefaultFilenamePolicy extends FilenamePolicy {
+ ValueProvider<String> baseOutputFilename;
+ String extension;
+ String fileNamingTemplate;
+
+ public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, String extension,
+ String fileNamingTemplate) {
+ this.baseOutputFilename = baseOutputFilename;
+ if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
+ this.extension = extension + getFileExtension(
+ writableByteChannelFactory.getFilenameSuffix());
+ } else {
+ this.extension = extension;
+ }
+ this.fileNamingTemplate = fileNamingTemplate;
+ }
+
+ @Override
+ public String unwindowedFilename(FilenamePolicy.Context context) {
+ if (context.numShards <= 0) {
+ return null;
+ }
+
+ String suffix = getFileExtension(extension);
+ String filename = IOChannelUtils.constructName(
+ baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
+ context.getNumShards());
+ return filename;
+ }
+
+ @Override
+ public String windowedFilename(FilenamePolicy.WindowedContext c) {
+ throw new UnsupportedOperationException("There is no default policy for windowed file"
+ + " output. Please provide an explicit FilenamePolicy to generate filenames.");
+ }
+
+ @Override
+ public ValueProvider<String> getBaseOutputFilenameProvider() {
+ return baseOutputFilename;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ String fileNamePattern = String.format("%s%s%s",
+ baseOutputFilename.isAccessible()
+ ? baseOutputFilename.get() : baseOutputFilename.toString(),
+ fileNamingTemplate, getFileExtension(extension));
+ builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
+ .withLabel("File Name Pattern"));
+ }
+ }
/**
- * Naming template for output files. See {@link ShardNameTemplate} for a description of
- * possible naming templates. Default is {@link ShardNameTemplate#INDEX_OF_MAX}.
+ * The policy used to generate output filenames.
*/
- protected final String fileNamingTemplate;
+ protected FilenamePolicy fileNamePolicy;
/**
* Construct a FileBasedSink with the given base output filename and extension. A
@@ -201,20 +359,30 @@ public abstract class FileBasedSink<T> extends Sink<T> {
public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
this.writableByteChannelFactory = writableByteChannelFactory;
- this.baseOutputFilename = baseOutputFilename;
- if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
- this.extension = extension + getFileExtension(writableByteChannelFactory.getFilenameSuffix());
- } else {
- this.extension = extension;
- }
- this.fileNamingTemplate = fileNamingTemplate;
+ this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, extension,
+ fileNamingTemplate);
+ }
+
+ public FileBasedSink(FilenamePolicy fileNamePolicy) {
+ this(fileNamePolicy, CompressionType.UNCOMPRESSED);
+
+ }
+
+ public FileBasedSink(FilenamePolicy fileNamePolicy,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ this.fileNamePolicy = fileNamePolicy;
+ this.writableByteChannelFactory = writableByteChannelFactory;
}
/**
* Returns the base output filename for this file based sink.
*/
public ValueProvider<String> getBaseOutputFilenameProvider() {
- return baseOutputFilename;
+ return fileNamePolicy.getBaseOutputFilenameProvider();
+ }
+
+ public FilenamePolicy getFileNamePolicy() {
+ return fileNamePolicy;
}
@Override
@@ -230,13 +398,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
-
- String fileNamePattern = String.format("%s%s%s",
- baseOutputFilename.isAccessible()
- ? baseOutputFilename.get() : baseOutputFilename.toString(),
- fileNamingTemplate, getFileExtension(extension));
- builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
- .withLabel("File Name Pattern"));
+ getFileNamePolicy().populateDisplayData(builder);
}
/**
@@ -286,7 +448,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* constructor arguments.
*
* <p>Subclass implementations can change the file naming template by supplying a value for
- * {@link FileBasedSink#fileNamingTemplate}.
+ * fileNamingTemplate.
*
* <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
* files will occur.
@@ -304,6 +466,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/** Directory for temporary output files. */
protected final ValueProvider<String> tempDirectory;
+ /** Whether windowed writes are being used. */
+ protected boolean windowedWrites;
+
/** Constructs a temporary file path given the temporary directory and a filename. */
protected static String buildTemporaryFilename(String tempDirectory, String filename)
throws IOException {
@@ -361,6 +526,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
+ this.windowedWrites = false;
}
/**
@@ -371,6 +537,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
@Override
public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
+ @Override
+ public void setWindowedWrites(boolean windowedWrites) {
+ this.windowedWrites = windowedWrites;
+ }
+
/**
* Initialization of the sink. Default implementation is a no-op. May be overridden by subclass
* implementations to perform initialization of the sink at pipeline runtime. This method must
@@ -395,22 +566,55 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* @param writerResults the results of writes (FileResult).
*/
@Override
- public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
+ public void finalize(Iterable<FileResult> writerResults,
+ PipelineOptions options)
throws Exception {
// Collect names of temporary files and rename them.
- List<String> files = new ArrayList<>();
- for (FileResult result : writerResults) {
- LOG.debug("Temporary bundle output file {} will be copied.", result.getFilename());
- files.add(result.getFilename());
- }
- copyToOutputFiles(files, options);
+ Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
+ copyToOutputFiles(outputFilenames, options);
+ // Optionally remove temporary files.
// We remove the entire temporary directory, rather than specifically removing the files
// from writerResults, because writerResults includes only successfully completed bundles,
// and we'd like to clean up the failed ones too.
// Note that due to GCS eventual consistency, matching files in the temp directory is also
// currently non-perfect and may fail to delete some files.
- removeTemporaryFiles(files, options);
+ //
+ // When windows or triggers are specified, files are generated incrementally so deleting
+ // the entire directory in finalize is incorrect.
+ removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options);
+ }
+
+ protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> writerResults) {
+ Map<String, String> outputFilenames = new HashMap<>();
+ List<String> files = new ArrayList<>();
+ for (FileResult result : writerResults) {
+ if (result.getDestinationFilename() != null) {
+ outputFilenames.put(result.getFilename(), result.getDestinationFilename());
+ } else {
+ files.add(result.getFilename());
+ }
+ }
+
+ // If the user does not specify numShards() (not supported with windowing), then the
+ // writerResults won't contain destination filenames, so we dynamically generate them here.
+ if (files.size() > 0) {
+ checkArgument(outputFilenames.isEmpty());
+ // Sort files for idempotence.
+ files = Ordering.natural().sortedCopy(files);
+ FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
+ for (int i = 0; i < files.size(); i++) {
+ outputFilenames.put(files.get(i),
+ filenamePolicy.unwindowedFilename(new Context(i, files.size())));
+ }
+ }
+
+ int numDistinctShards = new HashSet<String>(outputFilenames.values()).size();
+ checkState(numDistinctShards == outputFilenames.size(),
+ "Only generated %s distinct file names for %s files.",
+ numDistinctShards, outputFilenames.size());
+
+ return outputFilenames;
}
/**
@@ -425,47 +629,19 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
*
* @param filenames the filenames of temporary files.
- * @return a list containing the names of final output files.
*/
- protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options)
+ protected final void copyToOutputFiles(Map<String, String> filenames,
+ PipelineOptions options)
throws IOException {
int numFiles = filenames.size();
- // Sort files for idempotence.
- List<String> srcFilenames = Ordering.natural().sortedCopy(filenames);
- List<String> destFilenames = generateDestinationFilenames(numFiles);
-
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
- IOChannelUtils.getFactory(destFilenames.get(0))
- .copy(srcFilenames, destFilenames);
+ IOChannelFactory channelFactory =
+ IOChannelUtils.getFactory(filenames.values().iterator().next());
+ channelFactory.copy(filenames.keySet(), filenames.values());
} else {
LOG.info("No output files to write.");
}
-
- return destFilenames;
- }
-
- /**
- * Generate output bundle filenames.
- */
- protected final List<String> generateDestinationFilenames(int numFiles) {
- List<String> destFilenames = new ArrayList<>();
- String extension = getSink().extension;
- String baseOutputFilename = getSink().baseOutputFilename.get();
- String fileNamingTemplate = getSink().fileNamingTemplate;
-
- String suffix = getFileExtension(extension);
- for (int i = 0; i < numFiles; i++) {
- destFilenames.add(IOChannelUtils.constructName(
- baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
- }
-
- int numDistinctShards = new HashSet<String>(destFilenames).size();
- checkState(numDistinctShards == numFiles,
- "Shard name template '%s' only generated %s distinct file names for %s files.",
- fileNamingTemplate, numDistinctShards, numFiles);
-
- return destFilenames;
}
/**
@@ -475,7 +651,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
* temporary files, this method will remove them.
*/
- protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
+ protected final void removeTemporaryFiles(Set<String> knownFiles,
+ boolean shouldRemoveTemporaryDirectory,
+ PipelineOptions options)
throws IOException {
String tempDir = tempDirectory.get();
LOG.debug("Removing temporary bundle output files in {}.", tempDir);
@@ -485,15 +663,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
// directory matching APIs, we remove not only files that the filesystem says exist
// in the directory (which may be incomplete), but also files that are known to exist
// (produced by successfully completed bundles).
+
// This may still fail to remove temporary outputs of some failed bundles, but at least
// the common case (where all bundles succeed) is guaranteed to be fully addressed.
Set<String> matches = new HashSet<>();
// TODO: Windows OS cannot resolves and matches '*' in the path,
// ignore the exception for now to avoid failing the pipeline.
- try {
- matches.addAll(factory.match(factory.resolve(tempDir, "*")));
- } catch (Exception e) {
- LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+ if (shouldRemoveTemporaryDirectory) {
+ try {
+ matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+ } catch (Exception e) {
+ LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+ }
}
Set<String> allMatches = new HashSet<>(matches);
allMatches.addAll(knownFiles);
@@ -517,7 +698,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
@Override
public Coder<FileResult> getWriterResultCoder() {
- return SerializableCoder.of(FileResult.class);
+ return FileResultCoder.of();
}
/**
@@ -553,8 +734,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
private String id;
+ private BoundedWindow window;
+ private PaneInfo paneInfo;
+ private int shard = -1;
+ private int numShards = -1;
+
/**
- * The filename of the output bundle - $tempDirectory/$id.
+ * The filename of the output bundle.
*/
private String filename;
@@ -610,8 +796,37 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* Opens the channel.
*/
@Override
- public final void open(String uId) throws Exception {
+ public final void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
+ if (!getWriteOperation().windowedWrites) {
+ throw new IllegalStateException("openWindowed called a non-windowed sink.");
+ }
+ open(uId, window, paneInfo, shard, numShards);
+ }
+
+ @Override
+ public final void openUnwindowed(String uId,
+ int shard,
+ int numShards) throws Exception {
+ if (getWriteOperation().windowedWrites) {
+ throw new IllegalStateException("openUnwindowed called a windowed sink.");
+ }
+ open(uId, null, null, shard, numShards);
+ }
+
+ private void open(String uId,
+ @Nullable BoundedWindow window,
+ @Nullable PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception {
this.id = uId;
+ this.window = window;
+ this.paneInfo = paneInfo;
+ this.shard = shard;
+ this.numShards = numShards;
filename = FileBasedWriteOperation.buildTemporaryFilename(
getWriteOperation().tempDirectory.get(), uId);
LOG.debug("Opening {}.", filename);
@@ -639,6 +854,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
}
+ @Override
+ public void cleanup() throws Exception {
+ if (filename != null) {
+ IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
+ }
+ }
+
/**
* Closes the channel and returns the bundle result.
*/
@@ -653,8 +875,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
throw new IllegalStateException("Channel should only be closed by its owner: " + channel);
}
}
- FileResult result = new FileResult(filename);
- LOG.debug("Result for bundle {}: {}", this.id, filename);
+
+ FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy;
+ String destinationFile;
+ if (window != null) {
+ destinationFile = filenamePolicy.windowedFilename(new WindowedContext(
+ window, paneInfo, shard, numShards));
+ } else {
+ destinationFile = filenamePolicy.unwindowedFilename(new Context(shard, numShards));
+ }
+ FileResult result = new FileResult(filename, destinationFile);
+ LOG.debug("Result for bundle {}: {} {}", this.id, filename, destinationFile);
return result;
}
@@ -670,18 +901,62 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Result of a single bundle write. Contains the filename of the bundle.
*/
- public static final class FileResult implements Serializable {
+ public static final class FileResult {
private final String filename;
+ private final String destinationFilename;
- public FileResult(String filename) {
+ public FileResult(String filename, String destinationFilename) {
this.filename = filename;
+ this.destinationFilename = destinationFilename;
}
public String getFilename() {
return filename;
}
+
+ public String getDestinationFilename() {
+ return destinationFilename;
+ }
+
+ }
+
+ /**
+ * A coder for FileResult objects.
+ */
+ public static final class FileResultCoder extends AtomicCoder<FileResult> {
+ private static final FileResultCoder INSTANCE = new FileResultCoder();
+ private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+
+ @JsonCreator
+ public static FileResultCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(FileResult value, OutputStream outStream, Context context)
+ throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null value");
+ }
+ stringCoder.encode(value.getFilename(), outStream, context.nested());
+ stringCoder.encode(value.getDestinationFilename(), outStream, context.nested());
+ }
+
+ @Override
+ public FileResult decode(InputStream inStream, Context context)
+ throws IOException {
+ return new FileResult(
+ stringCoder.decode(inStream, context.nested()),
+ stringCoder.decode(inStream, context.nested()));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ stringCoder.verifyDeterministic(); // Encoding is just two nullable UTF-8 strings.
+ }
}
+
/**
* Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
* and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
index 6742784..d53c6ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
/**
@@ -63,11 +65,12 @@ import org.apache.beam.sdk.values.PCollection;
* operation corresponds to. See below for more information about these methods and restrictions on
* their implementation.
*
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation
- * that the writer belongs to.
+ * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines several methods:
+ * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are called once at the
+ * start of writing a bundle, depending on whether windowed or unwindowed output is requested;
+ * {@link Writer#write}, which writes a single record from the bundle; {@link Writer#close},
+ * which is called once at the end of writing a bundle; and {@link Writer#getWriteOperation},
+ * which returns the write operation that the writer belongs to.
* </ul>
*
* <h2>WriteOperation</h2>
@@ -95,9 +98,10 @@ import org.apache.beam.sdk.values.PCollection;
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
* event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried
- * bundles will have a unique way of identifying their output.
+ * result passed to the WriteOperation's finalize method. Each call to {@link Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the
+ * Write transform, so even redundant or retried bundles will have a unique way of identifying
+ * their output.
*
* <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
* guarantee is important; if a bundle is to be output to a file, for example, the name of the file
@@ -174,6 +178,11 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
public abstract void initialize(PipelineOptions options) throws Exception;
/**
+ * Indicates that the operation will be performing windowed writes.
+ */
+ public abstract void setWindowedWrites(boolean windowedWrites);
+
+ /**
* Given an Iterable of results from bundle writes, performs finalization after writing and
* closes the sink. Called after all bundle writes are complete.
*
@@ -200,7 +209,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
* Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
*
* <p>The bundle id that the writer will use to uniquely identify its output will be passed to
- * {@link Writer#open}.
+ * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
*
* <p>Must not mutate the state of the WriteOperation.
*/
@@ -218,9 +227,10 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
}
/**
- * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is
- * called before writing begins and {@link Writer#close} is called after all elements in the
- * bundle have been written. {@link Writer#write} writes an element to the sink.
+ * A Writer writes a bundle of elements from a PCollection to a sink.
+ * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
+ * and {@link Writer#close} is called after all elements in the bundle have been written.
+ * {@link Writer#write} writes an element to the sink.
*
* <p>Note that any access to static members or methods of a Writer must be thread-safe, as
* multiple instances of a Writer may be instantiated in different threads on the same worker.
@@ -238,8 +248,25 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
* <p>The unique id that is given to open should be used to ensure that the writer's output does
* not interfere with the output of other Writers, as a bundle may be executed many times for
* fault tolerance. See {@link Sink} for more information about bundle ids.
+ *
+ * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+ * shard and numShards are populated for the case of static sharding. In cases where the
+ * runner is dynamically picking sharding, shard and numShards might both be set to -1.
+ */
+ public abstract void openWindowed(String uId,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ int shard,
+ int numShards) throws Exception;
+
+ /**
+ * Perform bundle initialization for the case where the file is written unwindowed.
*/
- public abstract void open(String uId) throws Exception;
+ public abstract void openUnwindowed(String uId,
+ int shard,
+ int numShards) throws Exception;
+
+ public abstract void cleanup() throws Exception;
/**
* Called for each value in the bundle.
@@ -262,5 +289,7 @@ public abstract class Sink<T> implements Serializable, HasDisplayData {
* Returns the write operation this writer belongs to.
*/
public abstract WriteOperation<T, WriteT> getWriteOperation();
+
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 58b55a9..ea80639 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -85,6 +86,14 @@ import org.apache.beam.sdk.values.PDone;
* filename or sharded filename pattern of the form
* {@code "gs://<bucket>/<filepath>"}).
*
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner -
+ * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * preserved. When producing windowed writes, the number of output shards must be set explicitly
+ * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
+ * set, and unique windows and triggers must produce unique filenames.
+ *
* <p>Any existing files with the same names as generated output files
* will be overwritten.
*
@@ -352,6 +361,10 @@ public class TextIO {
return new Bound().to(prefix);
}
+ public static Bound to(FilenamePolicy filenamePolicy) {
+ return new Bound().to(filenamePolicy);
+
+ }
/**
* Like {@link #to(String)}, but with a {@link ValueProvider}.
*/
@@ -479,6 +492,12 @@ public class TextIO {
/** An option to indicate if output validation is desired. Default is true. */
private final boolean validate;
+ /** A policy for naming output files. */
+ private final FilenamePolicy filenamePolicy;
+
+ /** Whether to write windowed output files. */
+ private boolean windowedWrites;
+
/**
* The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
* {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
@@ -487,13 +506,15 @@ public class TextIO {
private Bound() {
this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true,
- FileBasedSink.CompressionType.UNCOMPRESSED);
+ FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
}
private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
@Nullable String header, @Nullable String footer, int numShards,
String shardTemplate, boolean validate,
- WritableByteChannelFactory writableByteChannelFactory) {
+ WritableByteChannelFactory writableByteChannelFactory,
+ FilenamePolicy filenamePolicy,
+ boolean windowedWrites) {
super(name);
this.header = header;
this.footer = footer;
@@ -504,6 +525,8 @@ public class TextIO {
this.validate = validate;
this.writableByteChannelFactory =
firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+ this.filenamePolicy = filenamePolicy;
+ this.windowedWrites = windowedWrites;
}
/**
@@ -518,7 +541,7 @@ public class TextIO {
validateOutputComponent(filenamePrefix);
return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
header, footer, numShards, shardTemplate, validate,
- writableByteChannelFactory);
+ writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -526,7 +549,15 @@ public class TextIO {
*/
public Bound to(ValueProvider<String> filenamePrefix) {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ }
+
+ /**
+ * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
+ */
+ public Bound to(FilenamePolicy filenamePolicy) {
+ return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -540,7 +571,7 @@ public class TextIO {
public Bound withSuffix(String nameExtension) {
validateOutputComponent(nameExtension);
return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -560,7 +591,7 @@ public class TextIO {
public Bound withNumShards(int numShards) {
checkArgument(numShards >= 0);
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -573,7 +604,7 @@ public class TextIO {
*/
public Bound withShardNameTemplate(String shardTemplate) {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -591,7 +622,7 @@ public class TextIO {
*/
public Bound withoutSharding() {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
- validate, writableByteChannelFactory);
+ validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -606,7 +637,7 @@ public class TextIO {
*/
public Bound withoutValidation() {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, false, writableByteChannelFactory);
+ shardTemplate, false, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -621,7 +652,7 @@ public class TextIO {
*/
public Bound withHeader(@Nullable String header) {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -636,7 +667,7 @@ public class TextIO {
*/
public Bound withFooter(@Nullable String footer) {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
@@ -653,22 +684,39 @@ public class TextIO {
public Bound withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, validate, writableByteChannelFactory);
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ }
+
+ public Bound withWindowedWrites() {
+ return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, validate, writableByteChannelFactory, filenamePolicy, true);
}
@Override
public PDone expand(PCollection<String> input) {
- if (filenamePrefix == null) {
+ if (filenamePolicy == null && filenamePrefix == null) {
+ throw new IllegalStateException(
+ "need to set the filename prefix of an TextIO.Write transform");
+ }
+ if (filenamePolicy != null && filenamePrefix != null) {
throw new IllegalStateException(
- "need to set the filename prefix of a TextIO.Write transform");
+ "cannot set both a filename policy and a filename prefix");
+ }
+ org.apache.beam.sdk.io.Write<String> write = null;
+ if (filenamePolicy != null) {
+ write = org.apache.beam.sdk.io.Write.to(
+ new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+ } else {
+ write = org.apache.beam.sdk.io.Write.to(
+ new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+ writableByteChannelFactory));
}
- org.apache.beam.sdk.io.Write<String> write =
- org.apache.beam.sdk.io.Write.to(
- new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
- writableByteChannelFactory));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
+ if (windowedWrites) {
+ write = write.withWindowedWrites();
+ }
return input.apply("Write", write);
}
@@ -676,8 +724,11 @@ public class TextIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
+ String prefixString = "";
+ if (filenamePrefix != null) {
+ prefixString = filenamePrefix.isAccessible()
+ ? filenamePrefix.get() : filenamePrefix.toString();
+ }
builder
.addIfNotNull(DisplayData.item("filePrefix", prefixString)
.withLabel("Output File Prefix"))
@@ -1023,6 +1074,13 @@ public class TextIO {
@Nullable private final String footer;
@VisibleForTesting
+ TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(filenamePolicy, writableByteChannelFactory);
+ this.header = header;
+ this.footer = footer;
+ }
+ @VisibleForTesting
TextSink(
ValueProvider<String> baseOutputFilename,
String extension,
[3/3] beam git commit: This closes #1926: Allow unbounded windowed
PCollections for FileBasedSinks
Posted by ke...@apache.org.
This closes #1926: Allow unbounded windowed PCollections for FileBasedSinks
Add windowing support to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc907c58
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc907c58
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc907c58
Branch: refs/heads/master
Commit: bc907c58b1da97e53dd0f4b6bda0834b41bb6e66
Parents: 8e5cfde 6addc95
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 5 10:23:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 5 10:23:20 2017 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 1 -
.../apache/beam/examples/WindowedWordCount.java | 34 +-
.../examples/common/WriteOneFilePerWindow.java | 91 ++++
.../examples/common/WriteWindowedFilesDoFn.java | 77 ----
.../beam/examples/WindowedWordCountIT.java | 41 +-
.../core/construction/PTransformMatchers.java | 3 +-
.../direct/WriteWithShardingFactory.java | 6 +-
.../streaming/io/UnboundedFlinkSink.java | 20 +-
.../beam/runners/flink/WriteSinkITCase.java | 23 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 157 +++++--
.../org/apache/beam/sdk/io/FileBasedSink.java | 429 +++++++++++++++----
.../main/java/org/apache/beam/sdk/io/Sink.java | 55 ++-
.../java/org/apache/beam/sdk/io/TextIO.java | 98 ++++-
.../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++-------
.../java/org/apache/beam/sdk/io/XmlSink.java | 6 +-
.../beam/sdk/testing/TestPipelineOptions.java | 5 +
.../beam/sdk/util/FileIOChannelFactory.java | 23 +-
.../beam/sdk/util/GcsIOChannelFactory.java | 3 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 21 +-
.../apache/beam/sdk/util/IOChannelFactory.java | 3 +-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 94 ++--
.../java/org/apache/beam/sdk/io/WriteTest.java | 49 ++-
.../org/apache/beam/sdk/io/XmlSinkTest.java | 8 +-
.../beam/sdk/testing/TestPipelineTest.java | 17 -
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 24 +-
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 2 +-
27 files changed, 1295 insertions(+), 518 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bc907c58/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------