You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:46 UTC
[22/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
deleted file mode 100644
index 4f131ad..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.DelegatingAggregator;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFnWithContext}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFnWithContext}s can be tested in a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code DoFnWithContext},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>Implementations must define a method annotated with {@link ProcessElement}
- * that satisfies the requirements described there. See {@link ProcessElement}
- * for details.
- *
- * <p>This functionality is experimental and likely to change.
- *
- * <p>Example usage:
- *
- * <pre> {@code
- * PCollection<String> lines = ... ;
- * PCollection<String> words =
- *     lines.apply(ParDo.of(new DoFnWithContext<String, String>() {
- *       @ProcessElement
- *       public void processElement(ProcessContext c, BoundedWindow window) {
- *         for (String word : c.element().split(" ")) {
- *           c.output(word);
- *         }
- *       }}));
- * } </pre>
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-@Experimental
-public abstract class DoFnWithContext<InputT, OutputT> implements Serializable {
-
-  /** Information accessible to all methods in this {@code DoFnWithContext}. */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-     * invoking this {@code DoFnWithContext}. The {@code PipelineOptions} will
-     * be the default when running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should not be modified in
-     * any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to the method annotated with
-     * {@code @ProcessElement}.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFnWithContext#getAllowedTimestampSkew}. The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>The output element will have the same timestamp and be in the same
-     * windows as the input element passed to {@link ProcessElement}.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFnWithContext#getAllowedTimestampSkew}. The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-  }
-
-  /**
-   * Information accessible when running the method annotated with
-   * {@link ProcessElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element will not be changed -- it is safe to cache, etc.
-     * without copying.
-     */
-    public abstract InputT element();
-
-    /**
-     * Returns the value of the side input.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFnWithContext.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward in time. For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   */
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Aggregators registered through {@link #createAggregator}, keyed by their unique name.
-   * Package-private; NOTE(review): presumably read by runner support code in this package.
-   */
-  Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
-
-  /**
-   * Protects aggregators from being created after initialization. Set by
-   * {@link #prepareForProcessing()}.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code DoFnWithContext} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code DoFnWithContext} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code DoFnWithContext} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<O>} for the output
-   * {@code PCollection<O>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
-   * Interface for runner implementors to provide implementations of extra context information.
-   *
-   * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
-   * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
-   * has indicated it needs the given extra context.
-   *
-   * <p>In the case of {@link ProcessElement} it is called once per invocation of
-   * {@link ProcessElement}.
-   */
-  public interface ExtraContextFactory<InputT, OutputT> {
-    /**
-     * Construct the {@link BoundedWindow} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link BoundedWindow}.
-     *
-     * @return {@link BoundedWindow} of the element currently being processed.
-     */
-    BoundedWindow window();
-
-    /**
-     * Construct the {@link WindowingInternals} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link WindowingInternals}.
-     */
-    WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotation for the method to use to prepare an instance for processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have exactly one argument.
-   *   <li>That argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface StartBundle {}
-
-  /**
-   * Annotation for the method to use for processing elements. A subclass of
-   * {@link DoFnWithContext} must have a method with this annotation satisfying
-   * the following constraints in order for it to be executable:
-   * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first argument must be a {@link DoFnWithContext.ProcessContext}.
-   *   <li>Its remaining arguments must be {@link BoundedWindow}, or
-   *       {@link WindowingInternals}{@code <InputT, OutputT>}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface ProcessElement {}
-
-  /**
-   * Annotation for the method to use to finish processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have exactly one argument.
-   *   <li>That argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface FinishBundle {}
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within this {@code DoFnWithContext}. Aggregators
-   * can only be created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *     this {@code DoFnWithContext}
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalStateException if called during pipeline execution, i.e. after
-   *     {@link #prepareForProcessing()}; this takes precedence over a duplicate name
-   * @throws IllegalArgumentException if the given name collides with another
-   *     aggregator in this scope
-   */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    // Check the lifecycle state before the name so that any call made after finalization
-    // consistently reports the documented IllegalStateException.
-    checkState(!aggregatorsAreFinal,
-        "Cannot create an aggregator during pipeline execution."
-            + " Aggregators should be registered during pipeline construction.");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-            + " An Aggregator with that name already exists within this scope.",
-        name);
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within this {@code DoFnWithContext}.
-   * Aggregators can only be created during pipeline construction.
-   *
-   * <p>The function is wrapped via {@code Combine.IterableCombineFn.of} and
-   * registered through {@link #createAggregator(String, Combine.CombineFn)}.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *     this {@code DoFnWithContext}
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalStateException if called during pipeline execution
-   * @throws IllegalArgumentException if the given name collides with another
-   *     aggregator in this scope
-   */
-  public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
-      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Finalize the {@link DoFnWithContext} construction to prepare for processing.
-   * This method should be called by runners before any processing methods; after it,
-   * {@link #createAggregator} throws {@link IllegalStateException}.
-   */
-  void prepareForProcessing() {
-    aggregatorsAreFinal = true;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
deleted file mode 100644
index 9e123a1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code PTransform}s for filtering from a {@code PCollection} the
- * elements satisfying a predicate, or satisfying an inequality with
- * a given value based on the elements' natural ordering.
- *
- * @param <T> the type of the values in the input {@code PCollection},
- *     and the type of the elements in the output {@code PCollection}
- */
-public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that satisfy the given predicate. The predicate must be
-   * a {@code SerializableFunction<T, Boolean>}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<String> wordList = ...;
-   * PCollection<String> longWords =
-   *     wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6)));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThan}, {@link #lessThanEq},
-   * {@link #greaterThan}, {@link #greaterThanEq}, which return elements
-   * satisfying various inequalities with the specified value based on
-   * the elements' natural ordering.
-   */
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T>
-      byPredicate(PredicateT predicate) {
-    return new Filter<T>("Filter", predicate);
-  }
-
-  /**
-   * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of
-   * a {@link ParDo.Bound}.
-   */
-  @Deprecated
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T>
-      by(final PredicateT filterPred) {
-    return ParDo.named("Filter").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        // The Boolean result is unboxed here, exactly as a comparison with "== true" would.
-        if (filterPred.apply(c.element())) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@link PCollection} and returns a {@link PCollection} with
-   * elements that are less than a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> smallNumbers =
-   *     listOfNumbers.apply(Filter.lessThan(10));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThanEq}, {@link #greaterThanEq},
-   * and {@link #greaterThan}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) {
-    return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) < 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are greater than a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> largeNumbers =
-   *     listOfNumbers.apply(Filter.greaterThan(1000));
-   * } </pre>
-   *
-   * <p>See also {@link #greaterThanEq}, {@link #lessThan},
-   * and {@link #lessThanEq}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) {
-    return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) > 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are less than or equal to a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> smallOrEqualNumbers =
-   *     listOfNumbers.apply(Filter.lessThanEq(10));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThan}, {@link #greaterThanEq},
-   * and {@link #greaterThan}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) {
-    return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) <= 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are greater than or equal to a given value, based on
-   * the elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> largeOrEqualNumbers =
-   *     listOfNumbers.apply(Filter.greaterThanEq(1000));
-   * } </pre>
-   *
-   * <p>See also {@link #greaterThan}, {@link #lessThan},
-   * and {@link #lessThanEq}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) {
-    return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) >= 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////
-
-  /** The predicate an element must satisfy to be kept; assigned once at construction. */
-  private final SerializableFunction<T, Boolean> predicate;
-
-  private Filter(String name, SerializableFunction<T, Boolean> predicate) {
-    super(name);
-    this.predicate = predicate;
-  }
-
-  /**
-   * Returns a new {@code Filter} with the given name and the same predicate as this one.
-   * This transform itself is not modified.
-   */
-  public Filter<T> named(String name) {
-    return new Filter<>(name, predicate);
-  }
-
-  /**
-   * Keeps only the elements of {@code input} for which the predicate returns {@code true}.
-   */
-  @Override
-  public PCollection<T> apply(PCollection<T> input) {
-    return input.apply(ParDo.named("Filter").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        // The Boolean result is unboxed here, exactly as a comparison with "== true" would.
-        if (predicate.apply(c.element())) {
-          c.output(c.element());
-        }
-      }
-    }));
-  }
-
-  /** Filtering never changes element values, so the input's coder is reused for the output. */
-  @Override
-  protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
deleted file mode 100644
index fbaad5b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * {@code PTransform}s for mapping a simple function that returns iterables over the elements of a
- * {@link PCollection} and merging the results.
- *
- * @param <InputT> the type of the elements of the input {@code PCollection}
- * @param <OutputT> the type of the elements of the output {@code PCollection}
- */
-public class FlatMapElements<InputT, OutputT>
-    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
-  /**
-   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>The result is an intermediate builder: the output type must be supplied via
-   * {@link MissingOutputTypeDescriptor#withOutputType} to obtain the transform.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<String> words = lines.apply(
-   *     FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
-   *         .withOutputType(new TypeDescriptor<String>(){}));
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
-   */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-      via(SerializableFunction<InputT, ? extends Iterable<OutputT>> fn) {
-    return new MissingOutputTypeDescriptor<>(fn);
-  }
-
-  /**
-   * For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>The output element type is derived from the function's output type descriptor; if it
-   * cannot be determined it falls back to {@code Object}.
-   *
-   * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload
-   * {@link #via(SerializableFunction)} supports use of lambda for greater concision.
-   *
-   * <p>Example of use in Java 7:
-   * <pre>{@code
-   * PCollection<String> lines = ...;
-   * PCollection<String> words = lines.apply(FlatMapElements.via(
-   *     new SimpleFunction<String, List<String>>() {
-   *       public List<String> apply(String line) {
-   *         return Arrays.asList(line.split(" "));
-   *       }
-   *     }));
-   * }</pre>
-   *
-   * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}.
-   */
-  public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
-      via(SimpleFunction<InputT, ? extends Iterable<OutputT>> fn) {
-
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe by static typing
-    TypeDescriptor<Iterable<?>> iterableType = (TypeDescriptor) fn.getOutputTypeDescriptor();
-
-    @SuppressWarnings("unchecked") // safe by correctness of getIterableElementType
-    TypeDescriptor<OutputT> outputType =
-        (TypeDescriptor<OutputT>) getIterableElementType(iterableType);
-
-    return new FlatMapElements<>(fn, outputType);
-  }
-
-  /**
-   * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform,
-   * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
-   */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, ? extends Iterable<OutputT>> fn;
-
-    private MissingOutputTypeDescriptor(
-        SerializableFunction<InputT, ? extends Iterable<OutputT>> fn) {
-      this.fn = fn;
-    }
-
-    /** Completes the transform using {@code outputType} as the output element type. */
-    public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
-      return new FlatMapElements<>(fn, outputType);
-    }
-  }
-
-  /**
-   * Returns the element type of the given {@code Iterable} type descriptor, or a descriptor for
-   * {@code Object} when the element type cannot be determined.
-   */
-  private static TypeDescriptor<?> getIterableElementType(
-      TypeDescriptor<Iterable<?>> iterableTypeDescriptor) {
-
-    // If a rawtype was used, the type token may be for Object, not a subtype of Iterable.
-    // In this case, we rely on static typing of the function elsewhere to ensure it is
-    // at least some kind of iterable, and grossly overapproximate the element type to be Object.
-    if (!iterableTypeDescriptor.isSubtypeOf(new TypeDescriptor<Iterable<?>>() {})) {
-      return new TypeDescriptor<Object>() {};
-    }
-
-    // Otherwise try to get the actual type parameter. Defensively handle an Iterable supertype
-    // that is not parameterized (e.g. one reached through raw types): it is then not a
-    // ParameterizedType, so overapproximate with Object rather than fail on the cast.
-    java.lang.reflect.Type iterableType =
-        iterableTypeDescriptor.getSupertype(Iterable.class).getType();
-    if (!(iterableType instanceof ParameterizedType)) {
-      return new TypeDescriptor<Object>() {};
-    }
-    return TypeDescriptor.of(((ParameterizedType) iterableType).getActualTypeArguments()[0]);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private final SerializableFunction<InputT, ? extends Iterable<OutputT>> fn;
-  // Only consulted in apply() to type the output PCollection; transient so it is not serialized.
-  private final transient TypeDescriptor<OutputT> outputType;
-
-  private FlatMapElements(
-      SerializableFunction<InputT, ? extends Iterable<OutputT>> fn,
-      TypeDescriptor<OutputT> outputType) {
-    this.fn = fn;
-    this.outputType = outputType;
-  }
-
-  /**
-   * Applies {@code fn} to each input element and emits every element of the returned iterable,
-   * then records {@code outputType} on the resulting {@code PCollection}.
-   */
-  @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
-      private static final long serialVersionUID = 0L;
-      @Override
-      public void processElement(ProcessContext c) {
-        for (OutputT element : fn.apply(c.element())) {
-          c.output(element);
-        }
-      }
-    })).setTypeDescriptorInternal(outputType);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
deleted file mode 100644
index de6add0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableLikeCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled
- * into a {@code PCollectionList<T>} and returns a single
- * {@code PCollection<T>} containing all the elements in all the input
- * {@code PCollection}s. The name "Flatten" suggests taking a list of
- * lists and flattening them into a single list.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<String> pc1 = ...;
- * PCollection<String> pc2 = ...;
- * PCollection<String> pc3 = ...;
- * PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
- * PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());
- * } </pre>
- *
- * <p>By default, the {@code Coder} of the output {@code PCollection}
- * is the same as the {@code Coder} of the first {@code PCollection}
- * in the input {@code PCollectionList} (if the
- * {@code PCollectionList} is non-empty).
- *
- */
-public class Flatten {
-
-  /**
-   * Returns a {@link PTransform} that flattens a {@link PCollectionList}
-   * into a {@link PCollection} containing all the elements of all
-   * the {@link PCollection}s in its input.
-   *
-   * <p>All inputs must have compatible {@link WindowFn}s and compatible
-   * triggers. The output elements of {@code Flatten<T>} are in the same
-   * windows and have the same timestamps as their corresponding input
-   * elements. The output {@code PCollection} uses the windowing strategy
-   * of the first input.
-   *
-   * @param <T> the type of the elements in the input and output
-   * {@code PCollection}s.
-   */
-  public static <T> FlattenPCollectionList<T> pCollections() {
-    return new FlattenPCollectionList<>();
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes a {@code PCollection<Iterable<T>>}
-   * and returns a {@code PCollection<T>} containing all the elements from
-   * all the {@code Iterable}s.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Iterable<Integer>> pcOfIterables = ...;
-   * PCollection<Integer> pc = pcOfIterables.apply(Flatten.<Integer>iterables());
-   * } </pre>
-   *
-   * <p>By default, the output {@code PCollection} encodes its elements
-   * using the same {@code Coder} that the input uses for
-   * the elements in its {@code Iterable}.
-   *
-   * @param <T> the type of the elements of the input {@code Iterable} and
-   * the output {@code PCollection}
-   */
-  public static <T> FlattenIterables<T> iterables() {
-    return new FlattenIterables<>();
-  }
-
-  /**
-   * A {@link PTransform} that flattens a {@link PCollectionList}
-   * into a {@link PCollection} containing all the elements of all
-   * the {@link PCollection}s in its input.
-   * Implements {@link #pCollections}.
-   *
-   * @param <T> the type of the elements in the input and output
-   * {@code PCollection}s.
-   */
-  public static class FlattenPCollectionList<T>
-      extends PTransform<PCollectionList<T>, PCollection<T>> {
-
-    private FlattenPCollectionList() { }
-
-    /**
-     * Validates that all inputs can be flattened together and creates the primitive output.
-     *
-     * <p>The output takes the windowing strategy of the first input, and its boundedness is the
-     * boundedness of all inputs combined with {@link IsBounded#and}. An empty input list yields a
-     * {@link IsBounded#BOUNDED} output with {@link WindowingStrategy#globalDefault()}.
-     *
-     * @throws IllegalStateException if any input's {@link WindowFn} or trigger is not compatible
-     *     with that of the first input
-     */
-    @Override
-    public PCollection<T> apply(PCollectionList<T> inputs) {
-      WindowingStrategy<?, ?> windowingStrategy;
-      IsBounded isBounded = IsBounded.BOUNDED;
-      if (!inputs.getAll().isEmpty()) {
-        windowingStrategy = inputs.get(0).getWindowingStrategy();
-        for (PCollection<?> input : inputs.getAll()) {
-          WindowingStrategy<?, ?> other = input.getWindowingStrategy();
-          if (!windowingStrategy.getWindowFn().isCompatible(other.getWindowFn())) {
-            throw new IllegalStateException(
-                "Inputs to Flatten had incompatible window windowFns: "
-                + windowingStrategy.getWindowFn() + ", " + other.getWindowFn());
-          }
-
-          if (!windowingStrategy.getTrigger().getSpec()
-              .isCompatible(other.getTrigger().getSpec())) {
-            throw new IllegalStateException(
-                "Inputs to Flatten had incompatible triggers: "
-                + windowingStrategy.getTrigger() + ", " + other.getTrigger());
-          }
-          isBounded = isBounded.and(input.isBounded());
-        }
-      } else {
-        windowingStrategy = WindowingStrategy.globalDefault();
-      }
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          inputs.getPipeline(),
-          windowingStrategy,
-          isBounded);
-    }
-
-    /**
-     * Returns the {@code Coder} of the first input {@code PCollection}.
-     *
-     * @throws CannotProvideCoderException if the input list is empty
-     */
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
-        throws CannotProvideCoderException {
-      if (input.getAll().isEmpty()) {
-        throw new CannotProvideCoderException(
-            this.getClass().getSimpleName() + " cannot provide a Coder for"
-            + " empty " + PCollectionList.class.getSimpleName());
-      }
-
-      // Take coder from first collection.
-      return input.get(0).getCoder();
-    }
-  }
-
-  /**
-   * {@code FlattenIterables<T>} takes a {@code PCollection<Iterable<T>>} and returns a
-   * {@code PCollection<T>} that contains all the elements from each iterable.
-   * Implements {@link #iterables}.
-   *
-   * @param <T> the type of the elements of the input {@code Iterable}s and
-   * the output {@code PCollection}
-   */
-  public static class FlattenIterables<T>
-      extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
-
-    /**
-     * Emits every element of every input iterable, encoded with the element coder of the input's
-     * {@link IterableLikeCoder}.
-     *
-     * @throws IllegalArgumentException if the input coder is not an {@link IterableLikeCoder}
-     */
-    @Override
-    public PCollection<T> apply(PCollection<? extends Iterable<T>> in) {
-      Coder<? extends Iterable<T>> inCoder = in.getCoder();
-      if (!(inCoder instanceof IterableLikeCoder)) {
-        throw new IllegalArgumentException(
-            "expecting the input Coder<Iterable> to be an IterableLikeCoder, but got: "
-            + inCoder);
-      }
-      @SuppressWarnings("unchecked")
-      Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
-
-      return in.apply(ParDo.named("FlattenIterables").of(
-          new DoFn<Iterable<T>, T>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              for (T i : c.element()) {
-                c.output(i);
-              }
-            }
-          }))
-          .setCoder(elemCoder);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // Registers evaluateHelper as the DirectPipelineRunner evaluator for FlattenPCollectionList.
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        FlattenPCollectionList.class,
-        new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() {
-          @Override
-          public void evaluate(
-              FlattenPCollectionList transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  /**
-   * Direct-runner evaluation: concatenates the values (with metadata) of every input
-   * {@code PCollection}, in input-list order, and sets them as the output's values.
-   */
-  private static <T> void evaluateHelper(
-      FlattenPCollectionList<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>();
-    PCollectionList<T> inputs = context.getInput(transform);
-
-    for (PCollection<T> input : inputs.getAll()) {
-      outputElems.addAll(context.getPCollectionValuesWithMetadata(input));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
deleted file mode 100644
index 8fde3e0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>},
- * groups the values by key and windows, and returns a
- * {@code PCollection<KV<K, Iterable<V>>>} representing a map from
- * each distinct key and window of the input {@code PCollection} to an
- * {@code Iterable} over all the values associated with that key in
- * the input per window. Absent repeatedly-firing
- * {@link Window#triggering triggering}, each key in the output
- * {@code PCollection} is unique within each window.
- *
- * <p>{@code GroupByKey} is analogous to converting a multi-map into
- * a uni-map, and related to {@code GROUP BY} in SQL. It corresponds
- * to the "shuffle" step between the Mapper and the Reducer in the
- * MapReduce framework.
- *
- * <p>Two keys of type {@code K} are compared for equality
- * <b>not</b> by regular Java {@link Object#equals}, but instead by
- * first encoding each of the keys using the {@code Coder} of the
- * keys of the input {@code PCollection}, and then comparing the
- * encoded bytes. This admits efficient parallel evaluation. Note that
- * this requires that the {@code Coder} of the keys be deterministic (see
- * {@link Coder#verifyDeterministic()}). If the key {@code Coder} is not
- * deterministic, an exception is thrown at pipeline construction time.
- *
- * <p>By default, the {@code Coder} of the keys of the output
- * {@code PCollection} is the same as that of the keys of the input,
- * and the {@code Coder} of the elements of the {@code Iterable}
- * values of the output {@code PCollection} is the same as the
- * {@code Coder} of the values of the input.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Doc>> urlDocPairs = ...;
- * PCollection<KV<String, Iterable<Doc>>> urlToDocs =
- * urlDocPairs.apply(GroupByKey.<String, Doc>create());
- * PCollection<R> results =
- * urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() {
- * public void processElement(ProcessContext c) {
- * String url = c.element().getKey();
- * Iterable<Doc> docsWithThatUrl = c.element().getValue();
- * ... process all docs having that url ...
- * }}));
- * } </pre>
- *
- * <p>{@code GroupByKey} is a key primitive in data-parallel
- * processing, since it is the main way to efficiently bring
- * associated data together into one location. It is also a key
- * determiner of the performance of a data-parallel pipeline.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey}
- * for a way to group multiple input PCollections by a common key at once.
- *
- * <p>See {@link Combine.PerKey} for a common pattern of
- * {@code GroupByKey} followed by {@link Combine.GroupedValues}.
- *
- * <p>When grouping, windows that can be merged according to the {@link WindowFn}
- * of the input {@code PCollection} will be merged together, and a window pane
- * corresponding to the new, merged window will be created. The items in this pane
- * will be emitted when a trigger fires. By default this will be when the input
- * sources estimate there will be no more data for the window. See
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark}
- * for details on the estimation.
- *
- * <p>The timestamp for each emitted pane is determined by the
- * {@link Window.Bound#withOutputTimeFn windowing operation}.
- * The output {@code PCollection} will have the same {@link WindowFn}
- * as the input.
- *
- * <p>If the input {@code PCollection} contains late data (see
- * {@link com.google.cloud.dataflow.sdk.io.PubsubIO.Read.Bound#timestampLabel}
- * for an example of how this can occur) or the
- * {@link Window#triggering requested TriggerFn} can fire before
- * the watermark, then there may be multiple elements
- * output by a {@code GroupByKey} that correspond to the same key and window.
- *
- * <p>If the {@link WindowFn} of the input requires merging, it is not
- * valid to apply another {@code GroupByKey} without first applying a new
- * {@link WindowFn} or applying {@link Window#remerge()}.
- *
- * @param <K> the type of the keys of the input and output
- * {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- * and the elements of the {@code Iterable}s in the output
- * {@code PCollection}
- */
-public class GroupByKey<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>> {
-
- private final boolean fewKeys;
-
- private GroupByKey(boolean fewKeys) {
- this.fewKeys = fewKeys;
- }
-
- /**
- * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
- *
- * @param <K> the type of the keys of the input and output
- * {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- * and the elements of the {@code Iterable}s in the output
- * {@code PCollection}
- */
- public static <K, V> GroupByKey<K, V> create() {
- return new GroupByKey<>(false);
- }
-
- /**
- * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
- *
- * @param <K> the type of the keys of the input and output
- * {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- * and the elements of the {@code Iterable}s in the output
- * {@code PCollection}
- * @param fewKeys whether it groups just few keys.
- */
- static <K, V> GroupByKey<K, V> create(boolean fewKeys) {
- return new GroupByKey<>(fewKeys);
- }
-
- /**
- * Returns whether it groups just few keys.
- */
- public boolean fewKeys() {
- return fewKeys;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- public static void applicableTo(PCollection<?> input) {
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- // Verify that the input PCollection is bounded, or that there is windowing/triggering being
- // used. Without this, the watermark (at end of global window) will never be reached.
- if (windowingStrategy.getWindowFn() instanceof GlobalWindows
- && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
- && input.isBounded() != IsBounded.BOUNDED) {
- throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
- + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
- + "prior to GroupByKey.");
- }
-
- // Validate the window merge function.
- if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
- String cause = ((InvalidWindows<?>) windowingStrategy.getWindowFn()).getCause();
- throw new IllegalStateException(
- "GroupByKey must have a valid Window merge function. "
- + "Invalid because: " + cause);
- }
- }
-
- @Override
- public void validate(PCollection<KV<K, V>> input) {
- applicableTo(input);
-
- // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
- // the key coder is deterministic.
- Coder<K> keyCoder = getKeyCoder(input.getCoder());
- try {
- keyCoder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalStateException(
- "the keyCoder of a GroupByKey must be deterministic", e);
- }
- }
-
- public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
- WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
- if (!inputWindowFn.isNonMerging()) {
- // Prevent merging windows again, without explicit user
- // involvement, e.g., by Window.into() or Window.remerge().
- inputWindowFn = new InvalidWindows<>(
- "WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
- }
-
- // We also switch to the continuation trigger associated with the current trigger.
- return inputStrategy
- .withWindowFn(inputWindowFn)
- .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
- }
-
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- // This operation groups by the combination of key and window,
- // merging windows as needed, using the windows assigned to the
- // key/value input elements and the window merge operation of the
- // window function associated with the input PCollection.
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- // By default, implement GroupByKey[AndWindow] via a series of lower-level
- // operations.
- return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
- // Group by just the key.
- // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
- // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
- // introduced in here.
- .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
- // Sort each key's values by timestamp. GroupAlsoByWindow requires
- // its input to be sorted by timestamp.
- .apply(new SortValuesByTimestamp<K, V>())
-
- // Group each key's values by window, merging windows as needed.
- .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
- }
-
- @Override
- protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return getOutputKvCoder(input.getCoder());
- }
-
- /**
- * Returns the {@code Coder} of the input to this transform, which
- * should be a {@code KvCoder}.
- */
- @SuppressWarnings("unchecked")
- static <K, V> KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
- if (!(inputCoder instanceof KvCoder)) {
- throw new IllegalStateException(
- "GroupByKey requires its input to use KvCoder");
- }
- return (KvCoder<K, V>) inputCoder;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Returns the {@code Coder} of the keys of the input to this
- * transform, which is also used as the {@code Coder} of the keys of
- * the output of this transform.
- */
- static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
- return getInputKvCoder(inputCoder).getKeyCoder();
- }
-
- /**
- * Returns the {@code Coder} of the values of the input to this transform.
- */
- public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
- return getInputKvCoder(inputCoder).getValueCoder();
- }
-
- /**
- * Returns the {@code Coder} of the {@code Iterable} values of the
- * output of this transform.
- */
- static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
- return IterableCoder.of(getInputValueCoder(inputCoder));
- }
-
- /**
- * Returns the {@code Coder} of the output of this transform.
- */
- static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
- return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
- */
- public static class ReifyTimestampsAndWindows<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, WindowedValue<V>>>> {
- @Override
- public PCollection<KV<K, WindowedValue<V>>> apply(
- PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
- Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
- inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- Coder<KV<K, WindowedValue<V>>> outputKvCoder =
- KvCoder.of(keyCoder, outputValueCoder);
- return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
- .setCoder(outputKvCoder);
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that sorts the values associated with each key
- * by timestamp.
- */
- public static class SortValuesByTimestamp<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
- @Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- return input.apply(ParDo.of(
- new DoFn<KV<K, Iterable<WindowedValue<V>>>,
- KV<K, Iterable<WindowedValue<V>>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
- K key = kvs.getKey();
- Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
- List<WindowedValue<V>> sortedValues = new ArrayList<>();
- for (WindowedValue<V> value : unsortedValues) {
- sortedValues.add(value);
- }
- Collections.sort(sortedValues,
- new Comparator<WindowedValue<V>>() {
- @Override
- public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
- return e1.getTimestamp().compareTo(e2.getTimestamp());
- }
- });
- c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
- }}))
- .setCoder(input.getCoder());
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that takes a collection of timestamp-ordered
- * values associated with each key, groups the values by window,
- * combines windows as needed, and for each window in each key,
- * outputs a collection of key/value-list pairs implicitly assigned
- * to the window and with the timestamp derived from that window.
- */
- public static class GroupAlsoByWindow<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<V>>>> {
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public PCollection<KV<K, Iterable<V>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<Iterable<WindowedValue<V>>> inputValueCoder =
- inputKvCoder.getValueCoder();
-
- IterableCoder<WindowedValue<V>> inputIterableValueCoder =
- (IterableCoder<WindowedValue<V>>) inputValueCoder;
- Coder<WindowedValue<V>> inputIterableElementCoder =
- inputIterableValueCoder.getElemCoder();
- WindowedValueCoder<V> inputIterableWindowedValueCoder =
- (WindowedValueCoder<V>) inputIterableElementCoder;
-
- Coder<V> inputIterableElementValueCoder =
- inputIterableWindowedValueCoder.getValueCoder();
- Coder<Iterable<V>> outputValueCoder =
- IterableCoder.of(inputIterableElementValueCoder);
- Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
- return input
- .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
- .setCoder(outputKvCoder);
- }
-
- private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
- groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
- return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Primitive helper transform that groups by key only, ignoring any
- * window assignments.
- */
- public static class GroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>> {
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- /**
- * Returns the {@code Coder} of the input to this transform, which
- * should be a {@code KvCoder}.
- */
- @SuppressWarnings("unchecked")
- KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
- if (!(inputCoder instanceof KvCoder)) {
- throw new IllegalStateException(
- "GroupByKey requires its input to use KvCoder");
- }
- return (KvCoder<K, V>) inputCoder;
- }
-
- @Override
- protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- registerWithDirectPipelineRunner();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static <K, V> void registerWithDirectPipelineRunner() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- GroupByKeyOnly.class,
- new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() {
- @Override
- public void evaluate(
- GroupByKeyOnly transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateHelper(transform, context);
- }
- });
- }
-
- private static <K, V> void evaluateHelper(
- GroupByKeyOnly<K, V> transform,
- DirectPipelineRunner.EvaluationContext context) {
- PCollection<KV<K, V>> input = context.getInput(transform);
-
- List<ValueWithMetadata<KV<K, V>>> inputElems =
- context.getPCollectionValuesWithMetadata(input);
-
- Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
- Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
- for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
- K key = elem.getValue().getKey();
- V value = elem.getValue().getValue();
- byte[] encodedKey;
- try {
- encodedKey = encodeToByteArray(keyCoder, key);
- } catch (CoderException exn) {
- // TODO: Put in better element printing:
- // truncate if too long.
- throw new IllegalArgumentException(
- "unable to encode key " + key + " of input to " + transform +
- " using " + keyCoder,
- exn);
- }
- GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
- List<V> values = groupingMap.get(groupingKey);
- if (values == null) {
- values = new ArrayList<V>();
- groupingMap.put(groupingKey, values);
- }
- values.add(value);
- }
-
- List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
- new ArrayList<>();
- for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
- GroupingKey<K> groupingKey = entry.getKey();
- K key = groupingKey.getKey();
- List<V> values = entry.getValue();
- values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
- outputElems.add(ValueWithMetadata
- .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
- .withKey(key));
- }
-
- context.setPCollectionValuesWithMetadata(context.getOutput(transform),
- outputElems);
- }
-
- private static class GroupingKey<K> {
- private K key;
- private byte[] encodedKey;
-
- public GroupingKey(K key, byte[] encodedKey) {
- this.key = key;
- this.encodedKey = encodedKey;
- }
-
- public K getKey() {
- return key;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof GroupingKey) {
- GroupingKey<?> that = (GroupingKey<?>) o;
- return Arrays.equals(this.encodedKey, that.encodedKey);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(encodedKey);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
deleted file mode 100644
index b6497b7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import org.joda.time.Instant;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Provides multi-threading of {@link DoFn}s, using threaded execution to
- * process multiple elements concurrently within a bundle.
- *
- * <p>Note, that each Dataflow worker will already process multiple bundles
- * concurrently and usage of this class is meant only for cases where processing
- * elements from within a bundle is limited by blocking calls.
- *
- * <p>CPU intensive or IO intensive tasks are in general a poor fit for parallelization.
- * This is because a limited resource that is already maximally utilized does not
- * benefit from sub-division of work. The parallelization will increase the amount of time
- * to process each element yet the throughput for processing will remain relatively the same.
- * For example, if the local disk (an IO resource) has a maximum write rate of 10 MiB/s,
- * and processing each element requires to write 20 MiBs to disk, then processing one element
- * to disk will take 2 seconds. Yet processing 3 elements concurrently (each getting an equal
- * share of the maximum write rate) will take at least 6 seconds to complete (there is additional
- * overhead in the extra parallelization).
- *
- * <p>To parallelize a {@link DoFn} to 10 threads:
- * <pre>{@code
- * PCollection<T> data = ...;
- * data.apply(
- * IntraBundleParallelization.of(new MyDoFn())
- * .withMaxParallelism(10)));
- * }</pre>
- *
- * <p>An uncaught exception from the wrapped {@link DoFn} will result in the exception
- * being rethrown in later calls to {@link MultiThreadedIntraBundleProcessingDoFn#processElement}
- * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}.
- */
-public class IntraBundleParallelization {
- /**
- * Creates a {@link IntraBundleParallelization} {@link PTransform} for the given
- * {@link DoFn} that processes elements using multiple threads.
- *
- * <p>Note that the specified {@code doFn} needs to be thread safe.
- */
- public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
- return new Unbound().of(doFn);
- }
-
- /**
- * Creates a {@link IntraBundleParallelization} {@link PTransform} with the specified
- * maximum concurrency level.
- */
- public static Unbound withMaxParallelism(int maxParallelism) {
- return new Unbound().withMaxParallelism(maxParallelism);
- }
-
- /**
- * An incomplete {@code IntraBundleParallelization} transform, with unbound input/output types.
- *
- * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} must be
- * invoked to specify the {@link DoFn} to invoke, which will also
- * bind the input/output types of this {@code PTransform}.
- */
- public static class Unbound {
- private final int maxParallelism;
-
- Unbound() {
- this(DEFAULT_MAX_PARALLELISM);
- }
-
- Unbound(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0,
- "Expected parallelism factor greater than zero, received %s.", maxParallelism);
- this.maxParallelism = maxParallelism;
- }
-
- /**
- * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
- * with the specified maximum concurrency level.
- */
- public Unbound withMaxParallelism(int maxParallelism) {
- return new Unbound(maxParallelism);
- }
-
- /**
- * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
- * with the specified {@link DoFn}.
- *
- * <p>Note that the specified {@code doFn} needs to be thread safe.
- */
- public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
- return new Bound<>(doFn, maxParallelism);
- }
- }
-
- /**
- * A {@code PTransform} that, when applied to a {@code PCollection<InputT>},
- * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
- * with all its outputs collected into an output
- * {@code PCollection<OutputT>}.
- *
- * <p>Note that the specified {@code doFn} needs to be thread safe.
- *
- * @param <InputT> the type of the (main) input {@code PCollection} elements
- * @param <OutputT> the type of the (main) output {@code PCollection} elements
- */
- public static class Bound<InputT, OutputT>
- extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- private final DoFn<InputT, OutputT> doFn;
- private final int maxParallelism;
-
- Bound(DoFn<InputT, OutputT> doFn, int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0,
- "Expected parallelism factor greater than zero, received %s.", maxParallelism);
- this.doFn = doFn;
- this.maxParallelism = maxParallelism;
- }
-
- /**
- * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
- * with the specified maximum concurrency level.
- */
- public Bound<InputT, OutputT> withMaxParallelism(int maxParallelism) {
- return new Bound<>(doFn, maxParallelism);
- }
-
- /**
- * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
- * with the specified {@link DoFn}.
- *
- * <p>Note that the specified {@code doFn} needs to be thread safe.
- */
- public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT>
- of(DoFn<NewInputT, NewOutputT> doFn) {
- return new Bound<>(doFn, maxParallelism);
- }
-
- @Override
- public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
- return input.apply(
- ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism)));
- }
- }
-
- /**
- * A multi-threaded {@code DoFn} wrapper.
- *
- * @see IntraBundleParallelization#of(DoFn)
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
- public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>
- extends DoFn<InputT, OutputT> {
-
- public MultiThreadedIntraBundleProcessingDoFn(DoFn<InputT, OutputT> doFn, int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0,
- "Expected parallelism factor greater than zero, received %s.", maxParallelism);
- this.doFn = doFn;
- this.maxParallelism = maxParallelism;
- }
-
- @Override
- public void startBundle(Context c) throws Exception {
- doFn.startBundle(c);
-
- executor = c.getPipelineOptions().as(GcsOptions.class).getExecutorService();
- workTickets = new Semaphore(maxParallelism);
- failure = new AtomicReference<>();
- }
-
- @Override
- public void processElement(final ProcessContext c) throws Exception {
- try {
- workTickets.acquire();
- } catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while scheduling work", e);
- }
-
- if (failure.get() != null) {
- throw Throwables.propagate(failure.get());
- }
-
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- doFn.processElement(new WrappedContext(c));
- } catch (Throwable t) {
- failure.compareAndSet(null, t);
- Throwables.propagateIfPossible(t);
- throw new AssertionError("Unexpected checked exception: " + t);
- } finally {
- workTickets.release();
- }
- }
- });
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- // Acquire all the work tickets to guarantee that all the previous
- // processElement calls have finished.
- workTickets.acquire(maxParallelism);
- if (failure.get() != null) {
- throw Throwables.propagate(failure.get());
- }
- doFn.finishBundle(c);
- }
-
- @Override
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return doFn.getInputTypeDescriptor();
- }
-
- @Override
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return doFn.getOutputTypeDescriptor();
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Wraps a DoFn context, forcing single-thread output so that threads don't
- * propagate through to downstream functions.
- */
- private class WrappedContext extends ProcessContext {
- private final ProcessContext context;
-
- WrappedContext(ProcessContext context) {
- this.context = context;
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
- context.output(output);
- }
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
- context.outputWithTimestamp(output, timestamp);
- }
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
- context.sideOutput(tag, output);
- }
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return context.windowingInternals();
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
- }
-
- private final DoFn<InputT, OutputT> doFn;
- private int maxParallelism;
-
- private transient ExecutorService executor;
- private transient Semaphore workTickets;
- private transient AtomicReference<Throwable> failure;
- }
-
- /**
- * Default maximum for number of concurrent elements to process.
- */
- private static final int DEFAULT_MAX_PARALLELISM = 16;
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
deleted file mode 100644
index 370d43d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * A {@link PTransform} that projects each {@code KV<K, V>} element of its input
- * {@code PCollection} onto its key, producing a {@code PCollection<K>}.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Long>> wordCounts = ...;
- * PCollection<String> words = wordCounts.apply(Keys.<String>create());
- * } </pre>
- *
- * <p>Every output element keeps the timestamp and windows of the input element
- * it came from, and the output {@code PCollection} carries the same
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
- * as the input.
- *
- * <p>See also {@link Values}.
- *
- * @param <K> the key type of the input {@code PCollection}, which is also
- *     the element type of the output {@code PCollection}
- */
-public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, PCollection<K>> {
-  /**
-   * Creates a new {@code Keys<K>} {@code PTransform}.
-   *
-   * @param <K> the key type of the input {@code PCollection}, which is also
-   *     the element type of the output {@code PCollection}
-   */
-  public static <K> Keys<K> create() {
-    return new Keys<>();
-  }
-
-  /** Instances are obtained through {@link #create()}. */
-  private Keys() {}
-
-  @Override
-  public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
-    // Emits only the key of each incoming pair.
-    DoFn<KV<K, ?>, K> extractKey = new DoFn<KV<K, ?>, K>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        KV<K, ?> pair = c.element();
-        c.output(pair.getKey());
-      }
-    };
-    return in.apply(ParDo.named("Keys").of(extractKey));
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
deleted file mode 100644
index 5a9cc87..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code KvSwap<K, V>} takes a {@code PCollection<KV<K, V>>} and
- * returns a {@code PCollection<KV<V, K>>}, where all the keys and
- * values have been swapped.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Long>> wordsToCounts = ...;
- * PCollection<KV<Long, String>> countsToWords =
- *     wordsToCounts.apply(KvSwap.<String, Long>create());
- * } </pre>
- *
- * <p>Each output element has the same timestamp and is in the same windows
- * as its corresponding input element, and the output {@code PCollection}
- * has the same
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
- *
- * @param <K> the type of the keys in the input {@code PCollection}
- *     and the values in the output {@code PCollection}
- * @param <V> the type of the values in the input {@code PCollection}
- *     and the keys in the output {@code PCollection}
- */
-public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
-    PCollection<KV<V, K>>> {
-  /**
-   * Returns a {@code KvSwap<K, V>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys in the input {@code PCollection}
-   *     and the values in the output {@code PCollection}
-   * @param <V> the type of the values in the input {@code PCollection}
-   *     and the keys in the output {@code PCollection}
-   */
-  public static <K, V> KvSwap<K, V> create() {
-    return new KvSwap<>();
-  }
-
-  /** Instances are obtained through {@link #create()}. */
-  private KvSwap() { }
-
-  @Override
-  public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
-    return
-        in.apply(ParDo.named("KvSwap")
-            .of(new DoFn<KV<K, V>, KV<V, K>>() {
-              @Override
-              public void processElement(ProcessContext c) {
-                // Emit a new pair with the key and value exchanged.
-                KV<K, V> e = c.element();
-                c.output(KV.of(e.getValue(), e.getKey()));
-              }
-            }));
-  }
-}