You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/03/24 03:47:42 UTC

[18/67] [partial] incubator-beam git commit: Directory reorganization

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResult.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResult.java
deleted file mode 100644
index aac57bc..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResult.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addObject;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.Reiterator;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A row result of a {@link CoGroupByKey}.  This is a tuple of {@link Iterable}s produced for
- * a given key, and these can be accessed in different ways.
- */
-public class CoGbkResult {
-  /**
-   * A map of integer union tags to a list of union objects.
-   * Note: the key and the embedded union tag are the same, so it is redundant
-   * to store it multiple times, but for now it makes encoding easier.
-   */
-  private final List<Iterable<?>> valueMap;
-
-  private final CoGbkResultSchema schema;
-
-  private static final int DEFAULT_IN_MEMORY_ELEMENT_COUNT = 10_000;
-
-  private static final Logger LOG = LoggerFactory.getLogger(CoGbkResult.class);
-
-  /**
-   * A row in the {@link PCollection} resulting from a {@link CoGroupByKey} transform.
-   * Currently, this row must fit into memory.
-   *
-   * @param schema the set of tuple tags used to refer to input tables and
-   *               result values
-   * @param taggedValues the raw results from a group-by-key
-   */
-  public CoGbkResult(
-      CoGbkResultSchema schema,
-      Iterable<RawUnionValue> taggedValues) {
-    this(schema, taggedValues, DEFAULT_IN_MEMORY_ELEMENT_COUNT);
-  }
-
-  @SuppressWarnings("unchecked")
-  public CoGbkResult(
-      CoGbkResultSchema schema,
-      Iterable<RawUnionValue> taggedValues,
-      int inMemoryElementCount) {
-    this.schema = schema;
-    valueMap = new ArrayList<>();
-    for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-      valueMap.add(new ArrayList<>());
-    }
-
-    // Demultiplex the first imMemoryElementCount tagged union values
-    // according to their tag.
-    final Iterator<RawUnionValue> taggedIter = taggedValues.iterator();
-    int elementCount = 0;
-    while (taggedIter.hasNext()) {
-      if (elementCount++ >= inMemoryElementCount && taggedIter instanceof Reiterator) {
-        // Let the tails be lazy.
-        break;
-      }
-      RawUnionValue value = taggedIter.next();
-      // Make sure the given union tag has a corresponding tuple tag in the
-      // schema.
-      int unionTag = value.getUnionTag();
-      if (schema.size() <= unionTag) {
-        throw new IllegalStateException("union tag " + unionTag +
-            " has no corresponding tuple tag in the result schema");
-      }
-      List<Object> valueList = (List<Object>) valueMap.get(unionTag);
-      valueList.add(value.getValue());
-    }
-
-    if (taggedIter.hasNext()) {
-      // If we get here, there were more elements than we can afford to
-      // keep in memory, so we copy the re-iterable of remaining items
-      // and append filtered views to each of the sorted lists computed earlier.
-      LOG.info("CoGbkResult has more than " + inMemoryElementCount + " elements,"
-               + " reiteration (which may be slow) is required.");
-      final Reiterator<RawUnionValue> tail = (Reiterator<RawUnionValue>) taggedIter;
-      // This is a trinary-state array recording whether a given tag is present in the tail. The
-      // initial value is null (unknown) for all tags, and the first iteration through the entire
-      // list will set these values to true or false to avoid needlessly iterating if filtering
-      // against a given tag would not match anything.
-      final Boolean[] containsTag = new Boolean[schema.size()];
-      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        final int unionTag0 = unionTag;
-        updateUnionTag(tail, containsTag, unionTag, unionTag0);
-      }
-    }
-  }
-
-  private <T> void updateUnionTag(
-      final Reiterator<RawUnionValue> tail, final Boolean[] containsTag,
-      int unionTag, final int unionTag0) {
-    @SuppressWarnings("unchecked")
-    final Iterable<T> head = (Iterable<T>) valueMap.get(unionTag);
-    valueMap.set(
-        unionTag,
-        new Iterable<T>() {
-          @Override
-          public Iterator<T> iterator() {
-            return Iterators.concat(
-                head.iterator(),
-                new UnionValueIterator<T>(unionTag0, tail.copy(), containsTag));
-          }
-        });
-  }
-
-  public boolean isEmpty() {
-    for (Iterable<?> tagValues : valueMap) {
-      if (tagValues.iterator().hasNext()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Returns the schema used by this {@link CoGbkResult}.
-   */
-  public CoGbkResultSchema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public String toString() {
-    return valueMap.toString();
-  }
-
-  /**
-   * Returns the values from the table represented by the given
-   * {@code TupleTag<V>} as an {@code Iterable<V>} (which may be empty if there
-   * are no results).
-   *
-   * <p>If tag was not part of the original {@link CoGroupByKey},
-   * throws an IllegalArgumentException.
-   */
-  public <V> Iterable<V> getAll(TupleTag<V> tag) {
-    int index = schema.getIndex(tag);
-    if (index < 0) {
-      throw new IllegalArgumentException("TupleTag " + tag +
-          " is not in the schema");
-    }
-    @SuppressWarnings("unchecked")
-    Iterable<V> unions = (Iterable<V>) valueMap.get(index);
-    return unions;
-  }
-
-  /**
-   * If there is a singleton value for the given tag, returns it.
-   * Otherwise, throws an IllegalArgumentException.
-   *
-   * <p>If tag was not part of the original {@link CoGroupByKey},
-   * throws an IllegalArgumentException.
-   */
-  public <V> V getOnly(TupleTag<V> tag) {
-    return innerGetOnly(tag, null, false);
-  }
-
-  /**
-   * If there is a singleton value for the given tag, returns it.  If there is
-   * no value for the given tag, returns the defaultValue.
-   *
-   * <p>If tag was not part of the original {@link CoGroupByKey},
-   * throws an IllegalArgumentException.
-   */
-  public <V> V getOnly(TupleTag<V> tag, V defaultValue) {
-    return innerGetOnly(tag, defaultValue, true);
-  }
-
-  /**
-   * A {@link Coder} for {@link CoGbkResult}s.
-   */
-  public static class CoGbkResultCoder extends StandardCoder<CoGbkResult> {
-
-    private final CoGbkResultSchema schema;
-    private final UnionCoder unionCoder;
-
-    /**
-     * Returns a {@link CoGbkResultCoder} for the given schema and {@link UnionCoder}.
-     */
-    public static CoGbkResultCoder of(
-        CoGbkResultSchema schema,
-        UnionCoder unionCoder) {
-      return new CoGbkResultCoder(schema, unionCoder);
-    }
-
-    @JsonCreator
-    public static CoGbkResultCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components,
-        @JsonProperty(PropertyNames.CO_GBK_RESULT_SCHEMA) CoGbkResultSchema schema) {
-      Preconditions.checkArgument(components.size() == 1,
-          "Expecting 1 component, got " + components.size());
-      return new CoGbkResultCoder(schema, (UnionCoder) components.get(0));
-    }
-
-    private CoGbkResultCoder(
-        CoGbkResultSchema tupleTags,
-        UnionCoder unionCoder) {
-      this.schema = tupleTags;
-      this.unionCoder = unionCoder;
-    }
-
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return null;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getComponents() {
-      return Arrays.<Coder<?>>asList(unionCoder);
-    }
-
-    @Override
-    public CloudObject asCloudObject() {
-      CloudObject result = super.asCloudObject();
-      addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, schema.asCloudObject());
-      return result;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void encode(
-        CoGbkResult value,
-        OutputStream outStream,
-        Context context) throws CoderException,
-        IOException {
-      if (!schema.equals(value.getSchema())) {
-        throw new CoderException("input schema does not match coder schema");
-      }
-      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        tagListCoder(unionTag).encode(value.valueMap.get(unionTag), outStream, Context.NESTED);
-      }
-    }
-
-    @Override
-    public CoGbkResult decode(
-        InputStream inStream,
-        Context context)
-        throws CoderException, IOException {
-      List<Iterable<?>> valueMap = new ArrayList<>();
-      for (int unionTag = 0; unionTag < schema.size(); unionTag++) {
-        valueMap.add(tagListCoder(unionTag).decode(inStream, Context.NESTED));
-      }
-      return new CoGbkResult(schema, valueMap);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private IterableCoder tagListCoder(int unionTag) {
-      return IterableCoder.of(unionCoder.getComponents().get(unionTag));
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (this == object) {
-        return true;
-      }
-      if (!(object instanceof CoGbkResultCoder)) {
-        return false;
-      }
-      CoGbkResultCoder other = (CoGbkResultCoder) object;
-      return schema.equals(other.schema) && unionCoder.equals(other.unionCoder);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(schema);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          "CoGbkResult requires the union coder to be deterministic", unionCoder);
-    }
-  }
-
-
-  //////////////////////////////////////////////////////////////////////////////
-  // Methods for directly constructing a CoGbkResult
-  //
-  // (for example, creating test data for a transform that consumes a
-  // CoGbkResult)
-
-  /**
-   * Returns a new CoGbkResult that contains just the given tag and given data.
-   */
-  public static <V> CoGbkResult of(TupleTag<V> tag, List<V> data) {
-    return CoGbkResult.empty().and(tag, data);
-  }
-
-  /**
-   * Returns a new {@link CoGbkResult} based on this, with the given tag and given data
-   * added to it.
-   */
-  public <V> CoGbkResult and(TupleTag<V> tag, List<V> data) {
-    if (nextTestUnionId != schema.size()) {
-      throw new IllegalArgumentException(
-          "Attempting to call and() on a CoGbkResult apparently not created by"
-          + " of().");
-    }
-    List<Iterable<?>> valueMap = new ArrayList<>(this.valueMap);
-    valueMap.add(data);
-    return new CoGbkResult(
-        new CoGbkResultSchema(schema.getTupleTagList().and(tag)), valueMap,
-        nextTestUnionId + 1);
-  }
-
-  /**
-   * Returns an empty {@link CoGbkResult}.
-   */
-  public static <V> CoGbkResult empty() {
-    return new CoGbkResult(new CoGbkResultSchema(TupleTagList.empty()),
-        new ArrayList<Iterable<?>>());
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  private int nextTestUnionId = 0;
-
-  private CoGbkResult(
-      CoGbkResultSchema schema,
-      List<Iterable<?>> valueMap,
-      int nextTestUnionId) {
-    this(schema, valueMap);
-    this.nextTestUnionId = nextTestUnionId;
-  }
-
-  private CoGbkResult(
-      CoGbkResultSchema schema,
-      List<Iterable<?>> valueMap) {
-    this.schema = schema;
-    this.valueMap = valueMap;
-  }
-
-  private <V> V innerGetOnly(
-      TupleTag<V> tag,
-      V defaultValue,
-      boolean useDefault) {
-    int index = schema.getIndex(tag);
-    if (index < 0) {
-      throw new IllegalArgumentException("TupleTag " + tag
-          + " is not in the schema");
-    }
-    @SuppressWarnings("unchecked")
-    Iterator<V> unions = (Iterator<V>) valueMap.get(index).iterator();
-    if (!unions.hasNext()) {
-      if (useDefault) {
-        return defaultValue;
-      } else {
-        throw new IllegalArgumentException("TupleTag " + tag
-            + " corresponds to an empty result, and no default was provided");
-      }
-    }
-    V value = unions.next();
-    if (unions.hasNext()) {
-      throw new IllegalArgumentException("TupleTag " + tag
-          + " corresponds to a non-singleton result");
-    }
-    return value;
-  }
-
-  /**
-   * Lazily filters and recasts an {@code Iterator<RawUnionValue>} into an
-   * {@code Iterator<V>}, where V is the type of the raw union value's contents.
-   */
-  private static class UnionValueIterator<V> implements Iterator<V> {
-
-    private final int tag;
-    private final PeekingIterator<RawUnionValue> unions;
-    private final Boolean[] containsTag;
-
-    private UnionValueIterator(int tag, Iterator<RawUnionValue> unions, Boolean[] containsTag) {
-      this.tag = tag;
-      this.unions = Iterators.peekingIterator(unions);
-      this.containsTag = containsTag;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (containsTag[tag] == Boolean.FALSE) {
-        return false;
-      }
-      advance();
-      if (unions.hasNext()) {
-        return true;
-      } else {
-        // Now that we've iterated over all the values, we can resolve all the "unknown" null
-        // values to false.
-        for (int i = 0; i < containsTag.length; i++) {
-          if (containsTag[i] == null) {
-            containsTag[i] = false;
-          }
-        }
-        return false;
-      }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public V next() {
-      advance();
-      return (V) unions.next().getValue();
-    }
-
-    private void advance() {
-      while (unions.hasNext()) {
-        int curTag = unions.peek().getUnionTag();
-        containsTag[curTag] = true;
-        if (curTag == tag) {
-          break;
-        }
-        unions.next();
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResultSchema.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResultSchema.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResultSchema.java
deleted file mode 100644
index 2860ba7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGbkResultSchema.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A schema for the results of a {@link CoGroupByKey}.  This maintains the full
- * set of {@link TupleTag}s for the results of a {@link CoGroupByKey} and
- * facilitates mapping between {@link TupleTag}s and
- * {@link RawUnionValue} tags (which are used as secondary keys in the
- * {@link CoGroupByKey}).
- */
-public class CoGbkResultSchema implements Serializable {
-
-  private final TupleTagList tupleTagList;
-
-  @JsonCreator
-  public static CoGbkResultSchema of(
-      @JsonProperty(PropertyNames.TUPLE_TAGS) List<TupleTag<?>> tags) {
-    TupleTagList tupleTags = TupleTagList.empty();
-    for (TupleTag<?> tag : tags) {
-      tupleTags = tupleTags.and(tag);
-    }
-    return new CoGbkResultSchema(tupleTags);
-  }
-
-  /**
-   * Maps TupleTags to union tags.  This avoids needing to encode the tags
-   * themselves.
-   */
-  private final HashMap<TupleTag<?>, Integer> tagMap = new HashMap<>();
-
-  /**
-   * Builds a schema from a tuple of {@code TupleTag<?>}s.
-   */
-  public CoGbkResultSchema(TupleTagList tupleTagList) {
-    this.tupleTagList = tupleTagList;
-    int index = -1;
-    for (TupleTag<?> tag : tupleTagList.getAll()) {
-      index++;
-      tagMap.put(tag, index);
-    }
-  }
-
-  /**
-   * Returns the index for the given tuple tag, if the tag is present in this
-   * schema, -1 if it isn't.
-   */
-  public int getIndex(TupleTag<?> tag) {
-    Integer index = tagMap.get(tag);
-    return index == null ? -1 : index;
-  }
-
-  /**
-   * Returns the tuple tag at the given index.
-   */
-  public TupleTag<?> getTag(int index) {
-    return tupleTagList.get(index);
-  }
-
-  /**
-   * Returns the number of columns for this schema.
-   */
-  public int size() {
-    return tupleTagList.getAll().size();
-  }
-
-  /**
-   * Returns the TupleTagList tuple associated with this schema.
-   */
-  public TupleTagList getTupleTagList() {
-    return tupleTagList;
-  }
-
-  public CloudObject asCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    List<CloudObject> serializedTags = new ArrayList<>(tupleTagList.size());
-    for (TupleTag<?> tag : tupleTagList.getAll()) {
-      serializedTags.add(tag.asCloudObject());
-    }
-    addList(result, PropertyNames.TUPLE_TAGS, serializedTags);
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == this) {
-      return true;
-    }
-    if (!(obj instanceof CoGbkResultSchema)) {
-      return false;
-    }
-    CoGbkResultSchema other = (CoGbkResultSchema) obj;
-    return tupleTagList.getAll().equals(other.tupleTagList.getAll());
-  }
-
-  @Override
-  public int hashCode() {
-    return tupleTagList.getAll().hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return "CoGbkResultSchema: " + tupleTagList.getAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java
deleted file mode 100644
index b840682..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/CoGroupByKey.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.TaggedKeyedPCollection;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A {@link PTransform} that performs a {@link CoGroupByKey} on a tuple
- * of tables.  A {@link CoGroupByKey} groups results from all
- * tables by like keys into {@link CoGbkResult}s,
- * from which the results for any specific table can be accessed by the
- * {@link com.google.cloud.dataflow.sdk.values.TupleTag}
- * supplied with the initial table.
- *
- * <p>Example of performing a {@link CoGroupByKey} followed by a
- * {@link ParDo} that consumes
- * the results:
- * <pre> {@code
- * PCollection<KV<K, V1>> pt1 = ...;
- * PCollection<KV<K, V2>> pt2 = ...;
- *
- * final TupleTag<V1> t1 = new TupleTag<>();
- * final TupleTag<V2> t2 = new TupleTag<>();
- * PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
- *   KeyedPCollectionTuple.of(t1, pt1)
- *                        .and(t2, pt2)
- *                        .apply(CoGroupByKey.<K>create());
- *
- * PCollection<T> finalResultCollection =
- *   coGbkResultCollection.apply(ParDo.of(
- *     new DoFn<KV<K, CoGbkResult>, T>() {
- *       @Override
- *       public void processElement(ProcessContext c) {
- *         KV<K, CoGbkResult> e = c.element();
- *         Iterable<V1> pt1Vals = e.getValue().getAll(t1);
- *         V2 pt2Val = e.getValue().getOnly(t2);
- *          ... Do Something ....
- *         c.output(...some T...);
- *       }
- *     }));
- * } </pre>
- *
- * @param <K> the type of the keys in the input and output
- * {@code PCollection}s
- */
-public class CoGroupByKey<K> extends
-    PTransform<KeyedPCollectionTuple<K>,
-               PCollection<KV<K, CoGbkResult>>> {
-  /**
-   * Returns a {@code CoGroupByKey<K>} {@code PTransform}.
-   *
-   * @param <K> the type of the keys in the input and output
-   * {@code PCollection}s
-   */
-  public static <K> CoGroupByKey<K> create() {
-    return new CoGroupByKey<>();
-  }
-
-  private CoGroupByKey() { }
-
-  @Override
-  public PCollection<KV<K, CoGbkResult>> apply(
-      KeyedPCollectionTuple<K> input) {
-    if (input.isEmpty()) {
-      throw new IllegalArgumentException(
-          "must have at least one input to a KeyedPCollections");
-    }
-
-    // First build the union coder.
-    // TODO: Look at better integration of union types with the
-    // schema specified in the input.
-    List<Coder<?>> codersList = new ArrayList<>();
-    for (TaggedKeyedPCollection<K, ?> entry : input.getKeyedCollections()) {
-      codersList.add(getValueCoder(entry.pCollection));
-    }
-    UnionCoder unionCoder = UnionCoder.of(codersList);
-    Coder<K> keyCoder = input.getKeyCoder();
-    KvCoder<K, RawUnionValue> kVCoder =
-        KvCoder.of(keyCoder, unionCoder);
-
-    PCollectionList<KV<K, RawUnionValue>> unionTables =
-        PCollectionList.empty(input.getPipeline());
-
-    // TODO: Use the schema to order the indices rather than depending
-    // on the fact that the schema ordering is identical to the ordering from
-    // input.getJoinCollections().
-    int index = -1;
-    for (TaggedKeyedPCollection<K, ?> entry : input.getKeyedCollections()) {
-      index++;
-      PCollection<KV<K, RawUnionValue>> unionTable =
-          makeUnionTable(index, entry.pCollection, kVCoder);
-      unionTables = unionTables.and(unionTable);
-    }
-
-    PCollection<KV<K, RawUnionValue>> flattenedTable =
-        unionTables.apply(Flatten.<KV<K, RawUnionValue>>pCollections());
-
-    PCollection<KV<K, Iterable<RawUnionValue>>> groupedTable =
-        flattenedTable.apply(GroupByKey.<K, RawUnionValue>create());
-
-    CoGbkResultSchema tupleTags = input.getCoGbkResultSchema();
-    PCollection<KV<K, CoGbkResult>> result = groupedTable.apply(
-        ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags))
-          .named("ConstructCoGbkResultFn"));
-    result.setCoder(KvCoder.of(keyCoder,
-        CoGbkResultCoder.of(tupleTags, unionCoder)));
-
-    return result;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Returns the value coder for the given PCollection.  Assumes that the value
-   * coder is an instance of {@code KvCoder<K, V>}.
-   */
-  private <V> Coder<V> getValueCoder(PCollection<KV<K, V>> pCollection) {
-    // Assumes that the PCollection uses a KvCoder.
-    Coder<?> entryCoder = pCollection.getCoder();
-    if (!(entryCoder instanceof KvCoder<?, ?>)) {
-      throw new IllegalArgumentException("PCollection does not use a KvCoder");
-    }
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
-    return coder.getValueCoder();
-  }
-
-  /**
-   * Returns a UnionTable for the given input PCollection, using the given
-   * union index and the given unionTableEncoder.
-   */
-  private <V> PCollection<KV<K, RawUnionValue>> makeUnionTable(
-      final int index,
-      PCollection<KV<K, V>> pCollection,
-      KvCoder<K, RawUnionValue> unionTableEncoder) {
-
-    return pCollection.apply(ParDo.of(
-        new ConstructUnionTableFn<K, V>(index)).named("MakeUnionTable" + index))
-                                               .setCoder(unionTableEncoder);
-  }
-
-  /**
-   * A DoFn to construct a UnionTable (i.e., a
-   * {@code PCollection<KV<K, RawUnionValue>>} from a
-   * {@code PCollection<KV<K, V>>}.
-   */
-  private static class ConstructUnionTableFn<K, V> extends
-      DoFn<KV<K, V>, KV<K, RawUnionValue>> {
-
-    private final int index;
-
-    public ConstructUnionTableFn(int index) {
-      this.index = index;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      KV<K, ?> e = c.element();
-      c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue())));
-    }
-  }
-
-  /**
-   * A DoFn to construct a CoGbkResult from an input grouped union
-   * table.
-    */
-  private static class ConstructCoGbkResultFn<K>
-    extends DoFn<KV<K, Iterable<RawUnionValue>>,
-                 KV<K, CoGbkResult>> {
-
-    private final CoGbkResultSchema schema;
-
-    public ConstructCoGbkResultFn(CoGbkResultSchema schema) {
-      this.schema = schema;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      KV<K, Iterable<RawUnionValue>> e = c.element();
-      c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/KeyedPCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/KeyedPCollectionTuple.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/KeyedPCollectionTuple.java
deleted file mode 100644
index abfbe08..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/KeyedPCollectionTuple.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.cloud.dataflow.sdk.values.TupleTagList;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * An immutable tuple of keyed {@link PCollection PCollections}
- * with key type K.
- * ({@link PCollection PCollections} containing values of type
- * {@code KV<K, ?>})
- *
- * @param <K> the type of key shared by all constituent PCollections
- */
-public class KeyedPCollectionTuple<K> implements PInput {
-  /**
-   * Returns an empty {@code KeyedPCollectionTuple<K>} on the given pipeline.
-   */
-  public static <K> KeyedPCollectionTuple<K> empty(Pipeline pipeline) {
-    return new KeyedPCollectionTuple<>(pipeline);
-  }
-
-  /**
-   * Returns a new {@code KeyedPCollectionTuple<K>} with the given tag and initial
-   * PCollection.
-   */
-  public static <K, InputT> KeyedPCollectionTuple<K> of(
-      TupleTag<InputT> tag,
-      PCollection<KV<K, InputT>> pc) {
-    return new KeyedPCollectionTuple<K>(pc.getPipeline()).and(tag, pc);
-  }
-
-  /**
-   * Returns a new {@code KeyedPCollectionTuple<K>} that is the same as this,
-   * appended with the given PCollection.
-   */
-  public <V> KeyedPCollectionTuple<K> and(
-      TupleTag< V> tag,
-      PCollection<KV<K, V>> pc) {
-    if (pc.getPipeline() != getPipeline()) {
-      throw new IllegalArgumentException(
-          "PCollections come from different Pipelines");
-    }
-    TaggedKeyedPCollection<K, ?> wrapper =
-        new TaggedKeyedPCollection<>(tag, pc);
-    Coder<K> myKeyCoder = keyCoder == null ? getKeyCoder(pc) : keyCoder;
-    List<TaggedKeyedPCollection<K, ?>>
-      newKeyedCollections =
-        copyAddLast(
-            keyedCollections,
-            wrapper);
-    return new KeyedPCollectionTuple<>(
-        getPipeline(),
-        newKeyedCollections,
-        schema.getTupleTagList().and(tag),
-        myKeyCoder);
-  }
-
-  public boolean isEmpty() {
-    return keyedCollections.isEmpty();
-  }
-
-  /**
-   * Returns a list of {@link TaggedKeyedPCollection TaggedKeyedPCollections} for the
-   * {@link PCollection PCollections} contained in this {@link KeyedPCollectionTuple}.
-   */
-  public List<TaggedKeyedPCollection<K, ?>> getKeyedCollections() {
-    return keyedCollections;
-  }
-
-  /**
-   * Like {@link #apply(String, PTransform)} but defaulting to the name
-   * provided by the {@link PTransform}.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      PTransform<KeyedPCollectionTuple<K>, OutputT> transform) {
-    return Pipeline.applyTransform(this, transform);
-  }
-
-  /**
-   * Applies the given {@link PTransform} to this input {@code KeyedPCollectionTuple} and returns
-   * its {@code OutputT}. This uses {@code name} to identify the specific application of
-   * the transform. This name is used in various places, including the monitoring UI,
-   * logging, and to stably identify this application node in the job graph.
-   */
-  public <OutputT extends POutput> OutputT apply(
-      String name, PTransform<KeyedPCollectionTuple<K>, OutputT> transform) {
-    return Pipeline.applyTransform(name, this, transform);
-  }
-
-  /**
-   * Expands the component {@link PCollection PCollections}, stripping off
-   * any tag-specific information.
-   */
-  @Override
-  public Collection<? extends PValue> expand() {
-    List<PCollection<?>> retval = new ArrayList<>();
-    for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
-      retval.add(taggedPCollection.pCollection);
-    }
-    return retval;
-  }
-
-  /**
-   * Returns the key {@link Coder} for all {@link PCollection PCollections}
-   * in this {@link KeyedPCollectionTuple}.
-   */
-  public Coder<K> getKeyCoder() {
-    if (keyCoder == null) {
-      throw new IllegalStateException("cannot return null keyCoder");
-    }
-    return keyCoder;
-  }
-
-  /**
-   * Returns the {@link CoGbkResultSchema} associated with this
-   * {@link KeyedPCollectionTuple}.
-   */
-  public CoGbkResultSchema getCoGbkResultSchema() {
-    return schema;
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public void finishSpecifying() {
-    for (TaggedKeyedPCollection<K, ?> taggedPCollection : keyedCollections) {
-      taggedPCollection.pCollection.finishSpecifying();
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A utility class to help ensure coherence of tag and input PCollection
-   * types.
-   */
-  public static class TaggedKeyedPCollection<K, V> {
-
-    final TupleTag<V> tupleTag;
-    final PCollection<KV<K, V>> pCollection;
-
-    public TaggedKeyedPCollection(
-        TupleTag<V> tupleTag,
-        PCollection<KV<K, V>> pCollection) {
-      this.tupleTag = tupleTag;
-      this.pCollection = pCollection;
-    }
-
-    /**
-     * Returns the underlying PCollection of this TaggedKeyedPCollection.
-     */
-    public PCollection<KV<K, V>> getCollection() {
-      return pCollection;
-    }
-
-    /**
-     * Returns the TupleTag of this TaggedKeyedPCollection.
-     */
-    public TupleTag<V> getTupleTag() {
-      return tupleTag;
-    }
-  }
-
-  /**
-   * We use a List to properly track the order in which collections are added.
-   */
-  private final List<TaggedKeyedPCollection<K, ?>> keyedCollections;
-
-  private final Coder<K> keyCoder;
-
-  private final CoGbkResultSchema schema;
-
-  private final Pipeline pipeline;
-
-  KeyedPCollectionTuple(Pipeline pipeline) {
-    this(pipeline,
-         new ArrayList<TaggedKeyedPCollection<K, ?>>(),
-         TupleTagList.empty(),
-         null);
-  }
-
-  KeyedPCollectionTuple(
-      Pipeline pipeline,
-      List<TaggedKeyedPCollection<K, ?>> keyedCollections,
-      TupleTagList tupleTagList,
-      Coder<K> keyCoder) {
-    this.pipeline = pipeline;
-    this.keyedCollections = keyedCollections;
-    this.schema = new CoGbkResultSchema(tupleTagList);
-    this.keyCoder = keyCoder;
-  }
-
-  private static <K, V> Coder<K> getKeyCoder(PCollection<KV<K, V>> pc) {
-    // Need to run coder inference on this PCollection before inspecting it.
-    pc.finishSpecifying();
-
-    // Assumes that the PCollection uses a KvCoder.
-    Coder<?> entryCoder = pc.getCoder();
-    if (!(entryCoder instanceof KvCoder<?, ?>)) {
-      throw new IllegalArgumentException("PCollection does not use a KvCoder");
-    }
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
-    return coder.getKeyCoder();
-  }
-
-  private static <K> List<TaggedKeyedPCollection<K, ?>> copyAddLast(
-        List<TaggedKeyedPCollection<K, ?>> keyedCollections,
-        TaggedKeyedPCollection<K, ?> taggedCollection) {
-    List<TaggedKeyedPCollection<K, ?>> retval =
-        new ArrayList<>(keyedCollections);
-    retval.add(taggedCollection);
-    return retval;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/RawUnionValue.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/RawUnionValue.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/RawUnionValue.java
deleted file mode 100644
index 514853e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/RawUnionValue.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-// TODO: Think about making this a complete dynamic union by adding
-// a schema.  Type would then be defined by the corresponding schema entry.
-
-/**
- * This corresponds to an integer union tag and value.  The mapping of
- * union tag to type must come from elsewhere.
- */
-public class RawUnionValue {
-  private final int unionTag;
-  private final Object value;
-
-  /**
-   * Constructs a partial union from the given union tag and value.
-   */
-  public RawUnionValue(int unionTag, Object value) {
-    this.unionTag = unionTag;
-    this.value = value;
-  }
-
-  public int getUnionTag() {
-    return unionTag;
-  }
-
-  public Object getValue() {
-    return value;
-  }
-
-  @Override
-  public String toString() {
-    return unionTag + ":" + value;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/UnionCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/UnionCoder.java
deleted file mode 100644
index 2f1c2be..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/UnionCoder.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.join;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * A UnionCoder encodes RawUnionValues.
- */
-class UnionCoder extends StandardCoder<RawUnionValue> {
-  // TODO: Think about how to integrate this with a schema object (i.e.
-  // a tuple of tuple tags).
-  /**
-   * Builds a union coder with the given list of element coders.  This list
-   * corresponds to a mapping of union tag to Coder.  Union tags start at 0.
-   */
-  public static UnionCoder of(List<Coder<?>> elementCoders) {
-    return new UnionCoder(elementCoders);
-  }
-
-  @JsonCreator
-  public static UnionCoder jsonOf(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> elements) {
-    return UnionCoder.of(elements);
-  }
-
-  private int getIndexForEncoding(RawUnionValue union) {
-    if (union == null) {
-      throw new IllegalArgumentException("cannot encode a null tagged union");
-    }
-    int index = union.getUnionTag();
-    if (index < 0 || index >= elementCoders.size()) {
-      throw new IllegalArgumentException(
-          "union value index " + index + " not in range [0.." +
-          (elementCoders.size() - 1) + "]");
-    }
-    return index;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void encode(
-      RawUnionValue union,
-      OutputStream outStream,
-      Context context)
-      throws IOException, CoderException  {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    VarInt.encode(index, outStream);
-
-    // Write out the actual value.
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.encode(
-        union.getValue(),
-        outStream,
-        context);
-  }
-
-  @Override
-  public RawUnionValue decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    int index = VarInt.decodeInt(inStream);
-    Object value = elementCoders.get(index).decode(inStream, context);
-    return new RawUnionValue(index, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getComponents() {
-    return elementCoders;
-  }
-
-  /**
-   * Since this coder uses elementCoders.get(index) and coders that are known to run in constant
-   * time, we defer the return value to that coder.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(RawUnionValue union, Context context) {
-    int index = getIndexForEncoding(union);
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    return coder.isRegisterByteSizeObserverCheap(union.getValue(), context);
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      RawUnionValue union, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    int index = getIndexForEncoding(union);
-    // Write out the union tag.
-    observer.update(VarInt.getLength(index));
-    // Write out the actual value.
-    @SuppressWarnings("unchecked")
-    Coder<Object> coder = (Coder<Object>) elementCoders.get(index);
-    coder.registerByteSizeObserver(union.getValue(), observer, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final List<Coder<?>> elementCoders;
-
-  private UnionCoder(List<Coder<?>> elementCoders) {
-    this.elementCoders = elementCoders;
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/package-info.java
deleted file mode 100644
index be8bffa..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/join/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines the {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey} transform
- * for joining multiple PCollections.
- */
-package com.google.cloud.dataflow.sdk.transforms.join;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/package-info.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/package-info.java
deleted file mode 100644
index 3c041f6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/package-info.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s for transforming
- * data in a pipeline.
- *
- * <p>A {@link com.google.cloud.dataflow.sdk.transforms.PTransform} is an operation that takes an
- * {@code InputT} (some subtype of {@link com.google.cloud.dataflow.sdk.values.PInput})
- * and produces an
- * {@code OutputT} (some subtype of {@link com.google.cloud.dataflow.sdk.values.POutput}).
- *
- * <p>Common PTransforms include root PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Read} and
- * {@link com.google.cloud.dataflow.sdk.transforms.Create}, processing and
- * conversion operations like {@link com.google.cloud.dataflow.sdk.transforms.ParDo},
- * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey},
- * {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey},
- * {@link com.google.cloud.dataflow.sdk.transforms.Combine}, and
- * {@link com.google.cloud.dataflow.sdk.transforms.Count}, and outputting
- * PTransforms like
- * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write}.
- *
- * <p>New PTransforms can be created by composing existing PTransforms.
- * Most PTransforms in this package are composites, and users can also create composite PTransforms
- * for their own application-specific logic.
- *
- */
-package com.google.cloud.dataflow.sdk.transforms;
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java
deleted file mode 100644
index bb43010..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterAll.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterAll<W extends BoundedWindow> extends OnceTrigger<W> {
-
-  private AfterAll(List<Trigger<W>> subTriggers) {
-    super(subTriggers);
-    Preconditions.checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
-   */
-  @SafeVarargs
-  public static <W extends BoundedWindow> OnceTrigger<W> of(
-      OnceTrigger<W>... triggers) {
-    return new AfterAll<W>(Arrays.<Trigger<W>>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger<W> subTrigger : c.trigger().unfinishedSubTriggers()) {
-      // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH.
-      // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH.
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    boolean allFinished = true;
-    for (ExecutableTrigger<W> subTrigger1 : c.trigger().subTriggers()) {
-      allFinished &= c.forTrigger(subTrigger1).trigger().isFinished();
-    }
-    c.trigger().setFinished(allFinished);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    // This trigger will fire after the latest of its sub-triggers.
-    Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    for (Trigger<W> subTrigger : subTriggers) {
-      Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
-      if (deadline.isBefore(subDeadline)) {
-        deadline = subDeadline;
-      }
-    }
-    return deadline;
-  }
-
-  @Override
-  public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return new AfterAll<W>(continuationTriggers);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if all subtriggers return {@code true}.
-   */
-  @Override
-  public boolean shouldFire(TriggerContext context) throws Exception {
-    for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
-      if (!context.forTrigger(subtrigger).trigger().isFinished()
-          && !subtrigger.invokeShouldFire(context)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
-   * because they all must be ready to fire.
-   */
-  @Override
-  public void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
-      subtrigger.invokeOnFire(context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java
deleted file mode 100644
index 71968e9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.InstantCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.CombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A base class for triggers that happen after a processing time delay from the arrival
- * of the first element in a pane.
- *
- * <p>This class is for internal use only and may change at any time.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElement<W extends BoundedWindow> extends OnceTrigger<W> {
-
-  protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
-      ImmutableList.<SerializableFunction<Instant, Instant>>of();
-
-  protected static final StateTag<Object, AccumulatorCombiningState<Instant,
-                                              Combine.Holder<Instant>, Instant>> DELAYED_UNTIL_TAG =
-      StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder()));
-
-  /**
-   * To complete an implementation, return the desired time from the TriggerContext.
-   */
-  @Nullable
-  public abstract Instant getCurrentTime(Trigger<W>.TriggerContext context);
-
-  /**
-   * To complete an implementation, return a new instance like this one, but incorporating
-   * the provided timestamp mapping functions. Generally should be used by calling the
-   * constructor of this class from the constructor of the subclass.
-   */
-  protected abstract AfterDelayFromFirstElement<W> newWith(
-      List<SerializableFunction<Instant, Instant>> transform);
-
-  /**
-   * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The
-   * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`,
-   * implemented via #computeTargetTimestamp
-   */
-  protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
-
-  private final TimeDomain timeDomain;
-
-  public AfterDelayFromFirstElement(
-      TimeDomain timeDomain,
-      List<SerializableFunction<Instant, Instant>> timestampMappers) {
-    super(null);
-    this.timestampMappers = timestampMappers;
-    this.timeDomain = timeDomain;
-  }
-
-  private Instant getTargetTimestamp(OnElementContext c) {
-    return computeTargetTimestamp(c.currentProcessingTime());
-  }
-
-  /**
-   * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
-   * than the timestamp.
-   *
-   * <p>TODO: Consider sharing this with FixedWindows, and bring over the equivalent of
-   * CalendarWindows.
-   */
-  public AfterDelayFromFirstElement<W> alignedTo(final Duration size, final Instant offset) {
-    return newWith(new AlignFn(size, offset));
-  }
-
-  /**
-   * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp
-   * since the epoch.
-   */
-  public AfterDelayFromFirstElement<W> alignedTo(final Duration size) {
-    return alignedTo(size, new Instant(0));
-  }
-
-  /**
-   * Adds some delay to the original target time.
-   *
-   * @param delay the delay to add
-   * @return An updated time trigger that will wait the additional time before firing.
-   */
-  public AfterDelayFromFirstElement<W> plusDelayOf(final Duration delay) {
-    return newWith(new DelayFn(delay));
-  }
-
-  /**
-   * @deprecated This will be removed in the next major version. Please use only
-   *             {@link #plusDelayOf} and {@link #alignedTo}.
-   */
-  @Deprecated
-  public OnceTrigger<W> mappedTo(SerializableFunction<Instant, Instant> timestampMapper) {
-    return newWith(timestampMapper);
-  }
-
-  @Override
-  public boolean isCompatible(Trigger<?> other) {
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-
-    AfterDelayFromFirstElement<?> that = (AfterDelayFromFirstElement<?>) other;
-    return this.timestampMappers.equals(that.timestampMappers);
-  }
-
-
-  private AfterDelayFromFirstElement<W> newWith(
-      SerializableFunction<Instant, Instant> timestampMapper) {
-    return newWith(
-        ImmutableList.<SerializableFunction<Instant, Instant>>builder()
-            .addAll(timestampMappers)
-            .add(timestampMapper)
-            .build());
-  }
-
-  @Override
-  public void prefetchOnElement(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG);
-    Instant oldDelayUntil = delayUntilState.read();
-
-    // Since processing time can only advance, resulting in target wake-up times we would
-    // ignore anyhow, we don't bother with it if it is already set.
-    if (oldDelayUntil != null) {
-      return;
-    }
-
-    Instant targetTimestamp = getTargetTimestamp(c);
-    delayUntilState.add(targetTimestamp);
-    c.setTimer(targetTimestamp, timeDomain);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // NOTE: We could try to delete all timers which are still active, but we would
-    // need access to a timer context for each merging window.
-    // for (CombiningValueStateInternal<Instant, Combine.Holder<Instant>, Instant> state :
-    //    c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) {
-    //   Instant timestamp = state.get().read();
-    //   if (timestamp != null) {
-    //     <context for merging window>.deleteTimer(timestamp, timeDomain);
-    //   }
-    // }
-    // Instead let them fire and be ignored.
-
-    // If the trigger is already finished, there is no way it will become re-activated
-    if (c.trigger().isFinished()) {
-      StateMerging.clear(c.state(), DELAYED_UNTIL_TAG);
-      // NOTE: We do not attempt to delete  the timers.
-      return;
-    }
-
-    // Determine the earliest point across all the windows, and delay to that.
-    StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG);
-
-    Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read();
-    if (earliestTargetTime != null) {
-      c.setTimer(earliestTargetTime, timeDomain);
-    }
-  }
-
-  @Override
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(DELAYED_UNTIL_TAG).readLater();
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(DELAYED_UNTIL_TAG).clear();
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
-    return delayedUntil != null
-        && getCurrentTime(context) != null
-        && getCurrentTime(context).isAfter(delayedUntil);
-  }
-
-  @Override
-  protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception {
-    clear(context);
-  }
-
-  protected Instant computeTargetTimestamp(Instant time) {
-    Instant result = time;
-    for (SerializableFunction<Instant, Instant> timestampMapper : timestampMappers) {
-      result = timestampMapper.apply(result);
-    }
-    return result;
-  }
-
-  /**
-   * A {@link SerializableFunction} to delay the timestamp at which this triggers fires.
-   */
-  private static final class DelayFn implements SerializableFunction<Instant, Instant> {
-    private final Duration delay;
-
-    public DelayFn(Duration delay) {
-      this.delay = delay;
-    }
-
-    @Override
-    public Instant apply(Instant input) {
-      return input.plus(delay);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof DelayFn)) {
-        return false;
-      }
-
-      return this.delay.equals(((DelayFn) object).delay);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(delay);
-    }
-  }
-
-  /**
-   * A {@link SerializableFunction} to align an instant to the nearest interval boundary.
-   */
-  static final class AlignFn implements SerializableFunction<Instant, Instant> {
-    private final Duration size;
-    private final Instant offset;
-
-
-    /**
-     * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
-     * than the timestamp.
-     */
-    public AlignFn(Duration size, Instant offset) {
-      this.size = size;
-      this.offset = offset;
-    }
-
-    @Override
-    public Instant apply(Instant point) {
-      long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis();
-      return millisSinceStart == 0 ? point : point.plus(size).minus(millisSinceStart);
-    }
-
-    @Override
-    public boolean equals(Object object) {
-      if (object == this) {
-        return true;
-      }
-
-      if (!(object instanceof AlignFn)) {
-        return false;
-      }
-
-      AlignFn other = (AlignFn) object;
-      return other.size.equals(this.size)
-          && other.offset.equals(this.offset);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(size, offset);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java
deleted file mode 100644
index 4b052fa..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterEach.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A composite {@link Trigger} that executes its sub-triggers in order.
- * Only one sub-trigger is executing at a time,
- * and any time it fires the {@code AfterEach} fires. When the currently executing
- * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger.
- *
- * <p>{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished.
- *
- * <p>The following properties hold:
- * <ul>
- *   <li> {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as
- *   {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}.
- *   <li> {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as
- *   {@code Repeatedly.forever(a)}, since the repeated trigger never finishes.
- * </ul>
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterEach<W extends BoundedWindow> extends Trigger<W> {
-
-  private AfterEach(List<Trigger<W>> subTriggers) {
-    super(subTriggers);
-    checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
-   */
-  @SafeVarargs
-  public static <W extends BoundedWindow> Trigger<W> inOrder(Trigger<W>... triggers) {
-    return new AfterEach<W>(Arrays.<Trigger<W>>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    if (!c.trigger().isMerging()) {
-      // If merges are not possible, we need only run the first unfinished subtrigger
-      c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-    } else {
-      // If merges are possible, we need to run all subtriggers in parallel
-      for (ExecutableTrigger<W> subTrigger :  c.trigger().subTriggers()) {
-        // Even if the subTrigger is done, it may be revived via merging and must have
-        // adequate state.
-        subTrigger.invokeOnElement(c);
-      }
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If merging makes a subtrigger no-longer-finished, it will automatically
-    // begin participating in shouldFire and onFire appropriately.
-
-    // All the following triggers are retroactively "not started" but that is
-    // also automatic because they are cleared whenever this trigger
-    // fires.
-    boolean priorTriggersAllFinished = true;
-    for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
-      if (priorTriggersAllFinished) {
-        subTrigger.invokeOnMerge(context);
-        priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished();
-      } else {
-        subTrigger.invokeClear(context);
-      }
-    }
-    updateFinishedState(context);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    // This trigger will fire at least once when the first trigger in the sequence
-    // fires at least once.
-    return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window);
-  }
-
-  @Override
-  public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return Repeatedly.forever(new AfterFirst<W>(continuationTriggers));
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    ExecutableTrigger<W> firstUnfinished = context.trigger().firstUnfinishedSubTrigger();
-    return firstUnfinished.invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger<W>.TriggerContext context) throws Exception {
-    context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context);
-
-    // Reset all subtriggers if in a merging context; any may be revived by merging so they are
-    // all run in parallel for each pending pane.
-    if (context.trigger().isMerging()) {
-      for (ExecutableTrigger<W> subTrigger : context.trigger().subTriggers()) {
-        subTrigger.invokeClear(context);
-      }
-    }
-
-    updateFinishedState(context);
-  }
-
-  private void updateFinishedState(TriggerContext context) {
-    context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java
deleted file mode 100644
index 29b19bf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterFirst.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.common.base.Preconditions;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have
- * fired.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirst<W extends BoundedWindow> extends OnceTrigger<W> {
-
-  AfterFirst(List<Trigger<W>> subTriggers) {
-    super(subTriggers);
-    Preconditions.checkArgument(subTriggers.size() > 1);
-  }
-
-  /**
-   * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
-   */
-  @SafeVarargs
-  public static <W extends BoundedWindow> OnceTrigger<W> of(
-      OnceTrigger<W>... triggers) {
-    return new AfterFirst<W>(Arrays.<Trigger<W>>asList(triggers));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnElement(c);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedStatus(c);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    // This trigger will fire after the earliest of its sub-triggers.
-    Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    for (Trigger<W> subTrigger : subTriggers) {
-      Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window);
-      if (deadline.isAfter(subDeadline)) {
-        deadline = subDeadline;
-      }
-    }
-    return deadline;
-  }
-
-  @Override
-  public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return new AfterFirst<W>(continuationTriggers);
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
-      if (context.forTrigger(subtrigger).trigger().isFinished()
-          || subtrigger.invokeShouldFire(context)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected void onOnlyFiring(TriggerContext context) throws Exception {
-    for (ExecutableTrigger<W> subtrigger : context.trigger().subTriggers()) {
-      TriggerContext subContext = context.forTrigger(subtrigger);
-      if (subtrigger.invokeShouldFire(subContext)) {
-        // If the trigger is ready to fire, then do whatever it needs to do.
-        subtrigger.invokeOnFire(subContext);
-      } else {
-        // If the trigger is not ready to fire, it is nonetheless true that whatever
-        // pending pane it was tracking is now gone.
-        subtrigger.invokeClear(subContext);
-      }
-    }
-  }
-
-  private void updateFinishedStatus(TriggerContext c) {
-    boolean anyFinished = false;
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      anyFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyFinished);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java
deleted file mode 100644
index 28c8560..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterPane.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
-import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
-import com.google.cloud.dataflow.sdk.util.state.StateMerging;
-import com.google.cloud.dataflow.sdk.util.state.StateTag;
-import com.google.cloud.dataflow.sdk.util.state.StateTags;
-
-import org.joda.time.Instant;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * {@link Trigger}s that fire based on properties of the elements in the current pane.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@link Trigger}
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterPane<W extends BoundedWindow> extends OnceTrigger<W>{
-
-private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>>
-      ELEMENTS_IN_PANE_TAG =
-      StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal(
-          "count", VarLongCoder.of(), new Sum.SumLongFn()));
-
-  private final int countElems;
-
-  private AfterPane(int countElems) {
-    super(null);
-    this.countElems = countElems;
-  }
-
-  /**
-   * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
-   */
-  public static <W extends BoundedWindow> AfterPane<W> elementCountAtLeast(int countElems) {
-    return new AfterPane<>(countElems);
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).add(1L);
-  }
-
-  @Override
-  public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
-    super.prefetchOnMerge(state);
-    StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext context) throws Exception {
-    // If we've already received enough elements and finished in some window,
-    // then this trigger is just finished.
-    if (context.trigger().finishedInAnyMergingWindow()) {
-      context.trigger().setFinished(true);
-      StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG);
-      return;
-    }
-
-    // Otherwise, compute the sum of elements in all the active panes.
-    StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG);
-  }
-
-  @Override
-  public void prefetchShouldFire(StateAccessor<?> state) {
-    state.access(ELEMENTS_IN_PANE_TAG).readLater();
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    long count = context.state().access(ELEMENTS_IN_PANE_TAG).read();
-    return count >= countElems;
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception {
-    c.state().access(ELEMENTS_IN_PANE_TAG).clear();
-  }
-
-  @Override
-  public boolean isCompatible(Trigger<?> other) {
-    return this.equals(other);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public OnceTrigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return AfterPane.elementCountAtLeast(1);
-  }
-
-  @Override
-  public String toString() {
-    return "AfterPane.elementCountAtLeast(" + countElems + ")";
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof AfterPane)) {
-      return false;
-    }
-    AfterPane<?> that = (AfterPane<?>) obj;
-    return this.countElems == that.countElems;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(countElems);
-  }
-
-  @Override
-  protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception {
-    clear(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java
deleted file mode 100644
index 7e89902..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterProcessingTime.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-
-import org.joda.time.Instant;
-
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in
- * the real-time domain.
- *
- * <p>The time at which to fire the timer can be adjusted via the methods in {@link TimeTrigger},
- * such as {@link TimeTrigger#plusDelayOf} or {@link TimeTrigger#alignedTo}.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterProcessingTime<W extends BoundedWindow> extends AfterDelayFromFirstElement<W> {
-
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger<W>.TriggerContext context) {
-    return context.currentProcessingTime();
-  }
-
-  private AfterProcessingTime(List<SerializableFunction<Instant, Instant>> transforms) {
-    super(TimeDomain.PROCESSING_TIME, transforms);
-  }
-
-  /**
-   * Creates a trigger that fires when the current processing time passes the processing time
-   * at which this trigger saw the first element in a pane.
-   */
-  public static <W extends BoundedWindow> AfterProcessingTime<W> pastFirstElementInPane() {
-    return new AfterProcessingTime<W>(IDENTITY);
-  }
-
-  @Override
-  protected AfterProcessingTime<W> newWith(
-      List<SerializableFunction<Instant, Instant>> transforms) {
-    return new AfterProcessingTime<W>(transforms);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  protected Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return new AfterSynchronizedProcessingTime<W>();
-  }
-
-  @Override
-  public String toString() {
-    return "AfterProcessingTime.pastFirstElementInPane(" + timestampMappers + ")";
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof AfterProcessingTime)) {
-      return false;
-    }
-    AfterProcessingTime<?> that = (AfterProcessingTime<?>) obj;
-    return Objects.equals(this.timestampMappers, that.timestampMappers);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(getClass(), this.timestampMappers);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
deleted file mode 100644
index 0a274c9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.common.base.Objects;
-
-import org.joda.time.Instant;
-
-import java.util.Collections;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-class AfterSynchronizedProcessingTime<W extends BoundedWindow>
-    extends AfterDelayFromFirstElement<W> {
-
-  @Override
-  @Nullable
-  public Instant getCurrentTime(Trigger<W>.TriggerContext context) {
-    return context.currentSynchronizedProcessingTime();
-  }
-
-  public AfterSynchronizedProcessingTime() {
-    super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
-        Collections.<SerializableFunction<Instant, Instant>>emptyList());
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  protected Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public String toString() {
-    return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    return this == obj || obj instanceof AfterSynchronizedProcessingTime;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(AfterSynchronizedProcessingTime.class);
-  }
-
-  @Override
-  protected AfterSynchronizedProcessingTime<W>
-      newWith(List<SerializableFunction<Instant, Instant>> transforms) {
-    // ignore transforms
-    return this;
-  }
-
-}