Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:43 UTC
[16/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7013044..2fd10ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,12 +20,9 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -33,12 +30,8 @@ import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
@@ -49,11 +42,9 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -62,17 +53,11 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.ShardedKey;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,12 +66,13 @@ import org.slf4j.LoggerFactory;
* global initialization of a sink, followed by a parallel write, and ends with a sequential
* finalization of the write. The output of a write is {@link PDone}.
*
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
- * WriteOperation}, so the number of output will vary based on runner behavior, though at least 1
- * output will always be produced. The exact parallelism of the write stage can be controlled using
- * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
- * globally limit the number of workers connecting to an external service. However, this option can
- * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of output
+ * will vary based on runner behavior, though at least 1 output will always be produced. The
+ * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
+ * typically used to control how many files are produced or to globally limit the number of
+ * workers connecting to an external service. However, this option can often hurt performance: it
+ * adds an additional {@link GroupByKey} to the pipeline.
*
* <p>Example usage with runner-determined sharding:
*
@@ -97,70 +83,44 @@ import org.slf4j.LoggerFactory;
* <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<UserT, DestinationT, OutputT>
- extends PTransform<PCollection<UserT>, PDone> {
+public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
- // The maximum number of file writers to keep open in a single bundle at a time, since file
- // writers default to 64mb buffers. This comes into play when writing per-window files.
- // The first 20 files from a single WriteFiles transform will write files inline in the
- // transform. Anything beyond that might be shuffled.
- // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
- // their own policy.
- private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
-
- // When we spill records, shard the output keys to prevent hotspots.
- // We could consider making this a parameter.
- private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
-
static final int UNKNOWN_SHARDNUM = -1;
- private FileBasedSink<OutputT, DestinationT> sink;
- private SerializableFunction<UserT, OutputT> formatFunction;
- private WriteOperation<OutputT, DestinationT> writeOperation;
+ private FileBasedSink<T> sink;
+ private WriteOperation<T> writeOperation;
// This allows the number of shards to be dynamically computed based on the input
// PCollection.
- @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
+ @Nullable
+ private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
// We don't use a side input for static sharding, as we want this value to be updatable
// when a pipeline is updated.
@Nullable
private final ValueProvider<Integer> numShardsProvider;
private final boolean windowedWrites;
- private int maxNumWritersPerBundle;
/**
* Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
* the runner control how many different shards are produced.
*/
- public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
- FileBasedSink<OutputT, DestinationT> sink,
- SerializableFunction<UserT, OutputT> formatFunction) {
+ public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
checkNotNull(sink, "sink");
- return new WriteFiles<>(
- sink,
- formatFunction,
- null /* runner-determined sharding */,
- null,
- false,
- DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+ return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false);
}
private WriteFiles(
- FileBasedSink<OutputT, DestinationT> sink,
- SerializableFunction<UserT, OutputT> formatFunction,
- @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
+ FileBasedSink<T> sink,
+ @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
@Nullable ValueProvider<Integer> numShardsProvider,
- boolean windowedWrites,
- int maxNumWritersPerBundle) {
+ boolean windowedWrites) {
this.sink = sink;
- this.formatFunction = checkNotNull(formatFunction);
this.computeNumShards = computeNumShards;
this.numShardsProvider = numShardsProvider;
this.windowedWrites = windowedWrites;
- this.maxNumWritersPerBundle = maxNumWritersPerBundle;
}
@Override
- public PDone expand(PCollection<UserT> input) {
+ public PDone expand(PCollection<T> input) {
if (input.isBounded() == IsBounded.UNBOUNDED) {
checkArgument(windowedWrites,
"Must use windowed writes when applying %s to an unbounded PCollection",
@@ -199,16 +159,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- /** Returns the {@link FileBasedSink} associated with this PTransform. */
- public FileBasedSink<OutputT, DestinationT> getSink() {
+ /**
+ * Returns the {@link FileBasedSink} associated with this PTransform.
+ */
+ public FileBasedSink<T> getSink() {
return sink;
}
- /** Returns the the format function that maps the user type to the record written to files. */
- public SerializableFunction<UserT, OutputT> getFormatFunction() {
- return formatFunction;
- }
-
/**
* Returns whether or not to perform windowed writes.
*/
@@ -223,7 +180,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* #withRunnerDeterminedSharding()}.
*/
@Nullable
- public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
+ public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
return computeNumShards;
}
@@ -241,7 +198,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* <p>A value less than or equal to 0 will be equivalent to the default behavior of
* runner-determined sharding.
*/
- public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) {
+ public WriteFiles<T> withNumShards(int numShards) {
if (numShards > 0) {
return withNumShards(StaticValueProvider.of(numShards));
}
@@ -255,27 +212,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
*/
- public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
- ValueProvider<Integer> numShardsProvider) {
- return new WriteFiles<>(
- sink,
- formatFunction,
- computeNumShards,
- numShardsProvider,
- windowedWrites,
- maxNumWritersPerBundle);
- }
-
- /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
- public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
- int maxNumWritersPerBundle) {
- return new WriteFiles<>(
- sink,
- formatFunction,
- computeNumShards,
- numShardsProvider,
- windowedWrites,
- maxNumWritersPerBundle);
+ public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+ return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
}
/**
@@ -285,169 +223,127 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
*/
- public WriteFiles<UserT, DestinationT, OutputT> withSharding(
- PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
+ public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
checkNotNull(
sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new WriteFiles<>(
- sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
+ return new WriteFiles<>(sink, sharding, null, windowedWrites);
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
* runner-determined sharding.
*/
- public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
- return new WriteFiles<>(
- sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
+ public WriteFiles<T> withRunnerDeterminedSharding() {
+ return new WriteFiles<>(sink, null, null, windowedWrites);
}
/**
* Returns a new {@link WriteFiles} that writes preserves windowing on it's input.
*
- * <p>If this option is not specified, windowing and triggering are replaced by {@link
- * GlobalWindows} and {@link DefaultTrigger}.
+ * <p>If this option is not specified, windowing and triggering are replaced by
+ * {@link GlobalWindows} and {@link DefaultTrigger}.
*
- * <p>If there is no data for a window, no output shards will be generated for that window. If a
- * window triggers multiple times, then more than a single output shard might be generated
- * multiple times; it's up to the sink implementation to keep these output shards unique.
+ * <p>If there is no data for a window, no output shards will be generated for that window.
+ * If a window triggers multiple times, then more than a single output shard might be
+ * generated multiple times; it's up to the sink implementation to keep these output shards
+ * unique.
*
- * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
+ * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+ * positive value.
*/
- public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
- return new WriteFiles<>(
- sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+ public WriteFiles<T> withWindowedWrites() {
+ return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
}
- private static class WriterKey<DestinationT> {
- private final BoundedWindow window;
- private final PaneInfo paneInfo;
- private final DestinationT destination;
+ /**
+ * Writes all the elements in a bundle using a {@link Writer} produced by the
+ * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
+ */
+ private class WriteWindowedBundles extends DoFn<T, FileResult> {
+ private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
- WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
- this.window = window;
- this.paneInfo = paneInfo;
- this.destination = destination;
+ @StartBundle
+ public void startBundle(StartBundleContext c) {
+ // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
+ windowedWriters = Maps.newHashMap();
}
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof WriterKey)) {
- return false;
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ PaneInfo paneInfo = c.pane();
+ Writer<T> writer;
+ // If we are doing windowed writes, we need to ensure that we have separate files for
+ // data in different windows/panes.
+ KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
+ writer = windowedWriters.get(key);
+ if (writer == null) {
+ String uuid = UUID.randomUUID().toString();
+ LOG.info(
+ "Opening writer {} for write operation {}, window {} pane {}",
+ uuid,
+ writeOperation,
+ window,
+ paneInfo);
+ writer = writeOperation.createWriter();
+ writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
+ windowedWriters.put(key, writer);
+ LOG.debug("Done opening writer");
}
- WriterKey other = (WriterKey) o;
- return Objects.equal(window, other.window)
- && Objects.equal(paneInfo, other.paneInfo)
- && Objects.equal(destination, other.destination);
+
+ writeOrClose(writer, c.element());
}
- @Override
- public int hashCode() {
- return Objects.hashCode(window, paneInfo, destination);
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) throws Exception {
+ for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) {
+ FileResult result = entry.getValue().close();
+ BoundedWindow window = entry.getKey().getKey();
+ c.output(result, window.maxTimestamp(), window);
+ }
}
- }
- // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
- // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
- // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
- // this can be used as a key.
- private static <DestinationT> int hashDestination(
- DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
- return Hashing.murmur3_32()
- .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
- .asInt();
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(WriteFiles.this);
+ }
}
/**
- * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
- * WriteOperation} associated with the {@link FileBasedSink}.
+ * Writes all the elements in a bundle using a {@link Writer} produced by the
+ * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled.
*/
- private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
- private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
- private final Coder<DestinationT> destinationCoder;
- private final boolean windowedWrites;
-
- private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
- private int spilledShardNum = UNKNOWN_SHARDNUM;
-
- WriteBundles(
- boolean windowedWrites,
- TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
- Coder<DestinationT> destinationCoder) {
- this.windowedWrites = windowedWrites;
- this.unwrittenRecordsTag = unwrittenRecordsTag;
- this.destinationCoder = destinationCoder;
- }
+ private class WriteUnwindowedBundles extends DoFn<T, FileResult> {
+ // Writer that will write the records in this bundle. Lazily
+ // initialized in processElement.
+ private Writer<T> writer = null;
+ private BoundedWindow window = null;
@StartBundle
public void startBundle(StartBundleContext c) {
// Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
- writers = Maps.newHashMap();
+ writer = null;
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- PaneInfo paneInfo = c.pane();
- // If we are doing windowed writes, we need to ensure that we have separate files for
- // data in different windows/panes. Similar for dynamic writes, make sure that different
- // destinations go to different writers.
- // In the case of unwindowed writes, the window and the pane will always be the same, and
- // the map will only have a single element.
- DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
- WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
- Writer<OutputT, DestinationT> writer = writers.get(key);
+ // Cache a single writer for the bundle.
if (writer == null) {
- if (writers.size() <= maxNumWritersPerBundle) {
- String uuid = UUID.randomUUID().toString();
- LOG.info(
- "Opening writer {} for write operation {}, window {} pane {} destination {}",
- uuid,
- writeOperation,
- window,
- paneInfo,
- destination);
- writer = writeOperation.createWriter();
- if (windowedWrites) {
- writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
- } else {
- writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
- }
- writers.put(key, writer);
- LOG.debug("Done opening writer");
- } else {
- if (spilledShardNum == UNKNOWN_SHARDNUM) {
- // Cache the random value so we only call ThreadLocalRandom once per DoFn instance.
- spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
- } else {
- spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
- }
- c.output(
- unwrittenRecordsTag,
- KV.of(
- ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
- c.element()));
- return;
- }
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ writer = writeOperation.createWriter();
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+ LOG.debug("Done opening writer");
}
- writeOrClose(writer, formatFunction.apply(c.element()));
+ this.window = window;
+ writeOrClose(this.writer, c.element());
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
- for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
- writers.entrySet()) {
- Writer<OutputT, DestinationT> writer = entry.getValue();
- FileResult<DestinationT> result;
- try {
- result = writer.close();
- } catch (Exception e) {
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
- throw e;
- }
- BoundedWindow window = entry.getKey().window;
- c.output(result, window.maxTimestamp(), window);
+ if (writer == null) {
+ return;
}
+ FileResult result = writer.close();
+ c.output(result, window.maxTimestamp(), window);
}
@Override
@@ -456,62 +352,38 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
- /*
- * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
- * single iterable.
+ /**
+ * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
+ * for each shard have been collected into a single iterable.
*/
- private class WriteShardedBundles
- extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
- ShardAssignment shardNumberAssignment;
- WriteShardedBundles(ShardAssignment shardNumberAssignment) {
- this.shardNumberAssignment = shardNumberAssignment;
- }
-
+ private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // Since we key by a 32-bit hash of the destination, there might be multiple destinations
- // in this iterable. The number of destinations is generally very small (1000s or less), so
- // there will rarely be hash collisions.
- Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
- for (UserT input : c.element().getValue()) {
- DestinationT destination = sink.getDynamicDestinations().getDestination(input);
- Writer<OutputT, DestinationT> writer = writers.get(destination);
- if (writer == null) {
- LOG.debug("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter();
- if (windowedWrites) {
- int shardNumber =
- shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
- ? c.element().getKey().getShardNumber()
- : UNKNOWN_SHARDNUM;
- writer.openWindowed(
- UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
- }
- LOG.debug("Done opening writer");
- writers.put(destination, writer);
- }
- writeOrClose(writer, formatFunction.apply(input));
- }
+ // In a sharded write, single input element represents one shard. We can open and close
+ // the writer in each call to processElement.
+ LOG.info("Opening writer for write operation {}", writeOperation);
+ Writer<T> writer = writeOperation.createWriter();
+ if (windowedWrites) {
+ writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+ }
+ LOG.debug("Done opening writer");
- // Close all writers.
- for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
- Writer<OutputT, DestinationT> writer = entry.getValue();
- FileResult<DestinationT> result;
- try {
- // Close the writer; if this throws let the error propagate.
- result = writer.close();
- c.output(result);
- } catch (Exception e) {
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
- throw e;
+ try {
+ for (T t : c.element().getValue()) {
+ writeOrClose(writer, t);
}
+
+ // Close the writer; if this throws let the error propagate.
+ FileResult result = writer.close();
+ c.output(result);
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
+ throw e;
}
- }
+ }
@Override
public void populateDisplayData(DisplayData.Builder builder) {
@@ -519,15 +391,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- private static <OutputT, DestinationT> void writeOrClose(
- Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
+ private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception {
try {
writer.write(t);
} catch (Exception e) {
try {
writer.close();
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
} catch (Exception closeException) {
if (closeException instanceof InterruptedException) {
// Do not silently ignore interrupted state.
@@ -540,25 +409,20 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
+ private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
private final PCollectionView<Integer> numShardsView;
private final ValueProvider<Integer> numShardsProvider;
- private final Coder<DestinationT> destinationCoder;
-
private int shardNumber;
- ApplyShardingKey(
- PCollectionView<Integer> numShardsView,
- ValueProvider<Integer> numShardsProvider,
- Coder<DestinationT> destinationCoder) {
- this.destinationCoder = destinationCoder;
+ ApplyShardingKey(PCollectionView<Integer> numShardsView,
+ ValueProvider<Integer> numShardsProvider) {
this.numShardsView = numShardsView;
this.numShardsProvider = numShardsProvider;
shardNumber = UNKNOWN_SHARDNUM;
}
@ProcessElement
- public void processElement(ProcessContext context) throws IOException {
+ public void processElement(ProcessContext context) {
final int shardCount;
if (numShardsView != null) {
shardCount = context.sideInput(numShardsView);
@@ -578,110 +442,65 @@ public class WriteFiles<UserT, DestinationT, OutputT>
} else {
shardNumber = (shardNumber + 1) % shardCount;
}
- // We avoid using destination itself as a sharding key, because destination is often large.
- // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path
- // to the file. Often most of the path is constant across all destinations, just the path
- // suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully
- // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
- // the destinations. This does mean that multiple destinations might end up on the same shard,
- // however the number of collisions should be small, so there's no need to worry about memory
- // issues.
- DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
- context.output(
- KV.of(
- ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
- context.element()));
+ context.output(KV.of(shardNumber, context.element()));
}
}
/**
* A write is performed as sequence of three {@link ParDo}'s.
*
- * <p>This singleton collection containing the WriteOperation is then used as a side input to a
- * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
- * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
- * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
- * respectively, and {@link Writer#write} method is called for every element in the bundle. The
- * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
- * FileBasedSink} for a description of writer results)-one for each bundle.
+ * <p>This singleton collection containing the WriteOperation is then used as a side
+ * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
+ * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+ * {@link Writer#open} and {@link Writer#close} are called in
+ * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
+ * {@link Writer#write} method is called for every element in the bundle. The output
+ * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink}
+ * for a description of writer results)-one for each bundle.
*
* <p>The final do-once ParDo uses a singleton collection asinput and the collection of writer
- * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
- * the write.
+ * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
+ * to finalize the write.
*
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
- * before the exception that caused the write to fail is propagated and the write result will be
- * discarded.
+ * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be
+ * called before the exception that caused the write to fail is propagated and the write result
+ * will be discarded.
*
* <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
* deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter phases.
- * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
- * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
- * WriteOperation).
+ * WriteOperation object that occurs during initialization is visible in the latter
+ * phases. However, the WriteOperation is not serialized after the bundle-writing
+ * phase. This is why implementations should guarantee that
+ * {@link WriteOperation#createWriter} does not mutate WriteOperation).
*/
- private PDone createWrite(PCollection<UserT> input) {
+ private PDone createWrite(PCollection<T> input) {
Pipeline p = input.getPipeline();
if (!windowedWrites) {
// Re-window the data into the global window and remove any existing triggers.
input =
input.apply(
- Window.<UserT>into(new GlobalWindows())
+ Window.<T>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
}
+
// Perform the per-bundle writes as a ParDo on the input PCollection (with the
// WriteOperation as a side input) and collect the results of the writes in a
// PCollection. There is a dependency between this ParDo and the first (the
// WriteOperation PCollection as a side input), so this will happen after the
// initial ParDo.
- PCollection<FileResult<DestinationT>> results;
+ PCollection<FileResult> results;
final PCollectionView<Integer> numShardsView;
- @SuppressWarnings("unchecked")
Coder<BoundedWindow> shardedWindowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
- final Coder<DestinationT> destinationCoder;
- try {
- destinationCoder =
- sink.getDynamicDestinations()
- .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
- destinationCoder.verifyDeterministic();
- } catch (CannotProvideCoderException | NonDeterministicException e) {
- throw new RuntimeException(e);
- }
-
if (computeNumShards == null && numShardsProvider == null) {
numShardsView = null;
- TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
- TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
- new TupleTag<>("unwrittenRecordsTag");
- String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
- PCollectionTuple writeTuple =
- input.apply(
- writeName,
- ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
- .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
- PCollection<FileResult<DestinationT>> writtenBundleFiles =
- writeTuple
- .get(writtenRecordsTag)
- .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
- // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
- // finalize to stay consistent with what WriteWindowedBundles does.
- PCollection<FileResult<DestinationT>> writtenGroupedFiles =
- writeTuple
- .get(unwrittedRecordsTag)
- .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
- .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
- .apply(
- "WriteUnwritten",
- ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
- .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
results =
- PCollectionList.of(writtenBundleFiles)
- .and(writtenGroupedFiles)
- .apply(Flatten.<FileResult<DestinationT>>pCollections());
+ input.apply(
+ "WriteBundles",
+ ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles()));
} else {
List<PCollectionView<?>> sideInputs = Lists.newArrayList();
if (computeNumShards != null) {
@@ -690,31 +509,20 @@ public class WriteFiles<UserT, DestinationT, OutputT>
} else {
numShardsView = null;
}
- PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
+
+ PCollection<KV<Integer, Iterable<T>>> sharded =
input
- .apply(
- "ApplyShardLabel",
- ParDo.of(
- new ApplyShardingKey(
- numShardsView,
- (numShardsView != null) ? null : numShardsProvider,
- destinationCoder))
- .withSideInputs(sideInputs))
- .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
- .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
+ .apply("ApplyShardLabel", ParDo.of(
+ new ApplyShardingKey<T>(numShardsView,
+ (numShardsView != null) ? null : numShardsProvider))
+ .withSideInputs(sideInputs))
+ .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
shardedWindowCoder =
(Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
- // Since this path might be used by streaming runners processing triggers, it's important
- // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
- // strategy works by sorting all FileResult objects and assigning them numbers, which is not
- // guaranteed to work well when processing triggers - if the finalize step retries it might
- // see a different Iterable of FileResult objects, and it will assign different shard numbers.
- results =
- sharded.apply(
- "WriteShardedBundles",
- ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+
+ results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
}
- results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+ results.setCoder(FileResultCoder.of(shardedWindowCoder));
if (windowedWrites) {
// When processing streaming windowed writes, results will arrive multiple times. This
@@ -722,31 +530,26 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// as new data arriving into a side input does not trigger the listening DoFn. Instead
// we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
// whenever new data arrives.
- PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
- results.apply(
- "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
- keyedResults.setCoder(
- KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
+ PCollection<KV<Void, FileResult>> keyedResults =
+ results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
+ keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
+ FileResultCoder.of(shardedWindowCoder)));
// Is the continuation trigger sufficient?
keyedResults
- .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
- .apply(
- "Finalize",
- ParDo.of(
- new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult<DestinationT>> results =
- Lists.newArrayList(c.element().getValue());
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation");
- }
- }));
+ .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create())
+ .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<FileResult> results = Lists.newArrayList(c.element().getValue());
+ writeOperation.finalize(results);
+ LOG.debug("Done finalizing write operation");
+ }
+ }));
} else {
- final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
- results.apply(View.<FileResult<DestinationT>>asIterable());
+ final PCollectionView<Iterable<FileResult>> resultsView =
+ results.apply(View.<FileResult>asIterable());
ImmutableList.Builder<PCollectionView<?>> sideInputs =
ImmutableList.<PCollectionView<?>>builder().add(resultsView);
if (numShardsView != null) {
@@ -762,53 +565,41 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// set numShards, then all shards will be written out as empty files. For this reason we
// use a side input here.
PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
- singletonCollection.apply(
- "Finalize",
- ParDo.of(
- new DoFn<Void, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult<DestinationT>> results =
- Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug(
- "Side input initialized to finalize write operation {}.", writeOperation);
-
- // We must always output at least 1 shard, and honor user-specified numShards
- // if
- // set.
- int minShardsNeeded;
- if (numShardsView != null) {
- minShardsNeeded = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- minShardsNeeded = numShardsProvider.get();
- } else {
- minShardsNeeded = 1;
- }
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written "
- + "for a total of {}.",
- extraShardsNeeded,
- results.size(),
- minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
- writer.openUnwindowed(
- UUID.randomUUID().toString(),
- UNKNOWN_SHARDNUM,
- sink.getDynamicDestinations().getDefaultDestination());
- FileResult<DestinationT> emptyWrite = writer.close();
- results.add(emptyWrite);
- }
- LOG.debug("Done creating extra shards.");
- }
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- })
- .withSideInputs(sideInputs.build()));
+ singletonCollection
+ .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard, and honor user-specified numShards if
+ // set.
+ int minShardsNeeded;
+ if (numShardsView != null) {
+ minShardsNeeded = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ minShardsNeeded = numShardsProvider.get();
+ } else {
+ minShardsNeeded = 1;
+ }
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for a total of {}.",
+ extraShardsNeeded, results.size(), minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<T> writer = writeOperation.createWriter();
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+ FileResult emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
+ }
+ writeOperation.finalize(results);
+ LOG.debug("Done finalizing write operation {}", writeOperation);
+ }
+ }).withSideInputs(sideInputs.build()));
}
return PDone.in(input.getPipeline());
}
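
For context on what this revert removes from WriteFiles: the dynamic-destinations code above keys spilled records by a stable 32-bit hash of the destination rather than by the destination object itself, because Java's hashCode is not stable across machines. A minimal sketch of that idea, assuming a deterministic Coder for the destination type (the class name here is illustrative, not part of the commit):

    import com.google.common.hash.Hashing;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    class DestinationHashing {
      // Encode the destination with a deterministic coder and hash the bytes with
      // murmur3_32. Unlike Object.hashCode, the result is stable across JVMs, so it
      // can safely be used as a GroupByKey key for spilled records.
      static <DestinationT> int hashDestination(
          DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
        return Hashing.murmur3_32()
            .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
            .asInt();
      }
    }
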
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index b889ec7..99717a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -71,10 +71,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
"Trying to return record which is before the last-returned record");
if (position == null) {
- LOG.info(
- "Adjusting range start from {} to {} as position of first returned record",
- range.getStartKey(),
- recordStart);
range = range.withStartKey(recordStart);
}
position = recordStart;
@@ -91,15 +87,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
@Override
public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
- // Sanity check.
- if (!range.containsKey(splitPosition)) {
- LOG.warn(
- "{}: Rejecting split request at {} because it is not within the range.",
- this,
- splitPosition);
- return false;
- }
-
// Unstarted.
if (position == null) {
LOG.warn(
@@ -119,6 +106,15 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
return false;
}
+ // Sanity check.
+ if (!range.containsKey(splitPosition)) {
+ LOG.warn(
+ "{}: Rejecting split request at {} because it is not within the range.",
+ this,
+ splitPosition);
+ return false;
+ }
+
range = range.withEndKey(splitPosition);
return true;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
deleted file mode 100644
index d3bff37..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
-
-/** A restriction represented by a range of integers [from, to). */
-public class OffsetRange
- implements Serializable,
- HasDefaultTracker<
- OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> {
- private final long from;
- private final long to;
-
- public OffsetRange(long from, long to) {
- checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
- this.from = from;
- this.to = to;
- }
-
- public long getFrom() {
- return from;
- }
-
- public long getTo() {
- return to;
- }
-
- @Override
- public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() {
- return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this);
- }
-
- @Override
- public String toString() {
- return "[" + from + ", " + to + ')';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- OffsetRange that = (OffsetRange) o;
-
- if (from != that.from) {
- return false;
- }
- return to == that.to;
- }
-
- @Override
- public int hashCode() {
- int result = (int) (from ^ (from >>> 32));
- result = 31 * result + (int) (to ^ (to >>> 32));
- return result;
- }
-
- public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long minNumOffsetPerSplit) {
- List<OffsetRange> res = new ArrayList<>();
- long start = getFrom();
- long maxEnd = getTo();
-
- while (start < maxEnd) {
- long end = start + desiredNumOffsetsPerSplit;
- end = Math.min(end, maxEnd);
- // Avoid having a too small range at the end and ensure that we respect minNumOffsetPerSplit.
- long remaining = maxEnd - end;
- if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) {
- end = maxEnd;
- }
- res.add(new OffsetRange(start, end));
- start = end;
- }
- return res;
- }
-}
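
The deleted OffsetRange.split method above walks the range in steps of desiredNumOffsetsPerSplit and folds a too-small tail into the last sub-range. A hedged usage sketch against the pre-revert SDK (values chosen purely for illustration):

    import java.util.List;
    import org.apache.beam.sdk.io.range.OffsetRange;

    class OffsetRangeSplitExample {
      public static void main(String[] args) {
        // Split [0, 1000) into sub-ranges of roughly 400 offsets with a minimum of
        // 100 per split: yields [0, 400), [400, 800), [800, 1000).
        List<OffsetRange> parts = new OffsetRange(0, 1000).split(400, 100);
        System.out.println(parts);
      }
    }
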
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 8f0083e..51e2b1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -26,9 +26,6 @@ import org.slf4j.LoggerFactory;
/**
* A {@link RangeTracker} for non-negative positions of type {@code long}.
- *
- * <p>Not to be confused with {@link
- * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}.
*/
public class OffsetRangeTracker implements RangeTracker<Long> {
private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index d7e6cc8..c0990cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -184,20 +184,18 @@ public class PipelineOptionsFactory {
private final String[] args;
private final boolean validation;
private final boolean strictParsing;
- private final boolean isCli;
// Do not allow direct instantiation
private Builder() {
- this(null, false, true, false);
+ this(null, false, true);
}
private Builder(String[] args, boolean validation,
- boolean strictParsing, boolean isCli) {
+ boolean strictParsing) {
this.defaultAppName = findCallersClassName();
this.args = args;
this.validation = validation;
this.strictParsing = strictParsing;
- this.isCli = isCli;
}
/**
@@ -239,7 +237,7 @@ public class PipelineOptionsFactory {
*/
public Builder fromArgs(String... args) {
checkNotNull(args, "Arguments should not be null.");
- return new Builder(args, validation, strictParsing, true);
+ return new Builder(args, validation, strictParsing);
}
/**
@@ -249,7 +247,7 @@ public class PipelineOptionsFactory {
* validation.
*/
public Builder withValidation() {
- return new Builder(args, true, strictParsing, isCli);
+ return new Builder(args, true, strictParsing);
}
/**
@@ -257,7 +255,7 @@ public class PipelineOptionsFactory {
* arguments.
*/
public Builder withoutStrictParsing() {
- return new Builder(args, validation, false, isCli);
+ return new Builder(args, validation, false);
}
/**
@@ -302,11 +300,7 @@ public class PipelineOptionsFactory {
}
if (validation) {
- if (isCli) {
- PipelineOptionsValidator.validateCli(klass, t);
- } else {
- PipelineOptionsValidator.validate(klass, t);
- }
+ PipelineOptionsValidator.validate(klass, t);
}
return t;
}
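
The hunk above drops the internal isCli flag, so withValidation() reports missing required options the same way for CLI-parsed and programmatically built options. The public builder chain it affects is unchanged; a sketch with a hypothetical MyOptions interface:

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation;

    class OptionsParsingExample {
      // Hypothetical options interface; --inputFile is the only required flag.
      public interface MyOptions extends PipelineOptions {
        @Validation.Required
        String getInputFile();
        void setInputFile(String value);
      }

      public static void main(String[] args) {
        PipelineOptionsFactory.register(MyOptions.class);
        // fromArgs(...) parses command-line flags; withValidation() fails fast if a
        // @Validation.Required property is still unset after parsing.
        MyOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        System.out.println(options.getInputFile());
      }
    }
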
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
index fcffd74..bd54ec3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
@@ -43,29 +43,9 @@ public class PipelineOptionsValidator {
*
* @param klass The interface to fetch validation criteria from.
* @param options The {@link PipelineOptions} to validate.
- * @return Validated options.
+ * @return The type
*/
public static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options) {
- return validate(klass, options, false);
- }
-
- /**
- * Validates that the passed {@link PipelineOptions} from command line interface (CLI)
- * conforms to all the validation criteria from the passed in interface.
- *
- * <p>Note that the interface requested must conform to the validation criteria specified on
- * {@link PipelineOptions#as(Class)}.
- *
- * @param klass The interface to fetch validation criteria from.
- * @param options The {@link PipelineOptions} to validate.
- * @return Validated options.
- */
- public static <T extends PipelineOptions> T validateCli(Class<T> klass, PipelineOptions options) {
- return validate(klass, options, true);
- }
-
- private static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options,
- boolean isCli) {
checkNotNull(klass);
checkNotNull(options);
checkArgument(Proxy.isProxyClass(options.getClass()));
@@ -87,15 +67,9 @@ public class PipelineOptionsValidator {
requiredGroups.put(requiredGroup, method);
}
} else {
- if (isCli) {
- checkArgument(handler.invoke(asClassOptions, method, null) != null,
- "Missing required value for [--%s, \"%s\"]. ",
- handler.getOptionName(method), getDescription(method));
- } else {
- checkArgument(handler.invoke(asClassOptions, method, null) != null,
- "Missing required value for [%s, \"%s\"]. ",
- method, getDescription(method));
- }
+ checkArgument(handler.invoke(asClassOptions, method, null) != null,
+ "Missing required value for [%s, \"%s\"]. ",
+ method, getDescription(method));
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 926a7b9..eda21a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -45,8 +45,6 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.MutableClassToInstanceMap;
import java.beans.PropertyDescriptor;
import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -89,7 +87,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
* {@link PipelineOptions#as(Class)}.
*/
@ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler, Serializable {
+class ProxyInvocationHandler implements InvocationHandler {
/**
* No two instances of this class are considered equivalent hence we generate a random hash code.
*/
@@ -166,21 +164,6 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
+ Arrays.toString(args) + "].");
}
- public String getOptionName(Method method) {
- return gettersToPropertyNames.get(method.getName());
- }
-
- private void writeObject(java.io.ObjectOutputStream stream)
- throws IOException {
- throw new NotSerializableException(
- "PipelineOptions objects are not serializable and should not be embedded into transforms "
- + "(did you capture a PipelineOptions object in a field or in an anonymous class?). "
- + "Instead, if you're using a DoFn, access PipelineOptions at runtime "
- + "via ProcessContext/StartBundleContext/FinishBundleContext.getPipelineOptions(), "
- + "or pre-extract necessary fields from PipelineOptions "
- + "at pipeline construction time.");
- }
-
/**
* Track whether options values are explicitly set, or retrieved from defaults.
*/
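
The writeObject guard removed above existed to fail fast when a PipelineOptions object is captured inside serialized user code; its message points to reading options from the context at runtime instead. A rough sketch of that recommended pattern (the DoFn below is illustrative, not part of the commit):

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.transforms.DoFn;

    class OptionsAtRuntimeFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Fetch options from the runner-provided context rather than capturing a
        // PipelineOptions object in a field at construction time.
        PipelineOptions opts = c.getPipelineOptions();
        c.output(c.element() + " (job: " + opts.getJobName() + ")");
      }
    }
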
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -24,23 +24,21 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
@@ -70,7 +68,7 @@ public class TransformHierarchy {
producers = new HashMap<>();
producerInput = new HashMap<>();
unexpandedInputs = new HashMap<>();
- root = new Node();
+ root = new Node(null, null, "", null);
current = root;
}
@@ -145,6 +143,14 @@ public class TransformHierarchy {
Node producerNode = getProducer(inputValue);
PInput input = producerInput.remove(inputValue);
inputValue.finishSpecifying(input, producerNode.getTransform());
+ checkState(
+ producers.get(inputValue) != null,
+ "Producer unknown for input %s",
+ inputValue);
+ checkState(
+ producers.get(inputValue) != null,
+ "Producer unknown for input %s",
+ inputValue);
}
}
@@ -159,7 +165,7 @@ public class TransformHierarchy {
* nodes.
*/
public void setOutput(POutput output) {
- for (PCollection<?> value : fullyExpand(output).values()) {
+ for (PValue value : output.expand().values()) {
if (!producers.containsKey(value)) {
producers.put(value, current);
value.finishSpecifyingOutput(
@@ -193,13 +199,13 @@ public class TransformHierarchy {
}
Node getProducer(PValue produced) {
- return checkNotNull(producers.get(produced), "No producer found for %s", produced);
+ return producers.get(produced);
}
public Set<PValue> visit(PipelineVisitor visitor) {
finishSpecifying();
Set<PValue> visitedValues = new HashSet<>();
- root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
+ root.visit(visitor, visitedValues);
return visitedValues;
}
@@ -220,47 +226,6 @@ public class TransformHierarchy {
return current;
}
- private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) {
- Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
- for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) {
- if (value.getValue() instanceof PCollection) {
- PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue());
- checkArgument(
- previous == null,
- "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
- output,
- TupleTag.class.getSimpleName(),
- value.getKey(),
- previous,
- value.getValue());
- } else {
- if (value.getValue().expand().size() == 1
- && Iterables.getOnlyElement(value.getValue().expand().values())
- .equals(value.getValue())) {
- throw new IllegalStateException(
- String.format(
- "Non %s %s that expands into itself %s",
- PCollection.class.getSimpleName(),
- PValue.class.getSimpleName(),
- value.getValue()));
- }
- for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent :
- fullyExpand(value.getValue()).entrySet()) {
- PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue());
- checkArgument(
- previous == null,
- "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
- output,
- TupleTag.class.getSimpleName(),
- valueComponent.getKey(),
- previous,
- valueComponent.getValue());
- }
- }
- }
- return result;
- }
-
/**
* Provides internal tracking of transform relationships with helper methods
* for initialization and ordered visitation.
@@ -288,36 +253,25 @@ public class TransformHierarchy {
boolean finishedSpecifying = false;
/**
- * Creates the root-level node. The root level node has a null enclosing node, a null transform,
- * an empty map of inputs, and a name equal to the empty string.
- */
- private Node() {
- this.enclosingNode = null;
- this.transform = null;
- this.fullName = "";
- this.inputs = Collections.emptyMap();
- }
-
- /**
* Creates a new Node with the given parent and transform.
*
+ * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+ * nodes.
+ *
* @param enclosingNode the composite node containing this node
* @param transform the PTransform tracked by this node
* @param fullName the fully qualified name of the transform
* @param input the unexpanded input to the transform
*/
private Node(
- Node enclosingNode,
- PTransform<?, ?> transform,
+ @Nullable Node enclosingNode,
+ @Nullable PTransform<?, ?> transform,
String fullName,
- PInput input) {
+ @Nullable PInput input) {
this.enclosingNode = enclosingNode;
this.transform = transform;
this.fullName = fullName;
- ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder();
- inputs.putAll(input.expand());
- inputs.putAll(transform.getAdditionalInputs());
- this.inputs = inputs.build();
+ this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand();
}
/**
@@ -398,7 +352,7 @@ public class TransformHierarchy {
return fullName;
}
- /** Returns the transform input, in fully expanded form. */
+ /** Returns the transform input, in unexpanded form. */
public Map<TupleTag<?>, PValue> getInputs() {
return inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
}
@@ -505,60 +459,10 @@ public class TransformHierarchy {
/**
* Visit the transform node.
*
- * <p>The visit proceeds in the following order:
- *
- * <ul>
- * <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link
- * Node#getInputs()}.
- * <li>If the node is a composite:
- * <ul>
- * <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}.
- * <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link
- * CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}.
- * <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}.
- * </ul>
- * <li>If the node is a primitive, visit it via {@link
- * PipelineVisitor#visitPrimitiveTransform(Node)}.
- * <li>Visit each {@link PValue} that was output by this node.
- * </ul>
- *
- * <p>Additionally, the following ordering restrictions are observed:
- *
- * <ul>
- * <li>A {@link Node} will be visited after its enclosing node has been entered and before its
- * enclosing node has been left
- * <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link
- * CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link
- * PipelineVisitor#enterCompositeTransform(Node)}.
- * <li>A {@link PValue} will only be visited after the {@link Node} that originally produced
- * it has been visited.
- * </ul>
- *
* <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
* composite transforms), then the output values.
*/
- private void visit(
- PipelineVisitor visitor,
- Set<PValue> visitedValues,
- Set<Node> visitedNodes,
- Set<Node> skippedComposites) {
- if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
- // Recursively enter all enclosing nodes, as appropriate.
- getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites);
- }
- // These checks occur after visiting the enclosing node to ensure that if this node has been
- // visited while visiting the enclosing node the node is not revisited, or, if an enclosing
- // Node is skipped, this node is also skipped.
- if (!visitedNodes.add(this)) {
- LOG.debug("Not revisiting previously visited node {}", this);
- return;
- } else if (childNodeOf(skippedComposites)) {
- // This node is a child of a node that has been passed over via CompositeBehavior, and
- // should also be skipped. All child nodes of a skipped composite should always be skipped.
- LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
- return;
- }
-
+ private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
if (!finishedSpecifying) {
finishSpecifying();
}
@@ -566,31 +470,22 @@ public class TransformHierarchy {
if (!isRootNode()) {
// Visit inputs.
for (PValue inputValue : inputs.values()) {
- Node valueProducer = getProducer(inputValue);
- if (!visitedNodes.contains(valueProducer)) {
- valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
- }
if (visitedValues.add(inputValue)) {
- LOG.debug("Visiting input value {}", inputValue);
- visitor.visitValue(inputValue, valueProducer);
+ visitor.visitValue(inputValue, getProducer(inputValue));
}
}
}
if (isCompositeNode()) {
- LOG.debug("Visiting composite node {}", this);
PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
for (Node child : parts) {
- child.visit(visitor, visitedValues, visitedNodes, skippedComposites);
+ child.visit(visitor, visitedValues);
}
- } else {
- skippedComposites.add(this);
}
visitor.leaveCompositeTransform(this);
} else {
- LOG.debug("Visiting primitive node {}", this);
visitor.visitPrimitiveTransform(this);
}
@@ -599,24 +494,12 @@ public class TransformHierarchy {
// Visit outputs.
for (PValue pValue : outputs.values()) {
if (visitedValues.add(pValue)) {
- LOG.debug("Visiting output value {}", pValue);
visitor.visitValue(pValue, this);
}
}
}
}
- private boolean childNodeOf(Set<Node> nodes) {
- if (isRootNode()) {
- return false;
- }
- Node parent = this.getEnclosingNode();
- while (!parent.isRootNode() && !nodes.contains(parent)) {
- parent = parent.getEnclosingNode();
- }
- return nodes.contains(parent);
- }
-
/**
* Finish specifying a transform.
*
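For context, the reverted visit() above walks a node's inputs, then the node itself (or its children for composites), then its outputs, and the root node is now built as new Node(null, null, "", null). A minimal sketch of consuming that traversal through the public API; the ExampleVisitor name and its counting logic are illustrative and not part of this change:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;

// Counts primitive transforms; the ordering it observes (inputs, then the
// transform or its children, then outputs) comes from Node.visit(...) above.
class ExampleVisitor extends Pipeline.PipelineVisitor.Defaults {
  int primitiveCount = 0;

  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    primitiveCount++;
  }

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {
    // Each PValue is visited at most once, after the node that produced it.
  }
}

// Usage, assuming an already-constructed Pipeline p:
//   ExampleVisitor visitor = new ExampleVisitor();
//   p.traverseTopologically(visitor);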
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index eba6978..c11057a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -126,9 +126,4 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
}
};
}
-
- @Override
- public boolean assignsToOneWindow() {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..9ad8fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,18 +271,6 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
return events;
}
- /**
- * <b>For internal use only. No backwards-compatibility guarantees.</b>
- *
- * <p>Builds a test stream directly from events. No validation is performed on
- * watermark monotonicity, etc. This is assumed to be a previously-serialized
- * {@link TestStream} transform that is correct by construction.
- */
- @Internal
- public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
- return new TestStream<>(coder, events);
- }
-
@Override
public boolean equals(Object other) {
if (!(other instanceof TestStream)) {
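For context, the removed fromRawEvents factory bypassed validation; test streams are otherwise assembled through the public builder, which checks watermark ordering as events are added. A minimal sketch, with element values and timings chosen purely for illustration:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestStream;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Illustrative stream: three elements, the watermark and processing time
// advance, one more element arrives, then the stream terminates.
TestStream<Integer> events =
    TestStream.create(VarIntCoder.of())
        .addElements(1, 2, 3)
        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
        .advanceProcessingTime(Duration.standardMinutes(1))
        .addElements(4)
        .advanceWatermarkToInfinity();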
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index d7effb5..9e1cc71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
@@ -1121,7 +1122,11 @@ public class Combine {
*/
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
- return PCollectionViews.toAdditionalInputs(sideInputs);
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+ }
+ return additionalInputs.build();
}
/**
@@ -1272,15 +1277,14 @@ public class Combine {
public PCollectionView<OutputT> expand(PCollection<InputT> input) {
PCollection<OutputT> combined =
input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
- PCollectionView<OutputT> view =
- PCollectionViews.singletonView(
- combined,
- input.getWindowingStrategy(),
- insertDefault,
- insertDefault ? fn.defaultValue() : null,
- combined.getCoder());
- combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
- return view;
+ return combined.apply(
+ CreatePCollectionView.<OutputT, OutputT>of(
+ PCollectionViews.singletonView(
+ combined,
+ input.getWindowingStrategy(),
+ insertDefault,
+ insertDefault ? fn.defaultValue() : null,
+ combined.getCoder())));
}
public int getFanout() {
@@ -1573,7 +1577,11 @@ public class Combine {
*/
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
- return PCollectionViews.toAdditionalInputs(sideInputs);
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+ }
+ return additionalInputs.build();
}
@Override
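For context, GloballyAsSingletonView.expand above now applies CreatePCollectionView directly and returns its result, and getAdditionalInputs enumerates each side input's tag and backing PCollection by hand. A minimal usage sketch; 'counts' is an assumed, pre-existing PCollection<Integer> and the step name is illustrative:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Illustrative: combine an assumed PCollection<Integer> 'counts' to a single
// total and expose it as a side-input view.
PCollectionView<Integer> totalView =
    counts.apply("SumAsView", Combine.globally(Sum.ofIntegers()).asSingletonView());
// A downstream ParDo.of(fn).withSideInputs(totalView) would then surface the view
// through getAdditionalInputs(), as in the hunks above.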
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 1b809c2..e711ac2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -386,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
*
* {@literal @StateId("my-state-id")}
- * {@literal private final StateSpec<ValueState<MyState>>} myStateSpec =
+ * {@literal private final StateSpec<K, ValueState<MyState>>} myStateSpec =
* StateSpecs.value(new MyStateCoder());
*
* {@literal @ProcessElement}
@@ -546,15 +545,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
* <li>The type of restrictions used by all of these methods must be the same.
- * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
- * indicate whether there is more work to be done for the current element.
* <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
* {@link BoundedWindow}.
* <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
* {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
- * either of these, it's assumed to be {@link BoundedPerElement} if its {@link
- * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
- * returns a {@link ProcessContinuation}.
+ * either of these, it's assumed to be {@link BoundedPerElement}.
* </ul>
*
* <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -682,49 +677,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
@Experimental(Kind.SPLITTABLE_DO_FN)
public @interface UnboundedPerElement {}
- // This can't be put into ProcessContinuation itself due to the following problem:
- // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
- private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
- new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
-
- /**
- * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
- * be done for the current element.
- *
- * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call
- * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}.
- */
- @Experimental(Kind.SPLITTABLE_DO_FN)
- @AutoValue
- public abstract static class ProcessContinuation {
- /** Indicates that there is no more work to be done for the current element. */
- public static ProcessContinuation stop() {
- return PROCESS_CONTINUATION_STOP;
- }
-
- /** Indicates that there is more work to be done for the current element. */
- public static ProcessContinuation resume() {
- return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
- }
-
- /**
- * If false, the {@link DoFn} promises that there is no more work remaining for the current
- * element, so the runner should not resume the {@link ProcessElement} call.
- */
- public abstract boolean shouldResume();
-
- /**
- * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
- * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
- */
- public abstract Duration resumeDelay();
-
- /** Builder method to set the value of {@link #resumeDelay()}. */
- public ProcessContinuation withResumeDelay(Duration resumeDelay) {
- return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
- }
- }
-
/**
* Finalize the {@link DoFn} construction to prepare for processing.
* This method should be called by runners before any processing methods.
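For context, with ProcessContinuation removed by this revert, a splittable ProcessElement returns void and an unannotated splittable DoFn is assumed BoundedPerElement, as the updated list above states. A minimal sketch of the shape that list describes, assuming OffsetRange restrictions with the SDK's OffsetRangeTracker; the ReadRange name and the Long element type are hypothetical:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

// Hypothetical splittable DoFn: the element is a size, and one Long is emitted
// per offset the tracker lets us claim in [0, size).
class ReadRange extends DoFn<Long, Long> {
  @ProcessElement
  public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
    // Returns void: after this revert the call is assumed to fully process the element.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      c.output(i);
    }
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(Long size) {
    // OffsetRange implements HasDefaultTracker, so no NewTracker method is required.
    return new OffsetRange(0, size);
  }
}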
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b2377dd..8a03f3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -290,11 +290,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public PipelineOptions pipelineOptions() {
- return getPipelineOptions();
- }
-
- @Override
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
DoFn<InputT, OutputT> doFn) {
throw new UnsupportedOperationException(
@@ -551,6 +546,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
fn.super();
}
+ private void throwUnsupportedOutputFromBundleMethods() {
+ throw new UnsupportedOperationException(
+ "DoFnTester doesn't support output from bundle methods");
+ }
+
@Override
public PipelineOptions getPipelineOptions() {
return options;
@@ -559,13 +559,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public void output(
OutputT output, Instant timestamp, BoundedWindow window) {
- output(mainOutputTag, output, timestamp, window);
+ throwUnsupportedOutputFromBundleMethods();
}
@Override
public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
- getMutableOutput(tag)
- .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
+ throwUnsupportedOutputFromBundleMethods();
}
}
@@ -643,6 +642,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
getMutableOutput(tag)
.add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
}
+
+ private void throwUnsupportedOutputFromBundleMethods() {
+ throw new UnsupportedOperationException(
+ "DoFnTester doesn't support output from bundle methods");
+ }
+
}
@Override
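For context, the restored throwUnsupportedOutputFromBundleMethods means a DoFn under DoFnTester can no longer emit from @StartBundle/@FinishBundle; per-element output through processBundle is unaffected. A minimal sketch, with an illustrative doubling DoFn:

import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;

// Illustrative DoFn that doubles each element.
DoFn<Integer, Integer> doubler = new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element() * 2);
  }
};

DoFnTester<Integer, Integer> tester = DoFnTester.of(doubler);
List<Integer> results = tester.processBundle(1, 2, 3);  // [2, 4, 6]
// Emitting from a bundle method instead would hit the
// UnsupportedOperationException reintroduced above.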
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..edf1419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
@@ -456,27 +455,6 @@ public class ParDo {
}
}
- private static void validateStateApplicableForInput(
- DoFn<?, ?> fn,
- PCollection<?> input) {
- Coder<?> inputCoder = input.getCoder();
- checkArgument(
- inputCoder instanceof KvCoder,
- "%s requires its input to use %s in order to use state and timers.",
- ParDo.class.getSimpleName(),
- KvCoder.class.getSimpleName());
-
- KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder;
- try {
- kvCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException exc) {
- throw new IllegalArgumentException(
- String.format(
- "%s requires a deterministic key coder in order to use state and timers",
- ParDo.class.getSimpleName()));
- }
- }
-
/**
* Try to provide coders for as many of the type arguments of given
* {@link DoFnSignature.StateDeclaration} as possible.
@@ -684,7 +662,11 @@ public class ParDo {
*/
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
- return PCollectionViews.toAdditionalInputs(sideInputs);
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+ }
+ return additionalInputs.build();
}
}
@@ -759,11 +741,6 @@ public class ParDo {
// Use coder registry to determine coders for all StateSpec defined in the fn signature.
finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
- DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
- if (signature.usesState() || signature.usesTimers()) {
- validateStateApplicableForInput(fn, input);
- }
-
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
@@ -830,7 +807,11 @@ public class ParDo {
*/
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
- return PCollectionViews.toAdditionalInputs(sideInputs);
+ ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+ for (PCollectionView<?> sideInput : sideInputs) {
+ additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+ }
+ return additionalInputs.build();
}
}
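For context, removing validateStateApplicableForInput drops the construction-time check, but the model requirement it enforced still holds: state and timers are partitioned by key, so a stateful ParDo should consume KV input with a deterministic key coder. A minimal sketch of such a DoFn; the CountPerKey name is hypothetical, and the single-type-parameter StateSpec spelling is assumed here (the Javadoc hunk in DoFn.java above shows an older two-parameter form):

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: keeps a running count per key. Without the removed
// check, a non-KV input would only fail later in the runner rather than at
// pipeline construction.
class CountPerKey extends DoFn<KV<String, Integer>, KV<String, Integer>> {
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Integer> count) {
    Integer current = count.read();
    int next = (current == null ? 0 : current) + 1;
    count.write(next);
    c.output(KV.of(c.element().getKey(), next));
  }
}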