You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:46 UTC

[22/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
deleted file mode 100644
index 4f131ad..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/DoFnWithContext.java
+++ /dev/null
@@ -1,416 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn.DelegatingAggregator;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code DoFnWithContext}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code DoFnWithContext}s can be tested in a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output.  Unit testing of a {@code DoFnWithContext},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>Implementations must define a method annotated with {@link ProcessElement}
- * that satisfies the requirements described there. See the {@link ProcessElement}
- * annotation for details.
- *
- * <p>This functionality is experimental and likely to change.
- *
- * <p>Example usage:
- *
- * <pre> {@code
- * PCollection<String> lines = ... ;
- * PCollection<String> words =
- *     lines.apply(ParDo.of(new DoFnWithContext<String, String>() {
- *         @ProcessElement
- *         public void processElement(ProcessContext c, BoundedWindow window) {
- *           for (String word : c.element().split(" ")) { c.output(word); }
- *         }}));
- * } </pre>
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- */
-@Experimental
-public abstract class DoFnWithContext<InputT, OutputT> implements Serializable {
-
-  /** Information accessible to all methods in this {@code DoFnWithContext}. */
-  public abstract class Context {
-
-    /**
-     * Returns the {@code PipelineOptions} specified with the
-     * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}
-     * invoking this {@code DoFnWithContext}.  The {@code PipelineOptions} will
-     * be the default when running via {@link DoFnTester}.
-     */
-    public abstract PipelineOptions getPipelineOptions();
-
-    /**
-     * Adds the given element to the main output {@code PCollection}.
-     *
-     * <p>Once passed to {@code output} the element should not be modified in
-     * any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the output
-     * element will have the same timestamp and be in the same windows
-     * as the input element passed to the method annotated with
-     * {@code @ProcessElement}.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     */
-    public abstract void output(OutputT output);
-
-    /**
-     * Adds the given element to the main output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code outputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFnWithContext#getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     */
-    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
-    /**
-     * Adds the given element to the side output {@code PCollection} with the
-     * given tag.
-     *
-     * <p>Once passed to {@code sideOutput} the element should not be modified
-     * in any way.
-     *
-     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to
-     * specify the tags of side outputs that it consumes. Non-consumed side
-     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
-     * need to be specified.
-     *
-     * <p>If invoked from {@link ProcessElement}, the output element will have the
-     * same timestamp and be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element. The output element
-     * will have a timestamp of negative infinity.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-    /**
-     * Adds the given element to the specified side output {@code PCollection},
-     * with the given timestamp.
-     *
-     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
-     * modified in any way.
-     *
-     * <p>If invoked from {@link ProcessElement}, the timestamp
-     * must not be older than the input element's timestamp minus
-     * {@link DoFnWithContext#getAllowedTimestampSkew}.  The output element will
-     * be in the same windows as the input element.
-     *
-     * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
-     * this will attempt to use the
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
-     * of the input {@code PCollection} to determine what windows the element
-     * should be in, throwing an exception if the {@code WindowFn} attempts
-     * to access any information about the input element except for the
-     * timestamp.
-     *
-     * @see ParDo#withOutputTags
-     */
-    public abstract <T> void sideOutputWithTimestamp(
-        TupleTag<T> tag, T output, Instant timestamp);
-  }
-
-  /**
-   * Information accessible when running a method annotated with {@link ProcessElement}.
-   */
-  public abstract class ProcessContext extends Context {
-
-    /**
-     * Returns the input element to be processed.
-     *
-     * <p>The element will not be changed -- it is safe to cache, etc.
-     * without copying.
-     */
-    public abstract InputT element();
-
-
-    /**
-     * Returns the value of the side input.
-     *
-     * @throws IllegalArgumentException if this is not a side input
-     * @see ParDo#withSideInputs
-     */
-    public abstract <T> T sideInput(PCollectionView<T> view);
-
-    /**
-     * Returns the timestamp of the input element.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract Instant timestamp();
-
-    /**
-     * Returns information about the pane within this window into which the
-     * input element has been assigned.
-     *
-     * <p>Generally all data is in a single, uninteresting pane unless custom
-     * triggering and/or late data has been explicitly requested.
-     * See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window}
-     * for more information.
-     */
-    public abstract PaneInfo pane();
-  }
-
-  /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFnWithContext.Context#outputWithTimestamp}.
-   *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward into the future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
-   */
-  public Duration getAllowedTimestampSkew() {
-    return Duration.ZERO;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  Map<String, DelegatingAggregator<?, ?>> aggregators = new HashMap<>();
-
-  /**
-   * Protects aggregators from being created after initialization.
-   */
-  private boolean aggregatorsAreFinal;
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code DoFnWithContext} instance's most-derived
-   * class.
-   *
-   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
-   */
-  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-    return new TypeDescriptor<InputT>(getClass()) {};
-  }
-
-  /**
-   * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code DoFnWithContext} instance's
-   * most-derived class.
-   *
-   * <p>In the normal case of a concrete {@code DoFnWithContext} subclass with
-   * no generic type parameters of its own (including anonymous inner
-   * classes), this will be a complete non-generic type, which is good
-   * for choosing a default output {@code Coder<O>} for the output
-   * {@code PCollection<O>}.
-   */
-  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-    return new TypeDescriptor<OutputT>(getClass()) {};
-  }
-
-  /**
-   * Interface for runner implementors to provide implementations of extra context information.
-   *
-   * <p>The methods on this interface are called by {@link DoFnReflector} before invoking an
-   * annotated {@link StartBundle}, {@link ProcessElement} or {@link FinishBundle} method that
-   * has indicated it needs the given extra context.
-   *
-   * <p>In the case of {@link ProcessElement} it is called once per invocation of
-   * {@link ProcessElement}.
-   */
-  public interface ExtraContextFactory<InputT, OutputT> {
-    /**
-     * Construct the {@link BoundedWindow} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link BoundedWindow}.
-     *
-     * @return {@link BoundedWindow} of the element currently being processed.
-     */
-    BoundedWindow window();
-
-    /**
-     * Construct the {@link WindowingInternals} to use within a {@link DoFnWithContext} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link WindowingInternals}.
-     */
-    WindowingInternals<InputT, OutputT> windowingInternals();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Annotation for the method to use to prepare an instance for processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have exactly one argument.
-   *   <li>That argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface StartBundle {}
-
-  /**
-   * Annotation for the method to use for processing elements. A subclass of
-   * {@link DoFnWithContext} must have a method with this annotation satisfying
-   * the following constraints in order for it to be executable:
-   * <ul>
-   *   <li>It must have at least one argument.
-   *   <li>Its first argument must be a {@link DoFnWithContext.ProcessContext}.
-   *   <li>Its remaining arguments must be {@link BoundedWindow}, or
-   *   {@link WindowingInternals WindowingInternals&lt;InputT, OutputT&gt;}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface ProcessElement {}
-
-  /**
-   * Annotation for the method to use to finish processing a batch of elements.
-   * The method annotated with this must satisfy the following constraints:
-   * <ul>
-   *   <li>It must have exactly one argument.
-   *   <li>That argument must be a {@link DoFnWithContext.Context}.
-   * </ul>
-   */
-  @Documented
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target(ElementType.METHOD)
-  public @interface FinishBundle {}
-
-  /**
-   * Returns an {@link Aggregator} with aggregation logic specified by the
-   * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the DoFn. Aggregators can only be created
-   * during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link CombineFn} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-      createAggregator(String name, Combine.CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    checkNotNull(name, "name cannot be null");
-    checkNotNull(combiner, "combiner cannot be null");
-    checkArgument(!aggregators.containsKey(name),
-        "Cannot create aggregator with name %s."
-        + " An Aggregator with that name already exists within this scope.",
-        name);
-    checkState(!aggregatorsAreFinal,
-        "Cannot create an aggregator during pipeline execution."
-        + " Aggregators should be registered during pipeline construction.");
-
-    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
-        new DelegatingAggregator<>(name, combiner);
-    aggregators.put(name, aggregator);
-    return aggregator;
-  }
-
-  /**
-   * Returns an {@link Aggregator} with the aggregation logic specified by the
-   * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the DoFn. Aggregators can only be
-   * created during pipeline construction.
-   *
-   * @param name the name of the aggregator
-   * @param combiner the {@link SerializableFunction} to use in the aggregator
-   * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
-   * @throws NullPointerException if the name or combiner is null
-   * @throws IllegalArgumentException if the given name collides with another
-   *         aggregator in this scope
-   * @throws IllegalStateException if called during pipeline execution.
-   */
-  public final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(
-      String name, SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
-    checkNotNull(combiner, "combiner cannot be null.");
-    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
-  }
-
-  /**
-   * Finalize the {@link DoFnWithContext} construction to prepare for processing.
-   * This method should be called by runners before any processing methods.
-   */
-  void prepareForProcessing() {
-    aggregatorsAreFinal = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
deleted file mode 100644
index 9e123a1..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Filter.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code PTransform}s for filtering from a {@code PCollection} the
- * elements satisfying a predicate, or satisfying an inequality with
- * a given value based on the elements' natural ordering.
- *
- * @param <T> the type of the values in the input {@code PCollection},
- * and the type of the elements in the output {@code PCollection}
- */
-public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that satisfy the given predicate.  The predicate must be
-   * a {@code SerializableFunction<T, Boolean>}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<String> wordList = ...;
-   * PCollection<String> longWords =
-   *     wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6)));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThan}, {@link #lessThanEq},
-   * {@link #greaterThan}, {@link #greaterThanEq}, which return elements
-   * satisfying various inequalities with the specified value based on
-   * the elements' natural ordering.
-   */
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T>
-  byPredicate(PredicateT predicate) {
-    return new Filter<T>("Filter", predicate);
-  }
-
-  /**
-   * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of
-   * a {@link ParDo.Bound}.
-   */
-  @Deprecated
-  public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T>
-  by(final PredicateT filterPred) {
-    return ParDo.named("Filter").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (filterPred.apply(c.element())) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@link PCollection} and returns a {@link PCollection} with
-   * elements that are less than a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> smallNumbers =
-   *     listOfNumbers.apply(Filter.lessThan(10));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThanEq}, {@link #greaterThanEq},
-   * and {@link #greaterThan}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) {
-    return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) < 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are greater than a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> largeNumbers =
-   *     listOfNumbers.apply(Filter.greaterThan(1000));
-   * } </pre>
-   *
-   * <p>See also {@link #greaterThanEq}, {@link #lessThan},
-   * and {@link #lessThanEq}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) {
-    return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) > 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are less than or equal to a given value, based on the
-   * elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> smallOrEqualNumbers =
-   *     listOfNumbers.apply(Filter.lessThanEq(10));
-   * } </pre>
-   *
-   * <p>See also {@link #lessThan}, {@link #greaterThanEq},
-   * and {@link #greaterThan}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) {
-    return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) <= 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes an input
-   * {@code PCollection<T>} and returns a {@code PCollection<T>} with
-   * elements that are greater than or equal to a given value, based on
-   * the elements' natural ordering. Elements must be {@code Comparable}.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Integer> listOfNumbers = ...;
-   * PCollection<Integer> largeOrEqualNumbers =
-   *     listOfNumbers.apply(Filter.greaterThanEq(1000));
-   * } </pre>
-   *
-   * <p>See also {@link #greaterThan}, {@link #lessThan},
-   * and {@link #lessThanEq}, which return elements satisfying various
-   * inequalities with the specified value based on the elements'
-   * natural ordering.
-   *
-   * <p>See also {@link #byPredicate}, which returns elements
-   * that satisfy the given predicate.
-   */
-  public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) {
-    return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (c.element().compareTo(value) >= 0) {
-          c.output(c.element());
-        }
-      }
-    });
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////
-
-  private final SerializableFunction<T, Boolean> predicate;
-
-  private Filter(String name, SerializableFunction<T, Boolean> predicate) {
-    super(name);
-    this.predicate = predicate;
-  }
-
-  /**
-   * Returns a new {@code Filter} that uses the same predicate but has the given name.
-   */
-  public Filter<T> named(String name) {
-    return new Filter<>(name, predicate);
-  }
-
-  @Override
-  public PCollection<T> apply(PCollection<T> input) {
-    // The predicate returns a boxed Boolean; the if-condition unboxes it, so a predicate
-    // that returns null fails with a NullPointerException rather than dropping the element.
-    return input.apply(ParDo.named("Filter").of(new DoFn<T, T>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        if (predicate.apply(c.element())) {
-          c.output(c.element());
-        }
-      }
-    }));
-  }
-
-  @Override
-  protected Coder<T> getDefaultOutputCoder(PCollection<T> input) {
-    return input.getCoder();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
deleted file mode 100644
index fbaad5b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElements.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * {@code PTransform}s for mapping a simple function that returns iterables over the elements of a
- * {@link PCollection} and merging the results.
- */
-public class FlatMapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
-  /**
-   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<String> words = lines.apply(
-   *     FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
-   *         .withOutputType(new TypeDescriptor<String>(){}));
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
-   */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<InputT, ? extends Iterable<OutputT>> fn) {
-    return new MissingOutputTypeDescriptor<>(fn);
-  }
-
-  /**
-   * For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload
-   * {@link #via(SerializableFunction)} supports use of lambda for greater concision.
-   *
-   * <p>Example of use in Java 7:
-   * <pre>{@code
-   * PCollection<String> lines = ...;
-   * PCollection<String> words = lines.apply(FlatMapElements.via(
-   *     new SimpleFunction<String, List<String>>() {
-   *       public List<String> apply(String line) {
-   *         return Arrays.asList(line.split(" "));
-   *       }
-   *     }));
-   * }</pre>
-   *
-   * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}.
-   */
-  public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
-  via(SimpleFunction<InputT, ? extends Iterable<OutputT>> fn) {
-    @SuppressWarnings({"rawtypes", "unchecked"}) // safe by static typing
-    TypeDescriptor<Iterable<?>> iterableType = (TypeDescriptor) fn.getOutputTypeDescriptor();
-
-    @SuppressWarnings("unchecked") // safe by correctness of getIterableElementType
-    TypeDescriptor<OutputT> outputType =
-        (TypeDescriptor<OutputT>) getIterableElementType(iterableType);
-
-    return new FlatMapElements<>(fn, outputType);
-  }
-
-  /**
-   * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform,
-   * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
-   */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, ? extends Iterable<OutputT>> fn;
-
-    private MissingOutputTypeDescriptor(
-        SerializableFunction<InputT, ? extends Iterable<OutputT>> fn) {
-      this.fn = fn;
-    }
-
-    /**
-     * Returns a {@link FlatMapElements} transform that applies the wrapped function and whose
-     * output {@code PCollection} has the given element type descriptor.
-     */
-    public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
-      return new FlatMapElements<>(fn, outputType);
-    }
-  }
-
-  /**
-   * Returns the element type of the given {@code Iterable} type descriptor, or a descriptor for
-   * {@code Object} when the element type cannot be determined.
-   */
-  private static TypeDescriptor<?> getIterableElementType(
-      TypeDescriptor<Iterable<?>> iterableTypeDescriptor) {
-    // If a rawtype was used, the type token may be for Object, not a subtype of Iterable.
-    // In this case, we rely on static typing of the function elsewhere to ensure it is
-    // at least some kind of iterable, and grossly overapproximate the element type to be Object.
-    if (!iterableTypeDescriptor.isSubtypeOf(new TypeDescriptor<Iterable<?>>() {})) {
-      return new TypeDescriptor<Object>() {};
-    }
-
-    // Otherwise get the actual type parameter. Guard the cast: if the resolved Iterable
-    // supertype is not a ParameterizedType (e.g. a plain Class), overapproximate to Object
-    // as above instead of failing with a ClassCastException.
-    java.lang.reflect.Type iterableType =
-        iterableTypeDescriptor.getSupertype(Iterable.class).getType();
-    if (!(iterableType instanceof ParameterizedType)) {
-      return new TypeDescriptor<Object>() {};
-    }
-    return TypeDescriptor.of(((ParameterizedType) iterableType).getActualTypeArguments()[0]);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private final SerializableFunction<InputT, ? extends Iterable<OutputT>> fn;
-  // Read only in apply() to set the output type; transient, so it is not serialized.
-  private final transient TypeDescriptor<OutputT> outputType;
-
-  private FlatMapElements(
-      SerializableFunction<InputT, ? extends Iterable<OutputT>> fn,
-      TypeDescriptor<OutputT> outputType) {
-    this.fn = fn;
-    this.outputType = outputType;
-  }
-
-  @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
-    // NOTE(review): the ParDo step is named "Map" although this transform flat-maps; confirm
-    // whether "FlatMap" was intended before renaming, since that changes the step name.
-    return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() {
-      private static final long serialVersionUID = 0L;
-      @Override
-      public void processElement(ProcessContext c) {
-        for (OutputT element : fn.apply(c.element())) {
-          c.output(element);
-        }
-      }
-    })).setTypeDescriptorInternal(outputType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
deleted file mode 100644
index de6add0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Flatten.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.IterableLikeCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled
- * into a {@code PCollectionList<T>} and returns a single
- * {@code PCollection<T>} containing all the elements in all the input
- * {@code PCollection}s.  The name "Flatten" suggests taking a list of
- * lists and flattening them into a single list.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<String> pc1 = ...;
- * PCollection<String> pc2 = ...;
- * PCollection<String> pc3 = ...;
- * PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);
- * PCollection<String> merged = pcs.apply(Flatten.<String>pCollections());
- * } </pre>
- *
- * <p>By default, the {@code Coder} of the output {@code PCollection}
- * is the same as the {@code Coder} of the first {@code PCollection}
- * in the input {@code PCollectionList} (if the
- * {@code PCollectionList} is non-empty).
- *
- */
-public class Flatten {
-
-  /**
-   * Returns a {@link PTransform} that flattens a {@link PCollectionList}
-   * into a {@link PCollection} containing all the elements of all
-   * the {@link PCollection}s in its input.
-   *
-   * <p>All inputs must have equal {@link WindowFn}s.
-   * The output elements of {@code Flatten<T>} are in the same windows and
-   * have the same timestamps as their corresponding input elements.  The output
-   * {@code PCollection} will have the same
-   * {@link WindowFn} as all of the inputs.
-   *
-   * @param <T> the type of the elements in the input and output
-   * {@code PCollection}s.
-   */
-  public static <T> FlattenPCollectionList<T> pCollections() {
-    return new FlattenPCollectionList<>();
-  }
-
-  /**
-   * Returns a {@code PTransform} that takes a {@code PCollection<Iterable<T>>}
-   * and returns a {@code PCollection<T>} containing all the elements from
-   * all the {@code Iterable}s.
-   *
-   * <p>Example of use:
-   * <pre> {@code
-   * PCollection<Iterable<Integer>> pcOfIterables = ...;
-   * PCollection<Integer> pc = pcOfIterables.apply(Flatten.<Integer>iterables());
-   * } </pre>
-   *
-   * <p>By default, the output {@code PCollection} encodes its elements
-   * using the same {@code Coder} that the input uses for
-   * the elements in its {@code Iterable}.
-   *
-   * @param <T> the type of the elements of the input {@code Iterable} and
-   * the output {@code PCollection}
-   */
-  public static <T> FlattenIterables<T> iterables() {
-    return new FlattenIterables<>();
-  }
-
-  /**
-   * A {@link PTransform} that flattens a {@link PCollectionList}
-   * into a {@link PCollection} containing all the elements of all
-   * the {@link PCollection}s in its input.
-   * Implements {@link #pCollections}.
-   *
-   * @param <T> the type of the elements in the input and output
-   * {@code PCollection}s.
-   */
-  public static class FlattenPCollectionList<T>
-      extends PTransform<PCollectionList<T>, PCollection<T>> {
-
-    private FlattenPCollectionList() { }
-
-    /**
-     * Creates the primitive flattened output. Every input's {@link WindowFn} and trigger
-     * must be compatible with those of the first input, whose windowing strategy the output
-     * takes. The output is bounded only if every input is bounded. An empty list produces a
-     * bounded output using {@link WindowingStrategy#globalDefault()}.
-     *
-     * @throws IllegalStateException if the inputs' window fns or triggers are incompatible
-     */
-    @Override
-    public PCollection<T> apply(PCollectionList<T> inputs) {
-      WindowingStrategy<?, ?> windowingStrategy;
-      IsBounded isBounded = IsBounded.BOUNDED;
-      if (!inputs.getAll().isEmpty()) {
-        // The first input's strategy is the reference every input (itself included)
-        // is checked against.
-        windowingStrategy = inputs.get(0).getWindowingStrategy();
-        for (PCollection<?> input : inputs.getAll()) {
-          WindowingStrategy<?, ?> other = input.getWindowingStrategy();
-          if (!windowingStrategy.getWindowFn().isCompatible(other.getWindowFn())) {
-            throw new IllegalStateException(
-                "Inputs to Flatten had incompatible window windowFns: "
-                + windowingStrategy.getWindowFn() + ", " + other.getWindowFn());
-          }
-
-          if (!windowingStrategy.getTrigger().getSpec()
-              .isCompatible(other.getTrigger().getSpec())) {
-            throw new IllegalStateException(
-                "Inputs to Flatten had incompatible triggers: "
-                + windowingStrategy.getTrigger() + ", " + other.getTrigger());
-          }
-          isBounded = isBounded.and(input.isBounded());
-        }
-      } else {
-        windowingStrategy = WindowingStrategy.globalDefault();
-      }
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          inputs.getPipeline(),
-          windowingStrategy,
-          isBounded);
-    }
-
-    /**
-     * Returns the coder of the first input {@code PCollection}.
-     *
-     * @throws CannotProvideCoderException if the input list is empty
-     */
-    @Override
-    protected Coder<?> getDefaultOutputCoder(PCollectionList<T> input)
-        throws CannotProvideCoderException {
-      List<PCollection<T>> pCollections = input.getAll();
-      if (pCollections.isEmpty()) {
-        // No inputs, so there is no coder to inherit.
-        throw new CannotProvideCoderException(
-            this.getClass().getSimpleName() + " cannot provide a Coder for"
-            + " empty " + PCollectionList.class.getSimpleName());
-      }
-
-      // Take coder from first collection
-      return pCollections.get(0).getCoder();
-    }
-  }
-
-  /**
-   * {@code FlattenIterables<T>} takes a {@code PCollection<Iterable<T>>} and returns a
-   * {@code PCollection<T>} that contains all the elements from each iterable.
-   * Implements {@link #iterables}.
-   *
-   * @param <T> the type of the elements of the input {@code Iterable}s and
-   * the output {@code PCollection}
-   */
-  public static class FlattenIterables<T>
-      extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
-
-    /**
-     * Emits every element of every input {@code Iterable}. The output uses the element
-     * coder of the input's {@link IterableLikeCoder}.
-     *
-     * @throws IllegalArgumentException if the input coder is not an {@link IterableLikeCoder}
-     */
-    @Override
-    public PCollection<T> apply(PCollection<? extends Iterable<T>> in) {
-      Coder<? extends Iterable<T>> inCoder = in.getCoder();
-      if (!(inCoder instanceof IterableLikeCoder)) {
-        throw new IllegalArgumentException(
-            "expecting the input Coder<Iterable> to be an IterableLikeCoder");
-      }
-      @SuppressWarnings("unchecked")
-      Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
-
-      return in.apply(ParDo.named("FlattenIterables").of(
-          new DoFn<Iterable<T>, T>() {
-            private static final long serialVersionUID = 0L;
-
-            @Override
-            public void processElement(ProcessContext c) {
-              for (T i : c.element()) {
-                c.output(i);
-              }
-            }
-          }))
-          .setCoder(elemCoder);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // Registers the DirectPipelineRunner evaluator for the primitive FlattenPCollectionList.
-  // Raw types are required here because the registry is keyed by Class.
-  static {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        FlattenPCollectionList.class,
-        new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() {
-          @Override
-          public void evaluate(
-              FlattenPCollectionList transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  /**
-   * Direct-runner evaluation of {@link FlattenPCollectionList}: concatenates the values
-   * (with metadata) of every input, in list order, and sets them as the output's values.
-   */
-  private static <T> void evaluateHelper(
-      FlattenPCollectionList<T> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>();
-    PCollectionList<T> inputs = context.getInput(transform);
-
-    for (PCollection<T> input : inputs.getAll()) {
-      outputElems.addAll(context.getPCollectionValuesWithMetadata(input));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
deleted file mode 100644
index 8fde3e0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>},
- * groups the values by key and windows, and returns a
- * {@code PCollection<KV<K, Iterable<V>>>} representing a map from
- * each distinct key and window of the input {@code PCollection} to an
- * {@code Iterable} over all the values associated with that key in
- * the input per window.  Absent repeatedly-firing
- * {@link Window#triggering triggering}, each key in the output
- * {@code PCollection} is unique within each window.
- *
- * <p>{@code GroupByKey} is analogous to converting a multi-map into
- * a uni-map, and related to {@code GROUP BY} in SQL.  It corresponds
- * to the "shuffle" step between the Mapper and the Reducer in the
- * MapReduce framework.
- *
- * <p>Two keys of type {@code K} are compared for equality
- * <b>not</b> by regular Java {@link Object#equals}, but instead by
- * first encoding each of the keys using the {@code Coder} of the
- * keys of the input {@code PCollection}, and then comparing the
- * encoded bytes.  This admits efficient parallel evaluation.  Note that
- * this requires that the {@code Coder} of the keys be deterministic (see
- * {@link Coder#verifyDeterministic()}).  If the key {@code Coder} is not
- * deterministic, an exception is thrown at pipeline construction time.
- *
- * <p>By default, the {@code Coder} of the keys of the output
- * {@code PCollection} is the same as that of the keys of the input,
- * and the {@code Coder} of the elements of the {@code Iterable}
- * values of the output {@code PCollection} is the same as the
- * {@code Coder} of the values of the input.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Doc>> urlDocPairs = ...;
- * PCollection<KV<String, Iterable<Doc>>> urlToDocs =
- *     urlDocPairs.apply(GroupByKey.<String, Doc>create());
- * PCollection<R> results =
- *     urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() {
- *       public void processElement(ProcessContext c) {
- *         String url = c.element().getKey();
- *         Iterable<Doc> docsWithThatUrl = c.element().getValue();
- *         ... process all docs having that url ...
- *       }}));
- * } </pre>
- *
- * <p>{@code GroupByKey} is a key primitive in data-parallel
- * processing, since it is the main way to efficiently bring
- * associated data together into one location.  It is also a key
- * determiner of the performance of a data-parallel pipeline.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey}
- * for a way to group multiple input PCollections by a common key at once.
- *
- * <p>See {@link Combine.PerKey} for a common pattern of
- * {@code GroupByKey} followed by {@link Combine.GroupedValues}.
- *
- * <p>When grouping, windows that can be merged according to the {@link WindowFn}
- * of the input {@code PCollection} will be merged together, and a window pane
- * corresponding to the new, merged window will be created. The items in this pane
- * will be emitted when a trigger fires. By default this will be when the input
- * sources estimate there will be no more data for the window. See
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark}
- * for details on the estimation.
- *
- * <p>The timestamp for each emitted pane is determined by the
- * {@link Window.Bound#withOutputTimeFn windowing operation}.
- * The output {@code PCollection} will have the same {@link WindowFn}
- * as the input.
- *
- * <p>If the input {@code PCollection} contains late data (see
- * {@link com.google.cloud.dataflow.sdk.io.PubsubIO.Read.Bound#timestampLabel}
- * for an example of how this can occur) or the
- * {@link Window#triggering requested TriggerFn} can fire before
- * the watermark, then there may be multiple elements
- * output by a {@code GroupByKey} that correspond to the same key and window.
- *
- * <p>If the {@link WindowFn} of the input requires merging, it is not
- * valid to apply another {@code GroupByKey} without first applying a new
- * {@link WindowFn} or applying {@link Window#remerge()}.
- *
- * @param <K> the type of the keys of the input and output
- * {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- * and the elements of the {@code Iterable}s in the output
- * {@code PCollection}
- */
-public class GroupByKey<K, V>
-    extends PTransform<PCollection<KV<K, V>>,
-                       PCollection<KV<K, Iterable<V>>>> {
-
-  private final boolean fewKeys;
-
-  private GroupByKey(boolean fewKeys) {
-    this.fewKeys = fewKeys;
-  }
-
-  /**
-   * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys of the input and output
-   * {@code PCollection}s
-   * @param <V> the type of the values of the input {@code PCollection}
-   * and the elements of the {@code Iterable}s in the output
-   * {@code PCollection}
-   */
-  public static <K, V> GroupByKey<K, V> create() {
-    return new GroupByKey<>(false);
-  }
-
-  /**
-   * Returns a {@code GroupByKey<K, V>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys of the input and output
-   * {@code PCollection}s
-   * @param <V> the type of the values of the input {@code PCollection}
-   * and the elements of the {@code Iterable}s in the output
-   * {@code PCollection}
-   * @param fewKeys whether it groups just few keys.
-   */
-  static <K, V> GroupByKey<K, V> create(boolean fewKeys) {
-    return new GroupByKey<>(fewKeys);
-  }
-
-  /**
-   * Returns whether it groups just few keys.
-   */
-  public boolean fewKeys() {
-    return fewKeys;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  public static void applicableTo(PCollection<?> input) {
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-    // Verify that the input PCollection is bounded, or that there is windowing/triggering being
-    // used. Without this, the watermark (at end of global window) will never be reached.
-    if (windowingStrategy.getWindowFn() instanceof GlobalWindows
-        && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
-        && input.isBounded() != IsBounded.BOUNDED) {
-      throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
-          + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
-          + "prior to GroupByKey.");
-    }
-
-    // Validate the window merge function.
-    if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
-      String cause = ((InvalidWindows<?>) windowingStrategy.getWindowFn()).getCause();
-      throw new IllegalStateException(
-          "GroupByKey must have a valid Window merge function.  "
-              + "Invalid because: " + cause);
-    }
-  }
-
-  @Override
-  public void validate(PCollection<KV<K, V>> input) {
-    applicableTo(input);
-
-    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
-    // the key coder is deterministic.
-    Coder<K> keyCoder = getKeyCoder(input.getCoder());
-    try {
-      keyCoder.verifyDeterministic();
-    } catch (NonDeterministicException e) {
-      throw new IllegalStateException(
-          "the keyCoder of a GroupByKey must be deterministic", e);
-    }
-  }
-
-  public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
-    WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
-    if (!inputWindowFn.isNonMerging()) {
-      // Prevent merging windows again, without explicit user
-      // involvement, e.g., by Window.into() or Window.remerge().
-      inputWindowFn = new InvalidWindows<>(
-          "WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
-    }
-
-    // We also switch to the continuation trigger associated with the current trigger.
-    return inputStrategy
-        .withWindowFn(inputWindowFn)
-        .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
-  }
-
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    // This operation groups by the combination of key and window,
-    // merging windows as needed, using the windows assigned to the
-    // key/value input elements and the window merge operation of the
-    // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    // By default, implement GroupByKey[AndWindow] via a series of lower-level
-    // operations.
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
-        // Group by just the key.
-        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
-        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
-        // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-        // Sort each key's values by timestamp. GroupAlsoByWindow requires
-        // its input to be sorted by timestamp.
-        .apply(new SortValuesByTimestamp<K, V>())
-
-        // Group each key's values by window, merging windows as needed.
-        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
-  }
-
-  @Override
-  protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-    return getOutputKvCoder(input.getCoder());
-  }
-
-  /**
-   * Returns the {@code Coder} of the input to this transform, which
-   * should be a {@code KvCoder}.
-   */
-  @SuppressWarnings("unchecked")
-  static <K, V> KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
-    if (!(inputCoder instanceof KvCoder)) {
-      throw new IllegalStateException(
-          "GroupByKey requires its input to use KvCoder");
-    }
-    return (KvCoder<K, V>) inputCoder;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns the {@code Coder} of the keys of the input to this
-   * transform, which is also used as the {@code Coder} of the keys of
-   * the output of this transform.
-   */
-  static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
-    return getInputKvCoder(inputCoder).getKeyCoder();
-  }
-
-  /**
-   * Returns the {@code Coder} of the values of the input to this transform.
-   */
-  public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
-    return getInputKvCoder(inputCoder).getValueCoder();
-  }
-
-  /**
-   * Returns the {@code Coder} of the {@code Iterable} values of the
-   * output of this transform.
-   */
-  static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
-    return IterableCoder.of(getInputValueCoder(inputCoder));
-  }
-
-  /**
-   * Returns the {@code Coder} of the output of this transform.
-   */
-  static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
-    return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that makes timestamps and window assignments
-   * explicit in the value part of each key/value pair.
-   */
-  public static class ReifyTimestampsAndWindows<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, WindowedValue<V>>>> {
-    @Override
-    public PCollection<KV<K, WindowedValue<V>>> apply(
-        PCollection<KV<K, V>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-      Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
-          inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
-      Coder<KV<K, WindowedValue<V>>> outputKvCoder =
-          KvCoder.of(keyCoder, outputValueCoder);
-      return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-          .setCoder(outputKvCoder);
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that sorts the values associated with each key
-   * by timestamp.
-   */
-  public static class SortValuesByTimestamp<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
-    @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      return input.apply(ParDo.of(
-          new DoFn<KV<K, Iterable<WindowedValue<V>>>,
-                   KV<K, Iterable<WindowedValue<V>>>>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
-              K key = kvs.getKey();
-              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
-              List<WindowedValue<V>> sortedValues = new ArrayList<>();
-              for (WindowedValue<V> value : unsortedValues) {
-                sortedValues.add(value);
-              }
-              Collections.sort(sortedValues,
-                               new Comparator<WindowedValue<V>>() {
-                  @Override
-                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
-                    return e1.getTimestamp().compareTo(e2.getTimestamp());
-                  }
-                });
-              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
-            }}))
-          .setCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
-   */
-  public static class GroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public PCollection<KV<K, Iterable<V>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
-          inputKvCoder.getValueCoder();
-
-      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-          (IterableCoder<WindowedValue<V>>) inputValueCoder;
-      Coder<WindowedValue<V>> inputIterableElementCoder =
-          inputIterableValueCoder.getElemCoder();
-      WindowedValueCoder<V> inputIterableWindowedValueCoder =
-          (WindowedValueCoder<V>) inputIterableElementCoder;
-
-      Coder<V> inputIterableElementValueCoder =
-          inputIterableWindowedValueCoder.getValueCoder();
-      Coder<Iterable<V>> outputValueCoder =
-          IterableCoder.of(inputIterableElementValueCoder);
-      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
-          .setCoder(outputKvCoder);
-    }
-
-    private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
-        groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Primitive helper transform that groups by key only, ignoring any
-   * window assignments.
-   */
-  public static class GroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    /**
-     * Returns the {@code Coder} of the input to this transform, which
-     * should be a {@code KvCoder}.
-     */
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
-      if (!(inputCoder instanceof KvCoder)) {
-        throw new IllegalStateException(
-            "GroupByKey requires its input to use KvCoder");
-      }
-      return (KvCoder<K, V>) inputCoder;
-    }
-
-    @Override
-    protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerWithDirectPipelineRunner();
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static <K, V> void registerWithDirectPipelineRunner() {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <K, V> void evaluateHelper(
-      GroupByKeyOnly<K, V> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform +
-            " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-                      .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
-                      .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-                                             outputElems);
-  }
-
-  private static class GroupingKey<K> {
-    private K key;
-    private byte[] encodedKey;
-
-    public GroupingKey(K key, byte[] encodedKey) {
-      this.key = key;
-      this.encodedKey = encodedKey;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof GroupingKey) {
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(encodedKey);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
deleted file mode 100644
index b6497b7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/IntraBundleParallelization.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.WindowingInternals;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import org.joda.time.Instant;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Provides multi-threading of {@link DoFn}s, using threaded execution to
- * process multiple elements concurrently within a bundle.
- *
- * <p>Note that each Dataflow worker already processes multiple bundles
- * concurrently; this class is meant only for cases where processing
- * elements within a bundle is limited by blocking calls.
- *
- * <p>CPU intensive or IO intensive tasks are in general a poor fit for parallelization.
- * This is because a limited resource that is already maximally utilized does not
- * benefit from sub-division of work. The parallelization will increase the amount of time
- * to process each element yet the throughput for processing will remain relatively the same.
- * For example, if the local disk (an IO resource) has a maximum write rate of 10 MiB/s,
- * and processing each element requires writing 20 MiB to disk, then processing one element
- * takes 2 seconds. Yet processing 3 elements concurrently (each getting an equal share of
- * the maximum write rate) will take at least 6 seconds to complete, plus additional
- * overhead from the extra parallelization.
- *
- * <p>To parallelize a {@link DoFn} to 10 threads:
- * <pre>{@code
- * PCollection<T> data = ...;
- * data.apply(
- *   IntraBundleParallelization.of(new MyDoFn())
- *                             .withMaxParallelism(10));
- * }</pre>
- *
- * <p>An uncaught exception from the wrapped {@link DoFn} will result in the exception
- * being rethrown in later calls to {@link MultiThreadedIntraBundleProcessingDoFn#processElement}
- * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}.
- */
-public class IntraBundleParallelization {
-  /**
-   * Creates a {@link IntraBundleParallelization} {@link PTransform} for the given
-   * {@link DoFn} that processes elements using multiple threads.
-   *
-   * <p>Note that the specified {@code doFn} needs to be thread safe.
-   */
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
-    return new Unbound().of(doFn);
-  }
-
-  /**
-   * Creates a {@link IntraBundleParallelization} {@link PTransform} with the specified
-   * maximum concurrency level.
-   */
-  public static Unbound withMaxParallelism(int maxParallelism) {
-    return new Unbound().withMaxParallelism(maxParallelism);
-  }
-
-  /**
-   * An incomplete {@code IntraBundleParallelization} transform, with unbound input/output types.
-   *
-   * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
-   * bind the input/output types of this {@code PTransform}.
-   */
-  public static class Unbound {
-    private final int maxParallelism;
-
-    Unbound() {
-      this(DEFAULT_MAX_PARALLELISM);
-    }
-
-    Unbound(int maxParallelism) {
-      Preconditions.checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", maxParallelism);
-      this.maxParallelism = maxParallelism;
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified maximum concurrency level.
-     */
-    public Unbound withMaxParallelism(int maxParallelism) {
-      return new Unbound(maxParallelism);
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified {@link DoFn}.
-     *
-     * <p>Note that the specified {@code doFn} needs to be thread safe.
-     */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-  }
-
-  /**
-   * A {@code PTransform} that, when applied to a {@code PCollection<InputT>},
-   * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
-   * with all its outputs collected into an output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>Note that the specified {@code doFn} needs to be thread safe.
-   *
-   * @param <InputT> the type of the (main) input {@code PCollection} elements
-   * @param <OutputT> the type of the (main) output {@code PCollection} elements
-   */
-  public static class Bound<InputT, OutputT>
-      extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    private final DoFn<InputT, OutputT> doFn;
-    private final int maxParallelism;
-
-    Bound(DoFn<InputT, OutputT> doFn, int maxParallelism) {
-      Preconditions.checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", maxParallelism);
-      this.doFn = doFn;
-      this.maxParallelism = maxParallelism;
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified maximum concurrency level.
-     */
-    public Bound<InputT, OutputT> withMaxParallelism(int maxParallelism) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified {@link DoFn}.
-     *
-     * <p>Note that the specified {@code doFn} needs to be thread safe.
-     */
-    public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT>
-        of(DoFn<NewInputT, NewOutputT> doFn) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-
-    @Override
-    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
-      return input.apply(
-          ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism)));
-    }
-  }
-
-  /**
-   * A multi-threaded {@code DoFn} wrapper.
-   *
-   * @see IntraBundleParallelization#of(DoFn)
-   *
-   * @param <InputT> the type of the (main) input elements
-   * @param <OutputT> the type of the (main) output elements
-   */
-  public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>
-      extends DoFn<InputT, OutputT> {
-
-    public MultiThreadedIntraBundleProcessingDoFn(DoFn<InputT, OutputT> doFn, int maxParallelism) {
-      Preconditions.checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", maxParallelism);
-      this.doFn = doFn;
-      this.maxParallelism = maxParallelism;
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      doFn.startBundle(c);
-
-      executor = c.getPipelineOptions().as(GcsOptions.class).getExecutorService();
-      workTickets = new Semaphore(maxParallelism);
-      failure = new AtomicReference<>();
-    }
-
-    @Override
-    public void processElement(final ProcessContext c) throws Exception {
-      try {
-        workTickets.acquire();
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Interrupted while scheduling work", e);
-      }
-
-      if (failure.get() != null) {
-        throw Throwables.propagate(failure.get());
-      }
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            doFn.processElement(new WrappedContext(c));
-          } catch (Throwable t) {
-            failure.compareAndSet(null, t);
-            Throwables.propagateIfPossible(t);
-            throw new AssertionError("Unexpected checked exception: " + t);
-          } finally {
-            workTickets.release();
-          }
-        }
-      });
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      // Acquire all the work tickets to guarantee that all the previous
-      // processElement calls have finished.
-      workTickets.acquire(maxParallelism);
-      if (failure.get() != null) {
-        throw Throwables.propagate(failure.get());
-      }
-      doFn.finishBundle(c);
-    }
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return doFn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return doFn.getOutputTypeDescriptor();
-    }
-
-    /////////////////////////////////////////////////////////////////////////////
-
-    /**
-     * Wraps a DoFn context, forcing single-thread output so that threads don't
-     * propagate through to downstream functions.
-     */
-    private class WrappedContext extends ProcessContext {
-      private final ProcessContext context;
-
-      WrappedContext(ProcessContext context) {
-        this.context = context;
-      }
-
-      @Override
-      public InputT element() {
-        return context.element();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return context.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return context.sideInput(view);
-      }
-
-      @Override
-      public void output(OutputT output) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.output(output);
-        }
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.outputWithTimestamp(output, timestamp);
-        }
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.sideOutput(tag, output);
-        }
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.sideOutputWithTimestamp(tag, output, timestamp);
-        }
-      }
-
-      @Override
-      public Instant timestamp() {
-        return context.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return context.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return context.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return context.windowingInternals();
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return context.createAggregatorInternal(name, combiner);
-      }
-    }
-
-    private final DoFn<InputT, OutputT> doFn;
-    private int maxParallelism;
-
-    private transient ExecutorService executor;
-    private transient Semaphore workTickets;
-    private transient AtomicReference<Throwable> failure;
-  }
-
-  /**
-   * Default maximum for number of concurrent elements to process.
-   */
-  private static final int DEFAULT_MAX_PARALLELISM = 16;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
deleted file mode 100644
index 370d43d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Keys.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code Keys<K>} takes a {@code PCollection} of {@code KV<K, V>}s and
- * returns a {@code PCollection<K>} of the keys.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Long>> wordCounts = ...;
- * PCollection<String> words = wordCounts.apply(Keys.<String>create());
- * } </pre>
- *
- * <p>Each output element has the same timestamp and is in the same windows
- * as its corresponding input element, and the output {@code PCollection}
- * has the same
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
- *
- * <p>See also {@link Values}.
- *
- * @param <K> the type of the keys in the input {@code PCollection},
- * and the type of the elements in the output {@code PCollection}
- */
-public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, PCollection<K>> {
-  /**
-   * Returns a {@code Keys<K>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys in the input {@code PCollection},
-   * and the type of the elements in the output {@code PCollection}
-   */
-  public static <K> Keys<K> create() {
-    return new Keys<>();
-  }
-
-  // Instances are obtained through create().
-  private Keys() { }
-
-  /**
-   * Applies a {@code ParDo} named {@code "Keys"} that emits the key of every input
-   * {@code KV} and drops its value.
-   */
-  @Override
-  public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
-    DoFn<KV<K, ?>, K> extractKeyFn = new DoFn<KV<K, ?>, K>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        KV<K, ?> kv = c.element();
-        c.output(kv.getKey());
-      }
-    };
-    return in.apply(ParDo.named("Keys").of(extractKeyFn));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
deleted file mode 100644
index 5a9cc87..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/KvSwap.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * {@code KvSwap<K, V>} takes a {@code PCollection<KV<K, V>>} and
- * returns a {@code PCollection<KV<V, K>>}, where all the keys and
- * values have been swapped.
- *
- * <p>Example of use:
- * <pre> {@code
- * PCollection<KV<String, Long>> wordsToCounts = ...;
- * PCollection<KV<Long, String>> countsToWords =
- *     wordsToCounts.apply(KvSwap.<String, Long>create());
- * } </pre>
- *
- * <p>Each output element has the same timestamp and is in the same windows
- * as its corresponding input element, and the output {@code PCollection}
- * has the same
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn}
- * associated with it as the input.
- *
- * @param <K> the type of the keys in the input {@code PCollection}
- * and the values in the output {@code PCollection}
- * @param <V> the type of the values in the input {@code PCollection}
- * and the keys in the output {@code PCollection}
- */
-public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<V, K>>> {
-  /**
-   * Returns a {@code KvSwap<K, V>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys in the input {@code PCollection}
-   * and the values in the output {@code PCollection}
-   * @param <V> the type of the values in the input {@code PCollection}
-   * and the keys in the output {@code PCollection}
-   */
-  public static <K, V> KvSwap<K, V> create() {
-    return new KvSwap<>();
-  }
-
-  // Instances are obtained through create().
-  private KvSwap() { }
-
-  /**
-   * Applies a {@code ParDo} named {@code "KvSwap"} that emits, for every input
-   * {@code KV(key, value)}, the pair {@code KV(value, key)}.
-   */
-  @Override
-  public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
-    DoFn<KV<K, V>, KV<V, K>> swapFn = new DoFn<KV<K, V>, KV<V, K>>() {
-      @Override
-      public void processElement(ProcessContext c) {
-        KV<K, V> original = c.element();
-        V newKey = original.getValue();
-        K newValue = original.getKey();
-        c.output(KV.of(newKey, newValue));
-      }
-    };
-    return in.apply(ParDo.named("KvSwap").of(swapFn));
-  }
-}